引言 在日常开发中经常遇到这样一个场景:在拦截器中对用户做统一的鉴权,鉴权通过后将当前登录用户的信息ThreadLocal使用存储在当前线程中,方便后续业务代码中使用。但有时,执行的业务比较耗时, 我们会将业务执行逻辑封装到由@Async标注的异步方法中或提交到线程池中执行,这时在执行业务代码会无法获取到之前缓存的用户信息,因为线程已经发生了切换。 那么,我们改怎样将任务提交任务线程缓存的数据传递给执行线程呢?
ThreadLcoal的原理 首先,我们简单看一下ThreadLocal的基本原理。 使用ThreadLocal#set方法存储的数据会存储在当前线程的threadLcoals成员变量中,threadLocals的类型是ThreadLocal.ThreadLocalMap,可以简单的理解成一个Map。 当调用ThreadLocal#get时,会从当前线程的threadLocals成员变量中取出存储的值。
使用InheritableThreadLocal传递数据 InheritableThreadLocal可以将数据由父线程传递给子线程。与ThreadLcoal类似,InheritableThreadLocal也是将数据存储在当前线程的一个成员变量里(inheritableThradLocals)。
1 2 3 4 5 6 7 8 9 ThreadLocal.ThreadLocalMap threadLocals = null ; ThreadLocal.ThreadLocalMap inheritableThreadLocals = null ;
那么父线程是怎么将数据传递给子线程的呢?查看Thread类的构造方法,可以看到初始化inheritableThreadLocals时使用了ThreadLcoal.createInheriableMap,并传递了父线程的inheriableThreadLocal作为参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Thread(String name, int characteristics, boolean bound) { if ((characteristics & NO_INHERIT_THREAD_LOCALS) == 0 ) { Thread parent = currentThread(); ThreadLocal.ThreadLocalMap parentMap = parent.inheritableThreadLocals; if (parentMap != null && parentMap.size() > 0 ) { this .inheritableThreadLocals = ThreadLocal.createInheritedMap(parentMap); } this .contextClassLoader = contextClassLoader(parent); } else { this .contextClassLoader = ClassLoader.getSystemClassLoader(); } }
运行以下代码:
1 2 3 4 5 6 7 8 9 10 11 public class Test { public static void main (String[] args) { InheritableThreadLocal<String> currentUserHolder = new InheritableThreadLocal <>(); currentUserHolder.set("tom" ); System.out.printf("Thread: %s, current user: %s%n" , Thread.currentThread().getName(), currentUserHolder.get()); Thread child = new Thread (() -> { System.out.printf("Thread: %s, current user: %s%n" , Thread.currentThread().getName(), currentUserHolder.get()); }); child.start(); } }
得到结果:
1 2 Thread: main, current user: tom Thread: Thread-0, current user: tom
数据成功地从父线程传递到了子线程。但是inheriableThreadLocals的初始化在线程创建的时候,线程创建后,对InheritableThreadLocal实例后续的操作子线程将无法感知。 执行以下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Test { public static void main (String[] args) { InheritableThreadLocal<String> currentUserHolder = new InheritableThreadLocal <>(); currentUserHolder.set("tom" ); System.out.printf("Thread: %s, current user: %s%n" , Thread.currentThread().getName(), currentUserHolder.get()); Thread child = new Thread (() -> { System.out.printf("Thread: %s, current user: %s%n" , Thread.currentThread().getName(), currentUserHolder.get()); }); currentUserHolder.set("jerry" ); System.out.printf("Thread: %s, current user: %s%n" , Thread.currentThread().getName(), currentUserHolder.get()); child.start(); } }
会得到以下结果:
1 2 3 Thread: main, current user: tom Thread: main, current user: jerry Thread: Thread-0, current user: tom
因此InheritableThreadLocal并不能胜任一般的业务场景,因为一般都不会手动新建线程,而是使用线程池。
手动传递ThreadLocal中的值 既然InheriableThreadLocal自动传递数据有局限性,那么是否可以手动传递呢?答案当然是:可以!
线程池的使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class TaskWrapperTest { public static class UserCtxHolder { private static final ThreadLocal<String> threadLocal = new ThreadLocal <>(); public static String get () { return threadLocal.get(); } public static void set (String val) { threadLocal.set(val); } public static void clean () { threadLocal.remove(); } } public static class UserCtxAwareRunnable implements Runnable { private final String currentUser; private final Runnable task; public UserCtxAwareRunnable (Runnable task) { this .currentUser = UserCtxHolder.get(); this .task = task; } @Override public void run () { try { UserCtxHolder.set(currentUser); task.run(); } finally { UserCtxHolder.clean(); } } } public static void main (String[] args) { UserCtxHolder.set("tom" ); System.out.println("Thread: %s, current user: %s" .formatted(Thread.currentThread().getName(), UserCtxHolder.get())); try (ExecutorService executor = Executors.newSingleThreadExecutor()) { executor.execute(new UserCtxAwareRunnable (() -> System.out.println("Thread: %s, current user: %s" .formatted(Thread.currentThread().getName(), UserCtxHolder.get())))); } } }
运行结果:
1 2 Thread: main, current user: tom Thread: pool-1-thread-1, current user: tom
每次执行任务的时候都需要传递自定义的Runnalbe的包装类UserCtxAwareRunnalbe的实例,这样不可避免存在无用,未使用包装类,直接传入Runnable实例,导致数据未传递。 为避免这种情况,可以自定义一个线程池,传入Runnable实例时,自动使用UserCtxAwareRunnable包装。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class CommonBizThreadPool implements Executor { public static final CommonBizThreadPool INSTANCE = new CommonBizThreadPool (); private final Executor executor; private CommonBizThreadPool () { executor = Executors.newSingleThreadExecutor(); } @Override public void execute (Runnable command) { executor.execute(new UserCtxAwareRunnable (command)); } }
Spring异步任务 接下来要处理另一个问题,对于@Async注解标记的异步方法,这些方法默认使用spring定义的ThreadPoolTaskExecutor线程池执行,我们也要对这个线程池执行的任务进行上文类似装饰(手动设置ThreadLocal)。ThreadPoolTaskExecutor提供了setTaskDecorator方法,使得我们可以“装饰”一个任务。 定义一个任务装饰器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class UserCtxAwareTaskDecorator implements TaskDecorator { @Override public Runnable decorate () { String currentUser = UserCtxHolder.get(); return () -> { try { UserCtxHolder.set(currentUser); runnable.run(); } finally { UserCtxHolder.clean(); } }; } }
定义ThreadPoolTaskExecutorbean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Configuration public class BeanConfiguration { @Bean public ThreadPoolTaskExecutor threadPoolTaskExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); executor.setCorePoolSize(5 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(100 ); executor.setThreadNamePrefix("custom-" ); executor.setTaskDecorator(new UserCtxAwareTaskDecorator ()); executor.initialize(); return executor; } }
延申 如果使用到了多个ThreadLocal,那么就需要在线程池任务包装类UserCtxAwareRunnable和UserCtxAwareTaskDecorator中处理这些ThreadLocal,这样就有些繁琐了。 其实我们可以借助阿里的transmittable-thread-local,轻松的实现数据在父子线程中传递。
引入依赖
1 2 3 4 5 <dependency > <groupId > com.alibaba</groupId > <artifactId > transmittable-thread-local</artifactId > <version > 2.14.5</version > </dependency >
替换原来使用的ThreadLocal为TransmittableThreadLocal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public class UserCtxHolder { private static final TransmitableThreadLocal<String> threadLocal = new TTransmitableThreadLocal <>(); public static String get () { return threadLocal.get(); } public static void set (String val) { threadLocal.set(val); } public static void clean () { threadLocal.remove(); } }
使用TtlExecutors.getTtlExecutor装饰原线程池
1 2 3 4 5 6 7 8 public class CommonBizThreadPool2 { public static final Executor INSTANCE = TtlExecutors.getTtlExecutor(Executors.newSingleThreadExecutor()); public static Executor getInstance () { return INSTANCE; } }
创建一个实现了AsyncConfigurater接口的bean(处理@Async异步调用时的数据传递)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Component public class CustomAsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor () { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor (); executor.setCorePoolSize(5 ); executor.setMaxPoolSize(10 ); executor.setQueueCapacity(100 ); executor.setThreadNamePrefix("CustomAsyncConfig-" ); executor.initialize(); return TtlExecutors.getTtlExecutor(executor); } }