Spring-@Async

@Async用于在Spring中执行异步任务

1. 使用方式

  • 启动类里面使用@EnableAsync注解开启功能,自动扫描
  • 定义异步任务类并使用@Component标记组件被容器扫描,异步方法加上@Async

2. @Async失效情况

  • 注解@Async的方法不是public方法
  • 注解@Async的返回值只能为void或者Future
  • 注解@Async方法使用static修饰也会失效
  • spring无法扫描到异步类,没加注解@Async 或 @EnableAsync注解
  • 调用方与被调方不能在同一个类
    • Spring 在扫描bean的时候会扫描方法上是否包含@Async注解,动态地生成一个子类(即proxy代理类),当这个有注解的方法被调用的时候,实际上是由代理类来调用的,代理类在调用时增加异步作用
    • 如果这个有注解的方法是被同一个类中的其他方法调用的,那么该方法的调用并没有通过代理类,而是直接通过原来的那个 bean,所以就失效了
    • 所以调用方与被调方不能在同一个类,主要是使用了动态代理,同一个类的时候直接调用,不是通过生成的动态代理类调用
    • 一般将要异步执行的方法单独抽取成一个类
  • 类中需要使用@Autowired或@Resource等注解自动注入,不能自己手动new对象
  • 在Async 方法上标注@Transactional是没用的,但在Async 方法调用的方法上标注@Transactional 是有效的

3. 编码实践

3.1 编写接口

RestTemplate远程调用第三方地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
@Slf4j
public class NoticeServiceImpl implements NoticeService {

@Autowired
private RestTemplate restTemplate;

@Override
public void sendCode() {
long beginTime = CommonUtil.getCurrentTimestamp();
ResponseEntity<String> forEntity = restTemplate.getForEntity("http://xxx", String.class);
String body = forEntity.getBody();
long endTime = CommonUtil.getCurrentTimestamp();
log.info("耗时={},body={}",endTime-beginTime,body);
}
}

3.2 Jmeter压力测试:

线程数:200,Ramp-Up时间:2,循环次数:500

Label # 样本 平均值 中位数 90% 百分位 95% 百分位 99% 百分位 最小值 最大值 异常 % 吞吐量 接收 KB/sec 发送 KB/sec
HTTP请求 100000 73 31 282 346 440 27 3005 0.00% 2432.79406 488.72 337.36
总体 100000 73 31 282 346 440 27 3005 0.00% 2432.79406 488.72 337.36

3.3 异步调用

  • 在启动类上添加注解@EnableAsync

  • 在service实现方法NoticeServiceImpl上添加注解@Async

再次压测

Label # 样本 平均值 中位数 90% 百分位 95% 百分位 99% 百分位 最小值 最大值 异常 % 吞吐量 接收 KB/sec 发送 KB/sec
HTTP请求 100000 12 10 22 28 44 0 170 0.00% 13065.06402 2624.62 1811.76
总体 100000 12 10 22 28 44 0 170 0.00% 13065.06402 2624.62 1811.76

4. 存在的问题

现象:压测后很快跑完全部内容,是因为都在线程池内部的阻塞队列里面

  • 极容易出现OOM,或者消息丢失

  • 默认8个核心线程数占用满了之后, 新的调用就会进入队列, 最大值是Integer.MAX_VALUE

  • 设置下idea启动进程的jvm参数: -Xms50M -Xmx50M,再次压测,会出现异常

1
Exception in thread "http-nio-8001-Poller" java.lang.OutOfMemoryError: Java heap space

代码位置:

  • TaskExecutionProperties
  • TaskExecutionAutoConfiguration

说明:

  • 直接使用 @Async 注解没指定线程池的话,即未设置TaskExecutor时
  • 默认使用Spring创建ThreadPoolTaskExecutor
  • 核心线程数:8
  • 最大线程数:Integer.MAX_VALUE ( 21亿多)
  • 队列使用LinkedBlockingQueue
  • 容量是:Integer.MAX_VALUE
  • 空闲线程保留时间:60s
  • 线程池拒绝策略:AbortPolicy

5. 解决问题

5.1 ThreadPoolTaskExecutor和ThreadPoolExecutor

  • ThreadPoolExecutor,这个类是JDK中的线程池类,继承自Executor,里面有一个execute()方法,用来执行线程,线程池主要提供一个线程队列,队列中保存着所有等待状态的线程,避免了创建与销毁的额外开销
  • ThreadPoolTaskExecutor,是spring包下的,是Spring为我们提供的线程池类,Spring异步线程池的接口类是TaskExecutor,本质还是java.util.concurrent.Executor

5.2 解决方式

spring会先搜索TaskExecutor类型的bean或者名字为taskExecutorExecutor类型的bean,所以我们最好来自定义一个线程池,加入Spring IOC容器里面,即可覆盖

去掉启动类上的@EnableAsync注解,添加线程配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

@Bean("threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {

ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//线程池创建的核心线程数,线程池维护线程的最少数量,即使没有任务需要执行,也会一直存活
//如果设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
threadPoolTaskExecutor.setCorePoolSize(16);

//最大线程池数量,当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
//当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
threadPoolTaskExecutor.setMaxPoolSize(64);

//缓存队列(阻塞队列)当核心线程数达到最大时,新任务会放在队列中排队等待执行
threadPoolTaskExecutor.setQueueCapacity(1024);

//当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
//允许线程空闲时间60秒,当maxPoolSize的线程在空闲时间到达的时候销毁
//如果allowCoreThreadTimeout=true,则会直到线程数量=0
threadPoolTaskExecutor.setKeepAliveSeconds(30);

//spring 提供的 ThreadPoolTaskExecutor 线程池,是有setThreadNamePrefix() 方法的。
//jdk 提供的ThreadPoolExecutor 线程池是没有 setThreadNamePrefix() 方法的
threadPoolTaskExecutor.setThreadNamePrefix("自定义线程:");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);

// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CallerRunsPolicy():交由调用方线程运行,比如 main 线程;如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行
//AbortPolicy():该策略是线程池的默认策略,如果线程池队列满了丢掉这个任务并且抛出RejectedExecutionException异常。
//DiscardPolicy():如果线程池队列满了,会直接丢掉这个任务并且不会有任何异常
//DiscardOldestPolicy():丢弃队列中最老的任务,队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}

}
1
2
@Async("threadPoolTaskExecutor")
public void sendCode()
  • 线程判断规则:

    先是CorePoolSize是否满足,然后是Queue阻塞队列是否满,最后才是MaxPoolSize是否满足

  • 高并发下核心线程怎么设置?

    • 分IO密集还是CPU密集
      • CPU密集设置为跟核心数一样大小
      • IO密集型设置为2倍CPU核心数
    • 非固定,根据实际情况压测进行调整