码上游记

hongjian.xia的个人博客

0%

Java线程之前数据传递的方案

引言

在日常开发中经常遇到这样一个场景:在拦截器中对用户做统一的鉴权,鉴权通过后将当前登录用户的信息ThreadLocal使用存储在当前线程中,方便后续业务代码中使用。但有时,执行的业务比较耗时,
我们会将业务执行逻辑封装到由@Async标注的异步方法中或提交到线程池中执行,这时在执行业务代码会无法获取到之前缓存的用户信息,因为线程已经发生了切换。
那么,我们改怎样将任务提交任务线程缓存的数据传递给执行线程呢?

ThreadLcoal的原理

首先,我们简单看一下ThreadLocal的基本原理。
使用ThreadLocal#set方法存储的数据会存储在当前线程的threadLcoals成员变量中,threadLocals的类型是ThreadLocal.ThreadLocalMap,可以简单的理解成一个Map。
当调用ThreadLocal#get时,会从当前线程的threadLocals成员变量中取出存储的值。

使用InheritableThreadLocal传递数据

InheritableThreadLocal可以将数据由父线程传递给子线程。与ThreadLcoal类似,InheritableThreadLocal也是将数据存储在当前线程的一个成员变量里(inheritableThradLocals)。

1
2
3
4
5
6
7
8
9
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

那么父线程是怎么将数据传递给子线程的呢?查看Thread类的构造方法,可以看到初始化inheritableThreadLocals时使用了ThreadLcoal.createInheriableMap,并传递了父线程的inheriableThreadLocal作为参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Thread(String name, int characteristics, boolean bound) {
// ...
// 省略无关代码

// thread locals
if ((characteristics & NO_INHERIT_THREAD_LOCALS) == 0) {
Thread parent = currentThread();
ThreadLocal.ThreadLocalMap parentMap = parent.inheritableThreadLocals;
// 在这里做了初始化
if (parentMap != null && parentMap.size() > 0) {
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parentMap);
}
this.contextClassLoader = contextClassLoader(parent);
} else {
// default CCL to the system class loader when not inheriting
this.contextClassLoader = ClassLoader.getSystemClassLoader();
}

// 省略无关代码
// ...
}

运行以下代码:

1
2
3
4
5
6
7
8
9
10
11
public class Test {
public static void main(String[] args) {
InheritableThreadLocal<String> currentUserHolder = new InheritableThreadLocal<>();
currentUserHolder.set("tom");
System.out.printf("Thread: %s, current user: %s%n", Thread.currentThread().getName(), currentUserHolder.get());
Thread child = new Thread(() -> {
System.out.printf("Thread: %s, current user: %s%n", Thread.currentThread().getName(), currentUserHolder.get());
});
child.start();
}
}

得到结果:

1
2
Thread: main, current user: tom
Thread: Thread-0, current user: tom

数据成功地从父线程传递到了子线程。但是inheriableThreadLocals的初始化在线程创建的时候,线程创建后,对InheritableThreadLocal实例后续的操作子线程将无法感知。
执行以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Test {
public static void main(String[] args) {
InheritableThreadLocal<String> currentUserHolder = new InheritableThreadLocal<>();
currentUserHolder.set("tom");
System.out.printf("Thread: %s, current user: %s%n", Thread.currentThread().getName(), currentUserHolder.get());
Thread child = new Thread(() -> {
System.out.printf("Thread: %s, current user: %s%n", Thread.currentThread().getName(), currentUserHolder.get());
});
// 调整InheritableThreadLocal中存储的值
currentUserHolder.set("jerry");
System.out.printf("Thread: %s, current user: %s%n", Thread.currentThread().getName(), currentUserHolder.get());
child.start();
}
}

会得到以下结果:

1
2
3
Thread: main, current user: tom
Thread: main, current user: jerry
Thread: Thread-0, current user: tom

因此InheritableThreadLocal并不能胜任一般的业务场景,因为一般都不会手动新建线程,而是使用线程池。

手动传递ThreadLocal中的值

既然InheriableThreadLocal自动传递数据有局限性,那么是否可以手动传递呢?答案当然是:可以!

线程池的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

public class TaskWrapperTest {
public static class UserCtxHolder {
private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();

public static String get() {
return threadLocal.get();
}

public static void set(String val) {
threadLocal.set(val);
}

public static void clean() {
threadLocal.remove();
}
}

public static class UserCtxAwareRunnable implements Runnable {
private final String currentUser;
private final Runnable task;

public UserCtxAwareRunnable(Runnable task) {
this.currentUser = UserCtxHolder.get();
this.task = task;
}

@Override
public void run() {
try {
UserCtxHolder.set(currentUser);
task.run();
} finally {
UserCtxHolder.clean();
}
}
}

public static void main(String[] args) {
UserCtxHolder.set("tom");
System.out.println("Thread: %s, current user: %s".formatted(Thread.currentThread().getName(), UserCtxHolder.get()));
try (ExecutorService executor = Executors.newSingleThreadExecutor()) {
executor.execute(new UserCtxAwareRunnable(() -> System.out.println("Thread: %s, current user: %s".formatted(Thread.currentThread().getName(), UserCtxHolder.get()))));
}
}

}

运行结果:

1
2
Thread: main, current user: tom
Thread: pool-1-thread-1, current user: tom

每次执行任务的时候都需要传递自定义的Runnalbe的包装类UserCtxAwareRunnalbe的实例,这样不可避免存在无用,未使用包装类,直接传入Runnable实例,导致数据未传递。
为避免这种情况,可以自定义一个线程池,传入Runnable实例时,自动使用UserCtxAwareRunnable包装。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class CommonBizThreadPool implements Executor {
public static final CommonBizThreadPool INSTANCE = new CommonBizThreadPool();

private final Executor executor;
private CommonBizThreadPool() {
// 示例代码,勿用于生产环境,实际按需创建线程池
executor = Executors.newSingleThreadExecutor();
}

@Override
public void execute(Runnable command) {
// 在这里包装Runnable实例
executor.execute(new UserCtxAwareRunnable(command));
}
}

Spring异步任务

接下来要处理另一个问题,对于@Async注解标记的异步方法,这些方法默认使用spring定义的ThreadPoolTaskExecutor线程池执行,我们也要对这个线程池执行的任务进行上文类似装饰(手动设置ThreadLocal)。
ThreadPoolTaskExecutor提供了setTaskDecorator方法,使得我们可以“装饰”一个任务。
定义一个任务装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class UserCtxAwareTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate() {
String currentUser = UserCtxHolder.get();
return () -> {
try {
UserCtxHolder.set(currentUser);
runnable.run();
} finally {
UserCtxHolder.clean();
}
};
}
}

定义ThreadPoolTaskExecutorbean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class BeanConfiguration {

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 线程池参数根据实际需求修改
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("custom-");
// 设置任务装饰器
executor.setTaskDecorator(new UserCtxAwareTaskDecorator());
executor.initialize();
return executor;
}
}

延申

如果使用到了多个ThreadLocal,那么就需要在线程池任务包装类UserCtxAwareRunnableUserCtxAwareTaskDecorator中处理这些ThreadLocal,这样就有些繁琐了。
其实我们可以借助阿里的transmittable-thread-local,轻松的实现数据在父子线程中传递。

  1. 引入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.5</version>
    </dependency>
  2. 替换原来使用的ThreadLocalTransmittableThreadLocal

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class UserCtxHolder {
    private static final TransmitableThreadLocal<String> threadLocal = new TTransmitableThreadLocal<>();

    public static String get() {
    return threadLocal.get();
    }

    public static void set(String val) {
    threadLocal.set(val);
    }

    public static void clean() {
    threadLocal.remove();
    }
    }
  3. 使用TtlExecutors.getTtlExecutor装饰原线程池

    1
    2
    3
    4
    5
    6
    7
    8
    public class CommonBizThreadPool2 {
    // 使用TtlExecutors.getTtlExecutor装饰原线程池
    public static final Executor INSTANCE = TtlExecutors.getTtlExecutor(Executors.newSingleThreadExecutor());

    public static Executor getInstance() {
    return INSTANCE;
    }
    }
  4. 创建一个实现了AsyncConfigurater接口的bean(处理@Async异步调用时的数据传递)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    @Component
    public class CustomAsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    // 线程池参数根据实际情况设置
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(100);
    executor.setThreadNamePrefix("CustomAsyncConfig-");
    executor.initialize();
    // 包装线程池
    return TtlExecutors.getTtlExecutor(executor);
    }
    }