引言
ThreadLocal
ThreadLocal这个类给线程提供了一个本地变量,这个变量是该线程自己拥有,各线程间不共享。在该线程存活和ThreadLocal实例能访问的时候,保存了对这个变量副本的引用。当线程消失的时候,所有的本地实例都会被GC。并且建议ThreadLocal最好是使用 private static 修饰。
InheritableThreadLocal
InheritableThreadLocal是为了解决子线程获得父线程本地变量的需求,继承自ThreadLocal。如果你使用它,那么保存的所有东西都已经不在原来的threadLocals里面,而是在一个新的叫inheritableThreadLocals变量中。意思就是说每个线程Thread里面还有一个Map变量,名叫inheritableThreadLocals,它保存的是需要传递的引用(通过InheritableThreadLocal设置的线程变量)。
这种父子传递的需求还是有些比较重要的应用场景,如上下文传递(用户标识、事务等),调用日志跟踪等。Log4j中的MDC就是基于InheritableThreadLocal实现。但一般来说我们用线程池比较多,线程池会缓存线程,重复使用,线程可能会执行不同的任务。这样一来它的上下文传递就达不到正确的效果。
TransmittableThreadLocal
阿里巴巴有个开源项目就是为了解决线程池中变量传递,它里面有个叫,继承于InheritableThreadLocal。它通过包装返回Runnable的方式代理了run方法,在run之前copy装载线程变量,run之后清除线程变量,来实现此功能。TransmittableThreadLocal除了继承过来的线程Map,它还定义了一个名叫holder的InheritableThreadLocal静态变量,也就是说TransmittableThreadLocal有两套线程变量。
但事实上我们不可能所有的应用均采用InheritableThreadLocal,尽管他是一个不错的选择,但如何让ThreadLocal也实现在Hystrix应用场景下实现线程上下文的传播呢。这就是本章的重点了。
HystrixConcurrencystrategy
是Hystrix的线程池创建源码所在,并且提供方法wrapCallable来装饰线程池执行环境。所以我们我们可以自定义一个并发策略,即可于Hystrix应用场景内实现线程上下文的传播。附wrapCallable源码
/** * Provides an opportunity to wrap/decorate a {@code Callable} before execution. * * This can be used to inject additional behavior such as copying of thread state (such as {@link ThreadLocal}). *
* Default Implementation *
* Pass-thru that does no wrapping. * * @param callable * {@code Callable
} to be executed via a {@link ThreadPoolExecutor} * @return {@code Callable } either as a pass-thru or wrapping the one given */public Callable wrapCallable(Callable callable) { return callable;}
Ps:注释上有一句这么说的:This can be used to inject additional behavior such as copying of thread state (such as {
ThreadLocal}).拓展
既然这个方法可以装饰线程池回调,那么我们亦可定义一个装饰器接口,只要这个接口的实现类,都会通过上述并发策略装饰不同业务不同场景需要的线程变量。
装饰器接口定义
/** * Hystrix CallBack 装饰器定义 * * @author wangzhuhua * @date 2018/09/07 下午4:18 **/public interface HystrixCallableWrapper { /** * 装饰 Callable实例 * * @param callable * 待装饰实例 * @param* 返回类型 * @return 装饰后的实例 */ Callable wrap(Callable callable);}
HystrixConcurrencystrategy Custom
/** * Hystrix并发策略 * * @author wangzhuhua * @date 2018/09/07 下午4:06 **/public class HystrixConcurrencyStrategyCustom extends HystrixConcurrencyStrategy { /** 装饰队列 */ private final Listwrappers; public HystrixConcurrencyStrategyCustom(List wrappers) { this.wrappers = wrappers; } @Override public Callable wrapCallable(Callable callable) { return new CallableWrapperChain(callable, this.wrappers).wrapCallable(); } /** * callback 调用链 * * @author wangzhuhua * @date 2018/09/07 下午4:38 **/ private static class CallableWrapperChain { /** 回调 */ private final Callable callable; /** 回调包装 */ private final List wrappers; CallableWrapperChain(Callable callable, List wrappers) { this.callable = callable; this.wrappers = wrappers; } /** * 装饰线程hystrix callable * * @return {@link Callable} */ Callable wrapCallable() { Callable delegate = callable; for (HystrixCallableWrapper wrapper : wrappers) { delegate = wrapper.wrap(delegate); } return delegate; } }}
Configuration
/** * Hystrix配置 * * @author wangzhuhua * @date 2018/09/07 下午4:01 **/@Configuration@ConditionalOnProperty(value = "feign.hystrix.enabled", havingValue = "true")public class HystrixConfiguration { @Autowired(required = false) private Listwrappers = new ArrayList<>(); @PostConstruct public void init() { HystrixPlugins.getInstance().registerConcurrencyStrategy(new HystrixConcurrencyStrategyCustom(wrappers)); }}
实现示例
/** * Token 装饰器 * * @author wangzhuhua * @date 2018/09/10 上午11:28 **/@Componentpublic class TokenWrapper implements HystrixCallableWrapper { @Override publicCallable wrap(Callable callable) { return new TokenAwareCallable(callable, TokenHandler.getToken()); } /** * Token装饰 * * @param */ static class TokenAwareCallable implements Callable { /** 回调代理 */ private final Callable delegate; /** Token */ private final String token; TokenAwareCallable(Callable callable, String token) { this.delegate = callable; this.token = token; } @Override public T call() throws Exception { try { // 填充当前线程变量 TokenHandler.setToken(this.token); return delegate.call(); } finally { TokenHandler.remove(); } } }}
其中TokenHandler内使用线程变量存储