Java-多线程
Java-多线程
cmyang1. Java实现多线程的方式
1.1 继承Thread
继承Thread,重写run方法,创建实例,执行start方法
1 | public class ThreadTest extends Thread { |
1.2 实现Runnable
自定义类实现Runnable,实现run方法,创建Thread类,使用Runnable接口的实现对象作为参数传递给Thread对象,调用start方法
1 | public class ThreadTest implements Runnable { |
1.3 通过Callable和FutureTask方式
创建callable接口的实现类,并实现call方法,结合FutureTask类包装Callable对象,创建Thread类,使用FutureTask接口的实现对象作为参数传递给Thread对象,调用start方法,futureTask.get()获取返回值
1 | public class ThreadTest implements Callable<String> { |
1.4 通过线程池创建
自定义Runnable接口,实现run方法,创建线程池,调用执行方法并传入对象
1 | public class ThreadTest implements Runnable { |
在阿里巴巴编码规范中,创建线程池不允许使用Executors方式,要自己创建ThreadPoolExecutor对象
- Executors创建的线程池底层也是调用 ThreadPoolExecutor,只不过使用不同的参数、队列、拒绝策略等,如果使用不当,会造成资源耗尽问题;直接使用ThreadPoolExecutor让使用者更加清楚线程池允许规则,常见参数的使用,避免风险
- 常见的线程池问题
- newFixedThreadPool和newSingleThreadExecutor: 队列使用LinkedBlockingQueue,队列长度为 Integer.MAX_VALUE,可能造成堆积,导致OOM
- newScheduledThreadPool和newCachedThreadPool: 线程池里面允许最大的线程数是Integer.MAX_VALUE,可能会创建过多线程,导致OOM
ThreadPoolExecutor创建线程参数
1 | public ThreadPoolExecutor(int corePoolSize, |
- corePoolSize:核心线程数,线程池维护线程的最少数量,默认情况下核心线程会⼀直存活,即使没有任务也不会受keepAliveTime控制,注意:在刚创建线程池时线程不会⽴即启动,到有任务提交时才开始创建线程并逐步达到 corePoolSize
- maximumPoolSize:线程池维护线程的最大数量,超过将被阻塞。注意:当核心线程满,且阻塞队列也满时,才会判断当前线程数是否小于最大线程数,才决定是否创建新线程
- keepAliveTime:⾮核心线程的闲置超时时间,超过这个时间就会被回收,直到线程数量等于corePoolSize
- unit:指定keepAliveTime的单位,如TimeUnit.SECONDS、TimeUnit.MILLISECONDS
- workQueue:线程池中的任务队列,常用的是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
- threadFactory:创建新线程时使用的工厂
- handler: RejectedExecutionHandler是⼀个接口且只有⼀个方法,线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认有4种策略AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy
创建demo
1 | ThreadPoolExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); |
2. Spring异步执行方法@Async
@Async用于在Spring中执行异步任务
- 启动类里面使用@EnableAsync注解开启功能,自动扫描
- 定义异步任务类并使用@Component标记组件被容器扫描,异步方法加上@Async
创建线程池
1 |
|
执行方法上添加注解,并指定线程池
1 |
|
3. 线程变量ThreadLocal
3.1 ThreadLocal
主要解决多线程中数据因并发产生不⼀致问题,同个线程共享数据,ThreadLocal为每⼀个线程都提供了变量的副本,使得每个线程在某时间访问到的并不是同⼀个对象,这样就隔离了多个线程对数据的数据共享,这样的结果是耗费了内存,但大大减少了线程同步所带来性能消耗,也减少了线程并发控制的复杂度。
ThreadLocal不能使用原子类型,只能使用Object类型
- 原理和源码
- ThreadLocal中的⼀个内部类ThreadLocalMap,这个类没有实现map接口,就是⼀个普通的Java类,但是实现的类似map的功能
- 每个数据用Entry保存,其中的Entry继承与WeakReference,用⼀个键值对存储,键为ThreadLocal的引用。
- 每个线程持有⼀个ThreadLocalMap对象,每⼀个新的线程Thread都会实例化⼀个ThreadLocalMap并赋值给成员变量threadLocals,使用时若已经存在threadLocals,则直接使用已经存在的对象。
- 为什么ThreadLocal的键是弱引用,如果是强引用有什么问题
- Java中除了基础的数据类型以外,其它的都为引用类型。而Java根据其生命周期的长短将引用类型又分为强引用 、 软引用 、 弱引用 、 虚引用,正常情况下我们平时基本上我们只用到强引用类型
- 强引用 new了⼀个对象就是强引用 Object obj = new Object();
- 软引用的生命周期比强引用短⼀些,通过SoftReference类实现,当内存空间足够,垃圾回收器就不会回收它; 当JVM认为内存空间不足时,就会去试图回收软引用指向的对象,也就是说在JVM抛出OutOfMemoryError之前,会去清理软引用对象主要用来描述⼀些【有用但并不是必需】的对象
使用场景:适合用来实现缓存,内存空间充足的时候将数据缓存在内存中,如果空间不足了就将其回收掉 - 弱引用是通过WeakReference类实现的,它的生命周期比软引用还要短,在GC的时候,不管内存空间足不足都会回收这个对象
使用场景:⼀个对象只是偶尔使用,希望在使用时能随时获取,但也不想影响对该对象的垃圾收集,则可以考虑使用弱引用来指向该对象。 - 如果是强引用,即使把ThreadLocal设置为null,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏
- 如果是弱引用,引用ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下⼀次ThreadLocalMap调用set、get、remove的时候会被清除。
3.2 InheritableThreadLocal
InheritableThreadLocal继承于ThreadLocal,主要解决父子线程变量传递问题,InheritableThreadLocal可以把父线程的变量传递给子线程。
1 | public class InheritableThreadLocal<T> extends ThreadLocal<T> { |
原理
- InheritableThreadLocal存储数据的在ThreadLocalMap的inheritableThreadLocals对象里,在线程初始化的时候,该对象会复制父线程的数据到子线程
- ThreadLocalMap存储数据在threadLocals中,不会复制父线程数据
存在的问题
- 使用线程池时失效,InheritableThreadLocal是在创建初始化线程的时候,进行赋值,但是线程池中的线程都是提前创建好的,所以无法复制线程变量
3.3 TransmittableThreadLocal
https://github.com/alibaba/transmittable-thread-local
🔧 功能
👉
TransmittableThreadLocal
(TTL
):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal
值的传递功能,解决异步执行时上下文传递的问题。一个Java
标准库本应为框架/中间件设施开发提供的标配能力,本库功能聚焦 & 0依赖,支持Java 6~20
。
JDK
的InheritableThreadLocal
类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal
值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal
值传递到 任务执行时。本库提供的
TransmittableThreadLocal
类继承并加强InheritableThreadLocal
类,解决上述的问题,使用详见 User Guide。整个
TransmittableThreadLocal
库的核心功能(用户API
与框架/中间件的集成API
、线程池ExecutorService
/ForkJoinPool
/TimerTask
及其线程工厂的Wrapper
),只有 ***~1000SLOC
代码行***,非常精小。欢迎 👏
- 建议和提问,提交 Issue
- 贡献和改进,Fork 后提通过 Pull Request 贡献代码
🎨 需求场景
ThreadLocal
的需求场景即TransmittableThreadLocal
的潜在需求场景,如果你的业务需要『在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal
值』则是TransmittableThreadLocal
目标场景。下面是几个典型场景例子。
- 分布式跟踪系统 或 全链路压测(即链路打标)
- 日志收集记录系统上下文
Session
级Cache
- 应用容器或上层框架跨应用代码给下层
SDK
传递信息各个场景的展开说明参见子文档 需求场景。
👥 User Guide
使用类
TransmittableThreadLocal
来保存值,并跨线程池传递。
TransmittableThreadLocal
继承InheritableThreadLocal
,使用方式也类似。相比InheritableThreadLocal
,添加了protected
的transmitteeValue
方法,用于定制 任务提交给线程池时 的ThreadLocal
值传递到 任务执行时 的拷贝行为,缺省是简单的赋值传递。注意:如果传递的是一个对象(引用类型)且没有做深拷贝,如直接传递引用或是浅拷贝,那么
- 跨线程传递而不再有线程封闭,传递对象在多个线程之间是有共享的;
- 与
InheritableThreadLocal.childValue
一样,使用者/业务逻辑要注意传递对象的线程安全。关于
transmitteeValue
方法 的 展开说明关于构词后缀
er
与ee
的说明:
transmit
是动词传递,transmitter
动作的执行者/主动方,而transmittee
动作的接收者/被动方。er
与ee
后缀的常见词是employer
(雇主)/employee
(雇员)、caller
(调用者)/callee
(被调用者)。具体使用方式见下面的说明。
1. 简单使用
父线程给子线程传递值。
示例代码:
1
2
3
4
5
6
7
8
9
10
11 TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
// =====================================================
// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();# 完整可运行的Demo代码参见
SimpleDemo.kt
。这其实是
InheritableThreadLocal
的功能,应该使用InheritableThreadLocal
来完成。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的
ThreadLocal
值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal
值传递到 任务执行时。解决方法参见下面的这几种用法。
2. 保证线程池中传递值
2.1 修饰
Runnable
和Callable
使用
TtlRunnable
和TtlCallable
来修饰传入线程池的Runnable
和Callable
。示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// =====================================================
// Task中可以读取,值是"value-set-in-parent"
String value = context.get();***注意***:
即使是同一个Runnable
任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task)
)以抓取这次提交时的TransmittableThreadLocal
上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable
,则提交的任务运行时会是之前修饰操作所抓取的上下文。示例代码如下:
1
2
3
4
5
6
7
8
9
10
11 // 第一次提交
Runnable task = new RunnableTask();
executorService.submit(TtlRunnable.get(task));
// ...业务逻辑代码,
// 并且修改了 TransmittableThreadLocal上下文 ...
context.set("value-modified-in-parent");
// 再次提交
// 重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文
executorService.submit(TtlRunnable.get(task));上面演示了
Runnable
,Callable
的处理类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Callable call = new CallableTask();
// 额外的处理,生成修饰了的对象ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);
// =====================================================
// Call中可以读取,值是"value-set-in-parent"
String value = context.get();# 完整可运行的Demo代码参见
TtlWrapperDemo.kt
。整个过程的完整时序图
2.2 修饰线程池
省去每次
Runnable
和Callable
传入线程池时的修饰,这个逻辑可以在线程池中完成。通过工具类
TtlExecutors
完成,有下面的方法:
getTtlExecutor
:修饰接口Executor
getTtlExecutorService
:修饰接口ExecutorService
getTtlScheduledExecutorService
:修饰接口ScheduledExecutorService
示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();# 完整可运行的Demo代码参见
TtlExecutorWrapperDemo.kt
。2.3 使用
Java Agent
来修饰JDK
线程池实现类这种方式,实现线程池的传递是透明的,业务代码中没有修饰
Runnable
或是线程池的代码。即可以做到应用代码 无侵入。
# 关于 无侵入 的更多说明参见文档Java Agent
方式对应用代码无侵入。示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 // ## 1. 框架上层逻辑,后续流程框架调用业务 ##
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-set-in-parent");
// ## 2. 应用逻辑,后续流程业务调用框架下层逻辑 ##
ExecutorService executorService = Executors.newFixedThreadPool(3);
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// ## 3. 框架下层逻辑 ##
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();Demo参见
AgentDemo.kt
。执行工程下的脚本scripts/run-agent-demo.sh
即可运行Demo。目前
TTL Agent
中,修饰了的JDK
执行器组件(即如线程池)如下:
java.util.concurrent.ThreadPoolExecutor
java.util.concurrent.ScheduledThreadPoolExecutor
1
2
3
4
5
6
7
和(对应的执行器组件是
1
2
3
4
5
- 修饰实现代码在[`JdkExecutorTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/JdkExecutorTtlTransformlet.java)。
2. ```
java.util.concurrent.ForkJoinTask) - 修饰实现代码在[`ForkJoinTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/ForkJoinTtlTransformlet.java)。从版本 ***`2.5.1`\*** 开始支持。 - ***注意\***:`Java 8`引入的[***`CompletableFuture`\***](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html)与(并行执行的)[***`Stream`\***](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/package-summary.html)底层是通过`ForkJoinPool`来执行,所以支持`ForkJoinPool`后,`TTL`也就透明支持了`CompletableFuture`与`Stream`。🎉
1 java.util.concurrent.ForkJoinPooljava.util.TimerTask
java.util.Timer
1
2
3
的子类(对应的执行器组件是Agent
1
2
3
4
5
6
7
8
9
)
- 修饰实现代码在[`TimerTaskTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/TimerTaskTtlTransformlet.java)。从版本 ***`2.7.0`\*** 开始支持。
- ***注意\***:从`2.11.2`版本开始缺省开启`TimerTask`的修饰(因为保证正确性是第一位,而不是最佳实践『不推荐使用`TimerTask`』:);`2.11.1`版本及其之前的版本没有缺省开启`TimerTask`的修饰。
- 使用ttl.agent.enable.timer.task
1
2
3
参数TimerTask
1
2
3
开启/关闭java -javaagent:path/to/transmittable-thread-local-2.x.y.jar \ -cp classes \ com.alibaba.demo.ttl.agent.AgentDemo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
的修饰:
- `-javaagent:path/to/transmittable-thread-local-2.x.y.jar=ttl.agent.enable.timer.task:true`
- `-javaagent:path/to/transmittable-thread-local-2.x.y.jar=ttl.agent.enable.timer.task:false`
- 更多关于`TTL Agent`参数的配置说明详见[`TtlAgent.java`的JavaDoc](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/TtlAgent.java)。
> 关于`java.util.TimerTask`/`java.util.Timer` 的 展开说明
#### `Java Agent`的启动参数配置
在`Java`的启动参数加上:`-javaagent:path/to/transmittable-thread-local-2.x.y.jar`。
***注意\***:
- 如果修改了下载的`TTL`的`Jar`的文件名(`transmittable-thread-local-2.x.y.jar`),则需要自己手动通过`-Xbootclasspath JVM`参数来显式配置。
比如修改文件名成`ttl-foo-name-changed.jar`,则还需要加上`Java`的启动参数:`-Xbootclasspath/a:path/to/ttl-foo-name-changed.jar`。
- 或使用`v2.6.0`之前的版本(如`v2.5.1`),则也需要自己手动通过`-Xbootclasspath JVM`参数来显式配置(就像`TTL`之前的版本的做法一样)。
加上`Java`的启动参数:`-Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar`。
`Java`命令行示例如下:如果修改了TTL jar文件名 或 TTL版本是 2.6.0 之前
则还需要显式设置 -Xbootclasspath 参数
java -javaagent:path/to/ttl-foo-name-changed.jar
-Xbootclasspath/a:path/to/ttl-foo-name-changed.jar
-cp classes
com.alibaba.demo.ttl.agent.AgentDemojava -javaagent:path/to/transmittable-thread-local-2.5.1.jar
-Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar
-cp classes
com.alibaba.demo.ttl.agent.AgentDemo
1
2
3
4
5
6
7
8
9
10
11
> 关于`boot class path` 的 展开说明
# 🔌 Java API Docs
当前版本的Java API文档地址: https://alibaba.github.io/transmittable-thread-local/apidocs/
# 🍪 Maven依赖
示例:com.alibaba transmittable-thread-local 2.14.1 # 运行测试Case ./mvnw test # 编译打包 ./mvnw package # 运行测试Case、编译打包、安装TTL库到Maven本地 ./mvnw install
1
2
3
4
5
6
7
8
可以在 [search.maven.org](https://search.maven.org/artifact/com.alibaba/transmittable-thread-local) 查看可用的版本。
# 🔨 关于编译构建
编译构建的环境要求: ***`JDK 8+`\***;用`Maven`常规的方式执行编译构建即可:
\# 在工程中已经包含了符合版本要求的`Maven`,直接运行 ***工程根目录下的`mvnw`\***;并不需要先手动自己安装好`Maven`。#####################################################
如果使用你自己安装的 maven,版本要求:maven 3.3.9+
mvn install
1
2
3
4
5
6
7
# ❓ FAQ
***Q1. `TTL Agent`与其它`Agent`(如`Skywalking`、`Promethues`)配合使用时不生效?\***
配置`TTL Agent`在最前的位置,可以避免与其它其它`Agent`配合使用时,`TTL Agent`可能的不生效问题。配置示例:java -javaagent:path/to/transmittable-thread-local-2.x.y.jar
-javaagent:path/to/skywalking-agent.jar
-jar your-app.jar
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
原因是:
- 像`Skywalking`这样的`Agent`的入口逻辑(`premain`)包含了线程池的启动。
- 如果配置在这样的`Agent`配置在前面,到了`TTL Agent`(的`premain`)时,`TTL`需要加强的线程池类已经加载(`load`)了。
- `TTL Agent`的`TtlTransformer`是在类加载时触发类的增强;如果类已经加载了会跳过`TTL Agent`的增强逻辑。
更多讨论参见 [Issue:`TTL agent`与其他`Agent`的兼容性问题 #226](https://github.com/alibaba/transmittable-thread-local/issues/226)。
***Q2. `MacOS`下,使用`Java Agent`,可能会报`JavaLaunchHelper`的出错信息\***
JDK Bug: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8021205
可以换一个版本的`JDK`。我的开发机上`1.7.0_40`有这个问题,`1.6.0_51`、`1.7.0_45`可以运行。
\# `1.7.0_45`还是有`JavaLaunchHelper`的出错信息,但不影响运行。
# ✨ 使用`TTL`的好处与必要性
> 注:不读这一节,并不会影响你使用`TTL`来解决你碰到的问题,可以放心跳过;读了 [User Guide](https://github.com/alibaba/transmittable-thread-local#-user-guide) 就可以快速用起来了~ 😄 这一节信息密度较高不易读。
***好处:透明且自动完成所有异步执行上下文的可定制、规范化的捕捉与传递。\***
这个好处也是`TransmittableThreadLocal`的目标。
***必要性:随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。\***
使用`ThreadLocal`作为业务上下文传递的经典技术手段在中间件、技术与业务框架中广泛大量使用。而对于生产应用,几乎一定会使用线程池等异步执行组件,以高效支撑线上大流量。但使用`ThreadLocal`及其`set/remove`的上下文传递模式,在使用线程池等异步执行组件时,存在多方面的问题:
***1. 从业务使用者角度来看\***
1. 繁琐
- 业务逻辑要知道:有哪些上下文;各个上下文是如何获取的。
- 并需要业务逻辑去一个一个地捕捉与传递。
2. 依赖
- 需要直接依赖不同`ThreadLocal`上下文各自的获取的逻辑或类。
- 像`RPC`的上下文(如`Dubbo`的`RpcContext`)、全链路跟踪的上下文(如`SkyWalking`的`ContextManager`)、不同业务模块中的业务流程上下文,等等。
3. 静态(易漏)
- 因为要 ***事先\*** 知道有哪些上下文,如果系统出现了一个新的上下文,业务逻辑就要修改添加上新上下文传递的几行代码。也就是说因 ***系统的\*** 上下文新增,***业务的\*** 逻辑就跟进要修改。
- 而对于业务来说,不关心系统的上下文,即往往就可能遗漏,会是线上故障了。
- 随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
4. 定制性
- 因为需要业务逻辑来完成捕捉与传递,业务要关注『上下文的传递方式』:直接传引用?还是拷贝传值?拷贝是深拷贝还是浅拷贝?在不同的上下文会需要不同的做法。
- 『上下文的传递方式』往往是 ***上下文的提供者\***(或说是业务逻辑的框架部分)才能决策处理好的;而 ***上下文的使用者\***(或说是业务逻辑的应用部分)往往不(期望)知道上下文的传递方式。这也可以理解成是 ***依赖\***,即业务逻辑 依赖/关注/实现了 系统/架构的『上下文的传递方式』。
***2. 从整体流程实现角度来看\***
关注的是 **上下文传递流程的规范化**。上下文传递到了子线程要做好 ***清理\***(或更准确地说是要 ***恢复\*** 成之前的上下文),需要业务逻辑去处理好。如果业务逻辑对**清理**的处理不正确,比如:
- 如果清理操作漏了:
- 下一次执行可能是上次的,即『上下文的 ***污染\***/***串号\***』,会导致业务逻辑错误。
- 『上下文的 ***泄漏\***』,会导致内存泄漏问题。
- 如果清理操作做多了,会出现上下文 ***丢失\***。
上面的问题,在业务开发中引发的`Bug`真是**屡见不鲜** !本质原因是:***`ThreadLocal`的`set/remove`的上下文传递模式\*** 在使用线程池等异步执行组件的情况下不再是有效的。常见的典型例子:
- 当线程池满了且线程池的`RejectedExecutionHandler`使用的是`CallerRunsPolicy`时,提交到线程池的任务会在提交线程中直接执行,`ThreadLocal.remove`操作**清理**提交线程的上下文导致上下文**丢失**。
- 类似的,使用`ForkJoinPool`(包含并行执行`Stream`与`CompletableFuture`,底层使用`ForkJoinPool`)的场景,展开的`ForkJoinTask`会在任务提交线程中直接执行。同样导致上下文**丢失**。
怎么设计一个『上下文传递流程』方案(即上下文的生命周期),以**保证**没有上面的问题?
期望:上下文生命周期的操作从业务逻辑中分离出来。业务逻辑不涉及生命周期,就不会有业务代码如疏忽清理而引发的问题了。整个上下文的传递流程或说生命周期可以规范化成:捕捉、回放和恢复这3个操作,即[***`CRR(capture/replay/restore)`模式\***](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/developer-guide.md#-框架中间件集成ttl传递)。更多讨论参见 [Issue:能在详细讲解一下`replay`、`restore`的设计理念吗?#201](https://github.com/alibaba/transmittable-thread-local/issues/201)。
总结上面的说明:在生产应用(几乎一定会使用线程池等异步执行组件)中,使用`ThreadLocal`及其`set/remove`的上下文传递模式**几乎一定是有问题的**,***只是在等一个出`Bug`的机会\***。
更多`TTL`好处与必要性的展开讨论参见 [Issue:这个库带来怎样的好处和优势? #128](https://github.com/alibaba/transmittable-thread-local/issues/128),欢迎继续讨论 ♥️
# 🗿 更多文档
- [🎨 需求场景说明](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/requirement-scenario.md)
- [❤️ 小伙伴同学们写的`TTL`使用场景 与 设计实现解析的文章(写得都很好!) - Issue #123](https://github.com/alibaba/transmittable-thread-local/issues/123)
- [🎓 Developer Guide](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/developer-guide.md)
- [☔ 性能测试](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/performance-test.md)
# 📚 相关资料
## JDK Core Classes
- [WeakHashMap](https://docs.oracle.com/javase/10/docs/api/java/util/WeakHashMap.html)
- [InheritableThreadLocal](https://docs.oracle.com/javase/10/docs/api/java/lang/InheritableThreadLocal.html)
## 4. 多线程编排CompletableFuture
Future只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。在Java8中,CompletableFuture对Future进行了扩展,支持回调方法,同时也支持多个线程的组合和编排操作
### 4.1 操作API
- 创建异步任务
- runAsync: 不带返回值
- supplyAsync: 带返回值
```java
@Test
void test1() {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
System.out.println(Thread.currentThread().getName() + "正在运行"), threadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "正在运行");
return "hello world";
}, threadPool);
}
获取任务返回值
- get(): 方法阻塞等待返回
- get(long timeout, TimeUnit unit): 方法阻塞,等待参数中的事件如果没有返回抛出TimeoutException异常
- getNow(T valueIfAbsent): 执行代码的时刻获取返回值,如果没有返回值,返回参数中的默认值
- join():和get方法一样阻塞线程等待返回值,join不用必须try cache,join可能抛出unchecked异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
void test2() {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(1000);
System.out.println(Thread.currentThread().getName() + "正在运行");
return "hello world";
}, threadPool);
String defaultStr = future.getNow("默认返回值");
System.out.println(defaultStr);
try {
String result = future.get(300, TimeUnit.MILLISECONDS);
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
String joinStr = future.join();
System.out.println(joinStr);
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}thenRun和thenRunAsync
- 当前任务正常执行完成后,执行下一个任务,没有入参和返回值,thenRun使用同一个线程执行下一个任务,thenRunAsync新获取一个线程执行下一个任务
- 入参:
Runnable
future.thenRunAsync(() -> System.out.println(Thread.currentThread().getName() + "正在运行"), threadPool);
thenAccept和thenAcceptAsync
- 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,没有返回值
- 入参:
Consumer<T>
,void accept(T t);
future.thenAcceptAsync(o -> System.out.println("获取到上个任务的返回值:" + o), threadPool);
thenApply和thenApplyAsync
- 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,且有返回值。
- 入参:
Function<T, R>
,R apply(T t);
CompletableFuture<String> future1 = future.thenApplyAsync(o -> o + " CompletableFuture", threadPool);
whenComplete和whenCompleteAsync
- 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,同时会传递异常参数,正常返回,异常参数为null,返回上一个任务的结果
- 入参
BiConsumer<T, U>
,void accept(T t, U u);
CompletableFuture<String> future2 = future.whenCompleteAsync((o, e) -> System.out.println("获取到上个任务的返回值:" + o), threadPool);
handle和handleAsync
- 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,同时会传递异常参数,有返回值
- 入参:
BiFunction<T, U, R>
,R apply(T t, U u);
CompletableFuture<String> future3 = future.handleAsync((o, e) -> o + " handle", threadPool);
thenCompose和thenComposeAsync
- 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,返回值需要是一个CompletableFuture
- 入参:
Function<T, R>
,R apply(T t);
CompletableFuture<String> future5 = future.thenComposeAsync(o -> CompletableFuture.supplyAsync(() -> o +" Compose"), threadPool);
thenCombine和thenCombineAsync
- 当前任务正常执行完成,并且第一个参数的CompletableFuture也执行完,将两个返回传给第二个参数,有返回值
- 入参:public <U,V> CompletableFuture
thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
1
2
3
4
5
6
7
8
void test4() {
ExecutorService threadPool = Executors.newFixedThreadPool(1);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello ", threadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World", threadPool);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (o1, o2) -> o1 + o2, threadPool);
System.out.println(future.join());
}allOf
- 参数里的所有任务都执行完后,返回一个
CompletableFuture<Void>
1
2
3
4
5
6
7
8
9
void test5() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future1 ", threadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future2 ", threadPool);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "future3 ", threadPool);
CompletableFuture.allOf(future1, future2, future3);
System.out.println(future1.join()+future2.join()+future3.join());
}- 参数里的所有任务都执行完后,返回一个
anyOf
- 参数里的任意任务都执行完后,返回一个
CompletableFuture<Object>
1
2
3
4
5
6
7
8
9
void test6() {
ExecutorService threadPool = Executors.newFixedThreadPool(2);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future1 ", threadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future2 ", threadPool);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "future3 ", threadPool);
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
System.out.println(anyOf.join());
}- 参数里的任意任务都执行完后,返回一个