XXL-Job-入门

1. 简介

XXL-Job是一个分布式任务调度平台

官方网站:https://www.xuxueli.com/xxl-job/

2. 设计思想

  • 将调度行为抽象形成“调度中心”公共平台,而平台自身并不承担业务逻辑,“调度中心”负责发起调度请求。
  • 将任务抽象成分散的JobHandler,交由“执行器”统一管理
  • “执行器”负责接收调度请求并执行对应的JobHandler中业务逻辑。
  • “调度”和“任务”两部分可以相互解耦,提高系统整体稳定性和扩展性

3. 下载源码

版本2.3.0:https://github.com/xuxueli/xxl-job/tree/2.3.0

  • 目录介绍
    • doc:xxl-job的文档资料,包括了数据库的脚本
    • xxl-job-core:公共jar包依赖
    • xxl-job-admin:调度中心
    • xxl-job-executor-samples:执行器
  • 数据库表
    • xxl_job_group:执行器信息表,用于维护任务执行器的信息
    • xxl_job_info:调度扩展信息表,主要是用于保存xxl-job的调度任务的扩展信息,比如说像任务分组、任务名、机器的地址等等
    • xxl_job_lock:任务调度锁表
    • xxl_job_log:日志表,主要是用在保存xxl-job任务调度历史信息,像调度结果、执行结果、调度入参等等
    • xxl_job_log_report:日志报表,会存储xxl-job任务调度的日志报表,会在调度中心里的报表功能里使用到
    • xxl_job_logglue:任务的GLUE日志,用于保存GLUE日志的更新历史变化,支持GLUE版本的回溯功能
    • xxl_job_registry:执行器的注册表,用在维护在线的执行器与调度中心的地址信息
    • xxl_job_user:系统的用户表

4. 启动服务

  • 在源码中找到doc/db/table_xxl_job.sql文件,在数据库执行脚本

  • 修改xxl-job-admin的配置文件application.properties

    1
    2
    3
    4
    5
    spring.datasource.url=jdbc:mysql://192.168.80.128:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
    spring.datasource.username=root
    spring.datasource.password=aacopy.cn

    xxl.job.accessToken=aacopy.cn
  • 启动XxlJobAdminApplication

  • 访问http://127.0.0.1:8080/xxl-job-admin

  • 默认账号密码:admin/123456

5. 客户端

  • 创建一个springboot项目

  • 引入maven依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.3.0</version>
    </dependency>
  • 添加配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # web port
    server.port=8081
    #----------xxl-job配置--------------
    logging.config=classpath:logback.xml
    #调度中心部署地址,多个配置逗号分隔 "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin

    #执行器token,非空时启用 xxl-job, access token,和服务端要保持一致
    xxl.job.accessToken=aacopy.cn

    # 执行器app名称,和控制台那边配置一样的名称,不然注册不上去
    xxl.job.executor.appname=xxxl-job-learn

    # [选填]执行器注册:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。
    #从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
    xxl.job.executor.address=

    #[选填]执行器IP :默认为空表示自动获取IP(即springboot容器的ip和端口,可以自动获取,也可以指定),多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务",
    xxl.job.executor.ip=

    # [选填]执行器端口号:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    xxl.job.executor.port=9999

    #执行器日志文件存储路径,需要对该路径拥有读写权限;为空则使用默认路径
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler

    #执行器日志保存天数
    xxl.job.executor.logretentiondays=30

  • 创建logback.xml配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    <?xml version="1.0" encoding="UTF-8"?>
    <configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>
    <property name="log.path" value="/data/applogs/xxl-job/xxl-job-learn.log"/>

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
    <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
    </encoder>
    </appender>

    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>${log.path}</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
    <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
    </rollingPolicy>
    <encoder>
    <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n
    </pattern>
    </encoder>
    </appender>

    <root level="info">
    <appender-ref ref="console"/>
    <appender-ref ref="file"/>
    </root>

    </configuration>
  • 创建config配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    @Configuration
    public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
    logger.info(">>>>>>>>>>> xxl-job config init.");
    XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
    xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
    xxlJobSpringExecutor.setAppname(appname);
    xxlJobSpringExecutor.setAddress(address);
    xxlJobSpringExecutor.setIp(ip);
    xxlJobSpringExecutor.setPort(port);
    xxlJobSpringExecutor.setAccessToken(accessToken);
    xxlJobSpringExecutor.setLogPath(logPath);
    xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

    return xxlJobSpringExecutor;
    }

    /**
    * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
    *
    * 1、引入依赖:
    * <dependency>
    * <groupId>org.springframework.cloud</groupId>
    * <artifactId>spring-cloud-commons</artifactId>
    * <version>${version}</version>
    * </dependency>
    *
    * 2、配置文件,或者容器启动变量
    * spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
    *
    * 3、获取IP
    * String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
    */


    }
  • 编写jobhandler任务

    1
    2
    3
    4
    5
    6
    7
    8
    @Component
    public class SampleXxlJob {

    @XxlJob("demoJobHandler1")
    public void demo1() {
    System.out.println("==============");
    }
    }
  • 新建执行器

  • 新建任务

  • 启动任务

  • 查看日志

6. 执行器管理

  • Appname:是每一个执行器的唯一表示AppName,执行器会以周期性为appname进行注册,为任务调度的时候使用
  • 名称:执行器的名称,因为appname有限制字母与数字等等组成,可读性不强,这个名称就是为了提高执行器的可读性
  • 注册方式:调度中心获取执行器地址的方式
    • 自动注册:执行器自动进行执行器的注册,通过底层的注册表可以动态的发现执行器机器的地址
    • 手动录入:人工手动录入执行器的地址信息,多地址使用逗号进行分割,供调度中心使用
  • 机器地址:“注册方式”为手动录入的时候才能使用,支持人工维护执行器的地址
  • 点击保存后可能要等30S左右才回显示机器的地址

7. 任务管理

  • 示例执行器:所用到的执行器
  • 任务描述:概述该任务是做什么的
  • 路由策略:
    • 第一个:选择第一个机器
    • 最后一个:选择最后一个机器
    • 轮询:依次选择执行
    • 随机:随机选择在线的机器
    • 一致性HASH:每个任务按照Hash算法固定选择某一台机器,并且所有的任务均匀散列在不同的机器上
    • 最不经常使用:使用频率最低的机器优先被使用
    • 最近最久未使用:最久未使用的机器优先被选举
    • 故障转移:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标的执行器并且会发起任务调度
    • 忙碌转移:按照顺序来依次进行空闲检测,第一个空闲检测成功的机器会被选定为目标群机器,并且会发起任务调度
    • 分片广播:广播触发对于集群中的所有机器执行任务,同时会系统会自动传递分片的参数
  • Cron:执行规则
  • 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等
  • JobHandler:定义执行器的名字
  • 阻塞处理策略:
    • 单机串行:新的调度任务在进入到执行器之后,该调度任务进入FIFO队列,并以串行的方式去进行
    • 丢弃后续调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,本次的调度任务请求就会被丢弃掉,并且标记为失败
    • 覆盖之前的调度:新的调度任务在进入到执行器之后,如果存在相同的且正在运行的调度任务,就会终止掉当前正在运行的调度任务,并且清空队列,运行新的调度任务。
  • 子任务ID:输入子任务的任务id,可填写多个
  • 任务超时时间:添加任务超时的时候,单位s,设置时间大于0的时候就会生效
  • 失败重试次数:设置失败重试的次数,设置时间大于0的时候就会生效
  • 负责人:填写该任务调度的负责人
  • 报警邮件:出现报警,则发送邮件

8. 参数传递和日志及执行结果

  • 通过api方法获取参数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    @Component
    @Slf4j
    public class SampleXxlJob {

    @XxlJob("demoJobHandler1")
    public void demo1() {
    //获取参数
    String jobParam = XxlJobHelper.getJobParam();
    log.info("获取到参数:{}", jobParam);
    //打印日志
    XxlJobHelper.log("hello xxl-job");
    //执行结果成功
    // XxlJobHelper.handleSuccess("执行成功!!!");
    //执行失败
    XxlJobHelper.handleFail("任务执行失败!!!");
    }
    }
  • 执行结果

    • 可以查看执行日志和结果日志

9. 分片任务

  • 执行器集群部署,如果任务的路由策略选择【分片广播】,一次任务调度将会【广播触发】对应集群中所有执行器执行一次任务,同时系统自动传递分片参数,执行器可根据分片参数开发分片任务

  • 需要处理的海量数据,以执行器为划分,每个执行器分配一定的任务数,并行执行

  • XXL-Job支持动态扩容执行器集群,从而动态增加分片数量,到达更快处理任务

  • 分片的值是调度中心分配的

  • 编写代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() {

    // 分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();

    XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

    // 业务逻辑
    for (int i = 0; i < shardTotal; i++) {
    if (i == shardIndex) {
    XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
    } else {
    XxlJobHelper.log("第 {} 片, 忽略", i);
    }
    }

    }
  • 任务配置