Java-多线程

1. Java实现多线程的方式

1.1 继承Thread

继承Thread,重写run方法,创建实例,执行start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadTest extends Thread {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在运行");
}

public static void main(String[] args) {
ThreadTest threadTest = new ThreadTest();
threadTest.start();
System.out.println("==============");
}
}

1.2 实现Runnable

自定义类实现Runnable,实现run方法,创建Thread类,使用Runnable接口的实现对象作为参数传递给Thread对象,调用start方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ThreadTest implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在运行");
}

public static void main(String[] args) {
Thread thread = new Thread(new ThreadTest());
thread.start();
System.out.println(Thread.currentThread().getName() + "==============");

// 或者使用lambda写法
new Thread(() -> System.out.println(Thread.currentThread().getName() + "正在运行")).start();
}
}

1.3 通过Callable和FutureTask方式

创建callable接口的实现类,并实现call方法,结合FutureTask类包装Callable对象,创建Thread类,使用FutureTask接口的实现对象作为参数传递给Thread对象,调用start方法,futureTask.get()获取返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ThreadTest implements Callable<String> {

@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + "正在运行");
return "hello world";
}

public static void main(String[] args) {
// 这里也可使用lambda写法
FutureTask<String> futureTask = new FutureTask<>(new ThreadTest());
Thread thread = new Thread(futureTask);
thread.start();
// 获取返回值
try {
String result = futureTask.get();
System.out.println("返回值是:" + result);
} catch (InterruptedException e) {
//阻塞等待中被中断,则抛出
e.printStackTrace();
} catch (ExecutionException e) {
//执行过程发生异常被抛出
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "==============");
}
}

1.4 通过线程池创建

自定义Runnable接口,实现run方法,创建线程池,调用执行方法并传入对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ThreadTest implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在运行");
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for(int i=0; i<10; i++){
executorService.execute(new ThreadTest());
}
//关闭线程池
executorService.shutdown();
System.out.println(Thread.currentThread().getName() + "==============");
}
}

在阿里巴巴编码规范中,创建线程池不允许使用Executors方式,要自己创建ThreadPoolExecutor对象

  • Executors创建的线程池底层也是调用 ThreadPoolExecutor,只不过使用不同的参数、队列、拒绝策略等,如果使用不当,会造成资源耗尽问题;直接使用ThreadPoolExecutor让使用者更加清楚线程池允许规则,常见参数的使用,避免风险
  • 常见的线程池问题
    • newFixedThreadPool和newSingleThreadExecutor: 队列使用LinkedBlockingQueue,队列长度为 Integer.MAX_VALUE,可能造成堆积,导致OOM
    • newScheduledThreadPool和newCachedThreadPool: 线程池里面允许最大的线程数是Integer.MAX_VALUE,可能会创建过多线程,导致OOM

ThreadPoolExecutor创建线程参数

1
2
3
4
5
6
7
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ...... }
  • corePoolSize:核心线程数,线程池维护线程的最少数量,默认情况下核心线程会⼀直存活,即使没有任务也不会受keepAliveTime控制,注意:在刚创建线程池时线程不会⽴即启动,到有任务提交时才开始创建线程并逐步达到 corePoolSize
  • maximumPoolSize:线程池维护线程的最大数量,超过将被阻塞。注意:当核心线程满,且阻塞队列也满时,才会判断当前线程数是否小于最大线程数,才决定是否创建新线程
  • keepAliveTime:⾮核心线程的闲置超时时间,超过这个时间就会被回收,直到线程数量等于corePoolSize
  • unit:指定keepAliveTime的单位,如TimeUnit.SECONDS、TimeUnit.MILLISECONDS
  • workQueue:线程池中的任务队列,常用的是 ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue
  • threadFactory:创建新线程时使用的工厂
  • handler: RejectedExecutionHandler是⼀个接口且只有⼀个方法,线程池中的数量大于maximumPoolSize,对拒绝任务的处理策略,默认有4种策略AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy

创建demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ThreadPoolExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
// 如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
threadPoolTaskExecutor.setCorePoolSize(16);

// 最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
// 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
threadPoolTaskExecutor.setMaxPoolSize(64);

// 缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
threadPoolTaskExecutor.setQueueCapacity(1024);

// 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
// 允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
// 如果allowCoreThreadTimeout=true,则会直到线程数量=0
threadPoolTaskExecutor.setKeepAliveSeconds(30);

// spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
// jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
threadPoolTaskExecutor.setThreadNamePrefix("自定义线程:");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);

// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
// AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
// DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
// DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;

2. Spring异步执行方法@Async

@Async用于在Spring中执行异步任务

  • 启动类里面使用@EnableAsync注解开启功能,自动扫描
  • 定义异步任务类并使用@Component标记组件被容器扫描,异步方法加上@Async

创建线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
//如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
threadPoolTaskExecutor.setCorePoolSize(16);

//最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
//当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
threadPoolTaskExecutor.setMaxPoolSize(64);

//缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
threadPoolTaskExecutor.setQueueCapacity(1024);

//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
threadPoolTaskExecutor.setKeepAliveSeconds(30);

//spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
threadPoolTaskExecutor.setThreadNamePrefix("自定义线程:");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);

// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
//AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
//DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
//DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}

}

执行方法上添加注解,并指定线程池

1
2
@Async("threadPoolTaskExecutor")
public void sendCode()

3. 线程变量ThreadLocal

3.1 ThreadLocal

​ 主要解决多线程中数据因并发产生不⼀致问题,同个线程共享数据,ThreadLocal为每⼀个线程都提供了变量的副本,使得每个线程在某时间访问到的并不是同⼀个对象,这样就隔离了多个线程对数据的数据共享,这样的结果是耗费了内存,但大大减少了线程同步所带来性能消耗,也减少了线程并发控制的复杂度。

ThreadLocal不能使用原子类型,只能使用Object类型

  • 原理和源码
    • ThreadLocal中的⼀个内部类ThreadLocalMap,这个类没有实现map接口,就是⼀个普通的Java类,但是实现的类似map的功能
    • 每个数据用Entry保存,其中的Entry继承与WeakReference,用⼀个键值对存储,键为ThreadLocal的引用。
    • 每个线程持有⼀个ThreadLocalMap对象,每⼀个新的线程Thread都会实例化⼀个ThreadLocalMap并赋值给成员变量threadLocals,使用时若已经存在threadLocals,则直接使用已经存在的对象。
  • 为什么ThreadLocal的键是弱引用,如果是强引用有什么问题
    • Java中除了基础的数据类型以外,其它的都为引用类型。而Java根据其生命周期的长短将引用类型又分为强引用 、 软引用 、 弱引用 、 虚引用,正常情况下我们平时基本上我们只用到强引用类型
    • 强引用 new了⼀个对象就是强引用 Object obj = new Object();
    • 软引用的生命周期比强引用短⼀些,通过SoftReference类实现,当内存空间足够,垃圾回收器就不会回收它; 当JVM认为内存空间不足时,就会去试图回收软引用指向的对象,也就是说在JVM抛出OutOfMemoryError之前,会去清理软引用对象主要用来描述⼀些【有用但并不是必需】的对象
      使用场景:适合用来实现缓存,内存空间充足的时候将数据缓存在内存中,如果空间不足了就将其回收掉
    • 弱引用是通过WeakReference类实现的,它的生命周期比软引用还要短,在GC的时候,不管内存空间足不足都会回收这个对象
      使用场景:⼀个对象只是偶尔使用,希望在使用时能随时获取,但也不想影响对该对象的垃圾收集,则可以考虑使用弱引用来指向该对象。
    • 如果是强引用,即使把ThreadLocal设置为null,但是ThreadLocalMap还持有ThreadLocal的强引用,如果没有手动删除,ThreadLocal不会被回收,导致Entry内存泄漏
    • 如果是弱引用,引用ThreadLocal的对象被回收了,由于ThreadLocalMap持有ThreadLocal的弱引用,即使没有手动删除,ThreadLocal也会被回收。value在下⼀次ThreadLocalMap调用set、get、remove的时候会被清除。

3.2 InheritableThreadLocal

​ InheritableThreadLocal继承于ThreadLocal,主要解决父子线程变量传递问题,InheritableThreadLocal可以把父线程的变量传递给子线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class InheritableThreadLocal<T> extends ThreadLocal<T> {

protected T childValue(T parentValue) {
return parentValue;
}

ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}


public class ThreadLocal<T> {
...
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
...
}

// 在初始化的时候,进行传值
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
  • 原理

    • InheritableThreadLocal存储数据的在ThreadLocalMap的inheritableThreadLocals对象里,在线程初始化的时候,该对象会复制父线程的数据到子线程
    • ThreadLocalMap存储数据在threadLocals中,不会复制父线程数据
  • 存在的问题

    • 使用线程池时失效,InheritableThreadLocal是在创建初始化线程的时候,进行赋值,但是线程池中的线程都是提前创建好的,所以无法复制线程变量

3.3 TransmittableThreadLocal

https://github.com/alibaba/transmittable-thread-local

🔧 功能

👉 TransmittableThreadLocal(TTL):在使用线程池等会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。一个Java标准库本应为框架/中间件设施开发提供的标配能力,本库功能聚焦 & 0依赖,支持Java 6~20

JDKInheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时ThreadLocal值传递到 任务执行时

本库提供的TransmittableThreadLocal类继承并加强InheritableThreadLocal类,解决上述的问题,使用详见 User Guide

整个TransmittableThreadLocal库的核心功能(用户API与框架/中间件的集成API、线程池ExecutorService/ForkJoinPool/TimerTask及其线程工厂的Wrapper),只有 ***~1000 SLOC代码行***,非常精小。

欢迎 👏

TTL v2.13+开始,升级到Java 8
如果需要Java 6的支持,使用版本2.12.x Maven Central

🎨 需求场景

ThreadLocal的需求场景即TransmittableThreadLocal的潜在需求场景,如果你的业务需要『在使用线程池等会池化复用线程的执行组件情况下传递ThreadLocal值』则是TransmittableThreadLocal目标场景。

下面是几个典型场景例子。

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. SessionCache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息

各个场景的展开说明参见子文档 需求场景

👥 User Guide

使用类TransmittableThreadLocal来保存值,并跨线程池传递。

TransmittableThreadLocal继承InheritableThreadLocal,使用方式也类似。相比InheritableThreadLocal,添加了protectedtransmitteeValue方法,用于定制 任务提交给线程池时ThreadLocal值传递到 任务执行时 的拷贝行为,缺省是简单的赋值传递。注意:如果传递的是一个对象(引用类型)且没有做深拷贝,如直接传递引用或是浅拷贝,那么

  • 跨线程传递而不再有线程封闭,传递对象在多个线程之间是有共享的;
  • InheritableThreadLocal.childValue一样,使用者/业务逻辑要注意传递对象的线程安全。
关于transmitteeValue方法 的 展开说明

关于构词后缀eree的说明:

  • transmit是动词传递,transmitter动作的执行者/主动方,而transmittee动作的接收者/被动方。
  • eree后缀的常见词是employer(雇主)/employee(雇员)、caller(调用者)/callee(被调用者)。

具体使用方式见下面的说明。

1. 简单使用

父线程给子线程传递值。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

// =====================================================

// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();

# 完整可运行的Demo代码参见SimpleDemo.kt

这其实是InheritableThreadLocal的功能,应该使用InheritableThreadLocal来完成。

但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时ThreadLocal值传递到 任务执行时

解决方法参见下面的这几种用法。

2. 保证线程池中传递值

2.1 修饰RunnableCallable

使用TtlRunnableTtlCallable来修饰传入线程池的RunnableCallable

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

// =====================================================

// Task中可以读取,值是"value-set-in-parent"
String value = context.get();

***注意***:
即使是同一个Runnable任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task))以抓取这次提交时的TransmittableThreadLocal上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable,则提交的任务运行时会是之前修饰操作所抓取的上下文。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
// 第一次提交
Runnable task = new RunnableTask();
executorService.submit(TtlRunnable.get(task));

// ...业务逻辑代码,
// 并且修改了 TransmittableThreadLocal上下文 ...
context.set("value-modified-in-parent");

// 再次提交
// 重新执行修饰,以传递修改了的 TransmittableThreadLocal上下文
executorService.submit(TtlRunnable.get(task));

上面演示了RunnableCallable的处理类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Callable call = new CallableTask();
// 额外的处理,生成修饰了的对象ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);

// =====================================================

// Call中可以读取,值是"value-set-in-parent"
String value = context.get();

# 完整可运行的Demo代码参见TtlWrapperDemo.kt

整个过程的完整时序图

时序图

2.2 修饰线程池

省去每次RunnableCallable传入线程池时的修饰,这个逻辑可以在线程池中完成。

通过工具类TtlExecutors完成,有下面的方法:

  • getTtlExecutor:修饰接口Executor
  • getTtlExecutorService:修饰接口ExecutorService
  • getTtlScheduledExecutorService:修饰接口ScheduledExecutorService

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);

// =====================================================

// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();

# 完整可运行的Demo代码参见TtlExecutorWrapperDemo.kt

2.3 使用Java Agent来修饰JDK线程池实现类

这种方式,实现线程池的传递是透明的,业务代码中没有修饰Runnable或是线程池的代码。即可以做到应用代码 无侵入
# 关于 无侵入 的更多说明参见文档Java Agent方式对应用代码无侵入

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// ## 1. 框架上层逻辑,后续流程框架调用业务 ##
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-set-in-parent");

// ## 2. 应用逻辑,后续流程业务调用框架下层逻辑 ##
ExecutorService executorService = Executors.newFixedThreadPool(3);

Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);

// ## 3. 框架下层逻辑 ##
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();

Demo参见AgentDemo.kt。执行工程下的脚本scripts/run-agent-demo.sh即可运行Demo。

目前TTL Agent中,修饰了的JDK执行器组件(即如线程池)如下:

  1. java.util.concurrent.ThreadPoolExecutor
    
    1
    2
    3
    4
    5
    6
    7







    java.util.concurrent.ScheduledThreadPoolExecutor
    1
    2
    3
    4
    5

    - 修饰实现代码在[`JdkExecutorTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/JdkExecutorTtlTransformlet.java)。

    2. ```
    java.util.concurrent.ForkJoinTask
    (对应的执行器组件是
    1
    java.util.concurrent.ForkJoinPool
    ) - 修饰实现代码在[`ForkJoinTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/ForkJoinTtlTransformlet.java)。从版本 ***`2.5.1`\*** 开始支持。 - ***注意\***:`Java 8`引入的[***`CompletableFuture`\***](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html)与(并行执行的)[***`Stream`\***](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/package-summary.html)底层是通过`ForkJoinPool`来执行,所以支持`ForkJoinPool`后,`TTL`也就透明支持了`CompletableFuture`与`Stream`。🎉
  2. java.util.TimerTask
    
    1
    2
    3

    的子类(对应的执行器组件是

    java.util.Timer
    1
    2
    3
    4
    5
    6
    7
    8
    9



    - 修饰实现代码在[`TimerTaskTtlTransformlet.java`](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/transformlet/internal/TimerTaskTtlTransformlet.java)。从版本 ***`2.7.0`\*** 开始支持。

    - ***注意\***:从`2.11.2`版本开始缺省开启`TimerTask`的修饰(因为保证正确性是第一位,而不是最佳实践『不推荐使用`TimerTask`』:);`2.11.1`版本及其之前的版本没有缺省开启`TimerTask`的修饰。

    - 使用

    Agent
    1
    2
    3

    参数

    ttl.agent.enable.timer.task
    1
    2
    3

    开启/关闭

    TimerTask
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23

    的修饰:

    - `-javaagent:path/to/transmittable-thread-local-2.x.y.jar=ttl.agent.enable.timer.task:true`
    - `-javaagent:path/to/transmittable-thread-local-2.x.y.jar=ttl.agent.enable.timer.task:false`

    - 更多关于`TTL Agent`参数的配置说明详见[`TtlAgent.java`的JavaDoc](https://github.com/alibaba/transmittable-thread-local/blob/master/ttl-agent/src/main/java/com/alibaba/ttl3/agent/TtlAgent.java)。

    > 关于`java.util.TimerTask`/`java.util.Timer` 的 展开说明

    #### `Java Agent`的启动参数配置

    在`Java`的启动参数加上:`-javaagent:path/to/transmittable-thread-local-2.x.y.jar`。

    ***注意\***:

    - 如果修改了下载的`TTL`的`Jar`的文件名(`transmittable-thread-local-2.x.y.jar`),则需要自己手动通过`-Xbootclasspath JVM`参数来显式配置。
    比如修改文件名成`ttl-foo-name-changed.jar`,则还需要加上`Java`的启动参数:`-Xbootclasspath/a:path/to/ttl-foo-name-changed.jar`。
    - 或使用`v2.6.0`之前的版本(如`v2.5.1`),则也需要自己手动通过`-Xbootclasspath JVM`参数来显式配置(就像`TTL`之前的版本的做法一样)。
    加上`Java`的启动参数:`-Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar`。

    `Java`命令行示例如下:

    java -javaagent:path/to/transmittable-thread-local-2.x.y.jar \ -cp classes \ com.alibaba.demo.ttl.agent.AgentDemo

如果修改了TTL jar文件名 或 TTL版本是 2.6.0 之前

则还需要显式设置 -Xbootclasspath 参数

java -javaagent:path/to/ttl-foo-name-changed.jar
-Xbootclasspath/a:path/to/ttl-foo-name-changed.jar
-cp classes
com.alibaba.demo.ttl.agent.AgentDemo

java -javaagent:path/to/transmittable-thread-local-2.5.1.jar
-Xbootclasspath/a:path/to/transmittable-thread-local-2.5.1.jar
-cp classes
com.alibaba.demo.ttl.agent.AgentDemo

1
2
3
4
5
6
7
8
9
10
11

> 关于`boot class path` 的 展开说明

# 🔌 Java API Docs

当前版本的Java API文档地址: https://alibaba.github.io/transmittable-thread-local/apidocs/

# 🍪 Maven依赖

示例:

com.alibaba transmittable-thread-local 2.14.1
1
2
3
4
5
6
7
8

可以在 [search.maven.org](https://search.maven.org/artifact/com.alibaba/transmittable-thread-local) 查看可用的版本。

# 🔨 关于编译构建

编译构建的环境要求: ***`JDK 8+`\***;用`Maven`常规的方式执行编译构建即可:
\# 在工程中已经包含了符合版本要求的`Maven`,直接运行 ***工程根目录下的`mvnw`\***;并不需要先手动自己安装好`Maven`。

# 运行测试Case ./mvnw test # 编译打包 ./mvnw package # 运行测试Case、编译打包、安装TTL库到Maven本地 ./mvnw install

#####################################################

如果使用你自己安装的 maven,版本要求:maven 3.3.9+

mvn install

1
2
3
4
5
6
7

# ❓ FAQ

***Q1. `TTL Agent`与其它`Agent`(如`Skywalking`、`Promethues`)配合使用时不生效?\***

配置`TTL Agent`在最前的位置,可以避免与其它其它`Agent`配合使用时,`TTL Agent`可能的不生效问题。配置示例:

java -javaagent:path/to/transmittable-thread-local-2.x.y.jar
-javaagent:path/to/skywalking-agent.jar
-jar your-app.jar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102

原因是:

- 像`Skywalking`这样的`Agent`的入口逻辑(`premain`)包含了线程池的启动。
- 如果配置在这样的`Agent`配置在前面,到了`TTL Agent`(的`premain`)时,`TTL`需要加强的线程池类已经加载(`load`)了。
- `TTL Agent`的`TtlTransformer`是在类加载时触发类的增强;如果类已经加载了会跳过`TTL Agent`的增强逻辑。

更多讨论参见 [Issue:`TTL agent`与其他`Agent`的兼容性问题 #226](https://github.com/alibaba/transmittable-thread-local/issues/226)。

***Q2. `MacOS`下,使用`Java Agent`,可能会报`JavaLaunchHelper`的出错信息\***

JDK Bug: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8021205
可以换一个版本的`JDK`。我的开发机上`1.7.0_40`有这个问题,`1.6.0_51`、`1.7.0_45`可以运行。
\# `1.7.0_45`还是有`JavaLaunchHelper`的出错信息,但不影响运行。

# ✨ 使用`TTL`的好处与必要性

> 注:不读这一节,并不会影响你使用`TTL`来解决你碰到的问题,可以放心跳过;读了 [User Guide](https://github.com/alibaba/transmittable-thread-local#-user-guide) 就可以快速用起来了~ 😄 这一节信息密度较高不易读。

***好处:透明且自动完成所有异步执行上下文的可定制、规范化的捕捉与传递。\***
这个好处也是`TransmittableThreadLocal`的目标。

***必要性:随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。\***

使用`ThreadLocal`作为业务上下文传递的经典技术手段在中间件、技术与业务框架中广泛大量使用。而对于生产应用,几乎一定会使用线程池等异步执行组件,以高效支撑线上大流量。但使用`ThreadLocal`及其`set/remove`的上下文传递模式,在使用线程池等异步执行组件时,存在多方面的问题:

***1. 从业务使用者角度来看\***

1. 繁琐
- 业务逻辑要知道:有哪些上下文;各个上下文是如何获取的。
- 并需要业务逻辑去一个一个地捕捉与传递。
2. 依赖
- 需要直接依赖不同`ThreadLocal`上下文各自的获取的逻辑或类。
- 像`RPC`的上下文(如`Dubbo`的`RpcContext`)、全链路跟踪的上下文(如`SkyWalking`的`ContextManager`)、不同业务模块中的业务流程上下文,等等。
3. 静态(易漏)
- 因为要 ***事先\*** 知道有哪些上下文,如果系统出现了一个新的上下文,业务逻辑就要修改添加上新上下文传递的几行代码。也就是说因 ***系统的\*** 上下文新增,***业务的\*** 逻辑就跟进要修改。
- 而对于业务来说,不关心系统的上下文,即往往就可能遗漏,会是线上故障了。
- 随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
4. 定制性
- 因为需要业务逻辑来完成捕捉与传递,业务要关注『上下文的传递方式』:直接传引用?还是拷贝传值?拷贝是深拷贝还是浅拷贝?在不同的上下文会需要不同的做法。
- 『上下文的传递方式』往往是 ***上下文的提供者\***(或说是业务逻辑的框架部分)才能决策处理好的;而 ***上下文的使用者\***(或说是业务逻辑的应用部分)往往不(期望)知道上下文的传递方式。这也可以理解成是 ***依赖\***,即业务逻辑 依赖/关注/实现了 系统/架构的『上下文的传递方式』。

***2. 从整体流程实现角度来看\***

关注的是 **上下文传递流程的规范化**。上下文传递到了子线程要做好 ***清理\***(或更准确地说是要 ***恢复\*** 成之前的上下文),需要业务逻辑去处理好。如果业务逻辑对**清理**的处理不正确,比如:

- 如果清理操作漏了:
- 下一次执行可能是上次的,即『上下文的 ***污染\***/***串号\***』,会导致业务逻辑错误。
- 『上下文的 ***泄漏\***』,会导致内存泄漏问题。
- 如果清理操作做多了,会出现上下文 ***丢失\***。

上面的问题,在业务开发中引发的`Bug`真是**屡见不鲜** !本质原因是:***`ThreadLocal`的`set/remove`的上下文传递模式\*** 在使用线程池等异步执行组件的情况下不再是有效的。常见的典型例子:

- 当线程池满了且线程池的`RejectedExecutionHandler`使用的是`CallerRunsPolicy`时,提交到线程池的任务会在提交线程中直接执行,`ThreadLocal.remove`操作**清理**提交线程的上下文导致上下文**丢失**。
- 类似的,使用`ForkJoinPool`(包含并行执行`Stream`与`CompletableFuture`,底层使用`ForkJoinPool`)的场景,展开的`ForkJoinTask`会在任务提交线程中直接执行。同样导致上下文**丢失**。

怎么设计一个『上下文传递流程』方案(即上下文的生命周期),以**保证**没有上面的问题?

期望:上下文生命周期的操作从业务逻辑中分离出来。业务逻辑不涉及生命周期,就不会有业务代码如疏忽清理而引发的问题了。整个上下文的传递流程或说生命周期可以规范化成:捕捉、回放和恢复这3个操作,即[***`CRR(capture/replay/restore)`模式\***](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/developer-guide.md#-框架中间件集成ttl传递)。更多讨论参见 [Issue:能在详细讲解一下`replay`、`restore`的设计理念吗?#201](https://github.com/alibaba/transmittable-thread-local/issues/201)。

总结上面的说明:在生产应用(几乎一定会使用线程池等异步执行组件)中,使用`ThreadLocal`及其`set/remove`的上下文传递模式**几乎一定是有问题的**,***只是在等一个出`Bug`的机会\***。

更多`TTL`好处与必要性的展开讨论参见 [Issue:这个库带来怎样的好处和优势? #128](https://github.com/alibaba/transmittable-thread-local/issues/128),欢迎继续讨论 ♥️

# 🗿 更多文档

- [🎨 需求场景说明](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/requirement-scenario.md)
- [❤️ 小伙伴同学们写的`TTL`使用场景 与 设计实现解析的文章(写得都很好!) - Issue #123](https://github.com/alibaba/transmittable-thread-local/issues/123)
- [🎓 Developer Guide](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/developer-guide.md)
- [☔ 性能测试](https://github.com/alibaba/transmittable-thread-local/blob/master/docs/performance-test.md)

# 📚 相关资料

## JDK Core Classes

- [WeakHashMap](https://docs.oracle.com/javase/10/docs/api/java/util/WeakHashMap.html)
- [InheritableThreadLocal](https://docs.oracle.com/javase/10/docs/api/java/lang/InheritableThreadLocal.html)



## 4. 多线程编排CompletableFuture

Future只能通过阻塞或者轮询的方式获取结果,而且不支持设置回调方法。在Java8中,CompletableFuture对Future进行了扩展,支持回调方法,同时也支持多个线程的组合和编排操作

### 4.1 操作API

- 创建异步任务

- runAsync: 不带返回值
- supplyAsync: 带返回值

```java
@Test
void test1() {
ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
System.out.println(Thread.currentThread().getName() + "正在运行"), threadPool);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "正在运行");
return "hello world";
}, threadPool);
}
  • 获取任务返回值

    • get(): 方法阻塞等待返回
    • get(long timeout, TimeUnit unit): 方法阻塞,等待参数中的事件如果没有返回抛出TimeoutException异常
    • getNow(T valueIfAbsent): 执行代码的时刻获取返回值,如果没有返回值,返回参数中的默认值
    • join():和get方法一样阻塞线程等待返回值,join不用必须try cache,join可能抛出unchecked异常
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    @Test
    void test2() {
    ExecutorService threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    ThreadUtil.sleep(1000);
    System.out.println(Thread.currentThread().getName() + "正在运行");
    return "hello world";
    }, threadPool);

    String defaultStr = future.getNow("默认返回值");
    System.out.println(defaultStr);


    try {
    String result = future.get(300, TimeUnit.MILLISECONDS);
    System.out.println(result);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }

    String joinStr = future.join();
    System.out.println(joinStr);

    try {
    String result = future.get();
    System.out.println(result);
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (ExecutionException e) {
    e.printStackTrace();
    }

    }
  • thenRun和thenRunAsync

    • 当前任务正常执行完成后,执行下一个任务,没有入参和返回值,thenRun使用同一个线程执行下一个任务,thenRunAsync新获取一个线程执行下一个任务
    • 入参:Runnable
    • future.thenRunAsync(() -> System.out.println(Thread.currentThread().getName() + "正在运行"), threadPool);
  • thenAccept和thenAcceptAsync

    • 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,没有返回值
    • 入参:Consumer<T>void accept(T t);
    • future.thenAcceptAsync(o -> System.out.println("获取到上个任务的返回值:" + o), threadPool);
  • thenApply和thenApplyAsync

    • 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,且有返回值。
    • 入参:Function<T, R>R apply(T t);
    • CompletableFuture<String> future1 = future.thenApplyAsync(o -> o + " CompletableFuture", threadPool);
  • whenComplete和whenCompleteAsync

    • 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,同时会传递异常参数,正常返回,异常参数为null,返回上一个任务的结果
    • 入参BiConsumer<T, U>void accept(T t, U u);
    • CompletableFuture<String> future2 = future.whenCompleteAsync((o, e) -> System.out.println("获取到上个任务的返回值:" + o), threadPool);
  • handle和handleAsync

    • 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,同时会传递异常参数,有返回值
    • 入参:BiFunction<T, U, R>R apply(T t, U u);
    • CompletableFuture<String> future3 = future.handleAsync((o, e) -> o + " handle", threadPool);
  • thenCompose和thenComposeAsync

    • 当前任务正常执行完成后,将当前任务的返回值作为下一个任务入参,返回值需要是一个CompletableFuture
    • 入参:Function<T, R>R apply(T t);
    • CompletableFuture<String> future5 = future.thenComposeAsync(o -> CompletableFuture.supplyAsync(() -> o +" Compose"), threadPool);
  • thenCombine和thenCombineAsync

    • 当前任务正常执行完成,并且第一个参数的CompletableFuture也执行完,将两个返回传给第二个参数,有返回值
    • 入参:public <U,V> CompletableFuture thenCombineAsync(
      CompletionStage<? extends U> other,
      BiFunction<? super T,? super U,? extends V> fn, Executor executor)
    1
    2
    3
    4
    5
    6
    7
    8
    @Test
    void test4() {
    ExecutorService threadPool = Executors.newFixedThreadPool(1);
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello ", threadPool);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> " World", threadPool);
    CompletableFuture<String> future = future1.thenCombineAsync(future2, (o1, o2) -> o1 + o2, threadPool);
    System.out.println(future.join());
    }
  • allOf

    • 参数里的所有任务都执行完后,返回一个CompletableFuture<Void>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    void test5() {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future1 ", threadPool);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future2 ", threadPool);
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "future3 ", threadPool);
    CompletableFuture.allOf(future1, future2, future3);
    System.out.println(future1.join()+future2.join()+future3.join());
    }
  • anyOf

    • 参数里的任意任务都执行完后,返回一个CompletableFuture<Object>
    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Test
    void test6() {
    ExecutorService threadPool = Executors.newFixedThreadPool(2);
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "future1 ", threadPool);
    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "future2 ", threadPool);
    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "future3 ", threadPool);
    CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
    System.out.println(anyOf.join());
    }

CompletableFuture原理与实践-外卖商家端API的异步化