Skip to content
清晨的一缕阳光
返回

分布式任务调度

分布式任务调度

任务调度架构

核心概念

任务(Job)

调度器(Scheduler)

执行器(Executor)

任务分片(Sharding)

架构设计

┌─────────────────────────────────────────────┐
│            调度中心 (Scheduler)              │
│  ┌─────────────┐  ┌─────────────┐          │
│  │  任务管理   │  │  调度引擎   │          │
│  └─────────────┘  └─────────────┘          │
└───────────────────┬─────────────────────────┘

        ┌───────────┼───────────┐
        │           │           │
        ▼           ▼           ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 执行器 1   │ │ 执行器 2   │ │ 执行器 3   │
│ Instance 1│ │ Instance 2│ │ Instance 3│
└───────────┘ └───────────┘ └───────────┘

XXL-JOB

1. 快速开始

添加依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.0</version>
</dependency>

配置调度器

xxl:
  job:
    admin:
      addresses: http://localhost:8080/xxl-job-admin
    executor:
      appname: xxl-job-executor
      ip:
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30

实现任务处理器

@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
    XxlJobHelper.log("XXL-JOB, Hello World.");
    
    for (int i = 0; i < 5; i++) {
        XxlJobHelper.log("beat at:" + i);
        TimeUnit.SECONDS.sleep(2);
    }
}

2. 任务分片

@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
    // 分片参数
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    
    XxlJobHelper.log("分片参数:index={}, total={}", shardIndex, shardTotal);
    
    // 根据分片参数处理数据
    List<User> users = getUserList();
    for (int i = 0; i < users.size(); i++) {
        if (i % shardTotal == shardIndex) {
            processUser(users.get(i));
        }
    }
}

3. 任务配置

Cron 表达式

0 0 2 * * ?  # 每天凌晨 2 点执行
0 0/5 * * * ?  # 每 5 分钟执行一次
0 0 1 1 * ?  # 每月 1 号凌晨 1 点执行

任务参数

4. 任务管理

手动触发

暂停/恢复

任务监控

Elastic-Job

1. 快速开始

添加依赖

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.1</version>
</dependency>

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-error-handler-email</artifactId>
    <version>3.0.1</version>
</dependency>

配置 Zookeeper

elasticjob:
  reg-center:
    server-lists: localhost:2181
    namespace: elasticjob-lite-springboot
  jobs:
    simpleJob:
      elasticJobClass: com.example.job.SimpleJob
      cron: 0/5 * * * * ?
      sharding-total-count: 3
      sharding-item-parameters: 0=A,1=B,2=C

实现任务

public class SimpleJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        System.out.println(String.format(
            "任务名称:%s, 分片项:%d, 分片参数:%s",
            context.getJobName(),
            context.getShardingItem(),
            context.getShardingParameter()
        ));
        
        // 业务逻辑
        doWork(context.getShardingItem());
    }
    
    private void doWork(int shardingItem) {
        // 根据分片项处理数据
    }
}

2. 分片策略

简单分片

public class ShardingJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        int currentShardingItem = context.getShardingItem();
        int shardingTotalCount = context.getShardingTotalCount();
        
        // 获取所有数据
        List<Data> allData = getAllData();
        
        // 分片处理
        for (int i = 0; i < allData.size(); i++) {
            if (i % shardingTotalCount == currentShardingItem) {
                processData(allData.get(i));
            }
        }
    }
}

数据分片

public class DataShardingJob implements DataflowJob<User> {
    
    @Override
    public List<User> fetchData(ShardingContext context) {
        // 获取待处理数据
        int shardingItem = context.getShardingItem();
        int shardingTotalCount = context.getShardingTotalCount();
        
        return userRepository.findPendingData(
            shardingItem, shardingTotalCount
        );
    }
    
    @Override
    public void processData(ShardingContext context, List<User> data) {
        // 处理数据
        for (User user : data) {
            processUser(user);
        }
    }
}

3. 任务监听

作业状态监听

@Component
public class JobStatusListener implements ElasticJobStatusListener {
    
    @Override
    public void onLoad(ShardingContexts shardingContexts) {
        log.info("任务加载:{}", shardingContexts.getJobName());
    }
    
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        log.info("任务执行前:{}", shardingContexts.getJobName());
    }
    
    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        log.info("任务执行后:{}", shardingContexts.getJobName());
    }
}

作业异常监听

@Component
public class JobExceptionListener implements ElasticJobExceptionListener {
    
    @Override
    public void onJobException(String jobName, Throwable cause) {
        log.error("任务执行异常:{}", jobName, cause);
        
        // 发送告警
        alertService.sendAlert(
            String.format("任务 %s 执行异常", jobName)
        );
    }
}

PowerJob

1. 快速开始

添加依赖

<dependency>
    <groupId>tech.powerjob</groupId>
    <artifactId>powerjob-worker-spring-boot-starter</artifactId>
    <version>4.3.0</version>
</dependency>

配置 PowerJob

powerjob:
  worker:
    enabled: true
    akka-port: 10086
    server-address: localhost:7700
    app-name: powerjob-worker
    network-interface:
    store-strategy: disk

实现任务

@Component
public class DemoJob implements BasicJob {
    
    @Override
    public void process(BasicJobContext context) {
        log.info("PowerJob 任务执行");
        
        // 业务逻辑
        doWork();
        
        // 更新任务状态
        context.setSuccess("任务执行成功");
    }
}

2. 任务类型

BasicJob

@Component
public class SimpleJob implements BasicJob {
    @Override
    public void process(BasicJobContext context) {
        // 简单任务逻辑
    }
}

MapJob

@Component
public class MapJob implements MapJob {
    
    @Override
    public List<MapTask> process(MapJobContext context) {
        // 生成子任务
        List<MapTask> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tasks.add(new MapTask(String.valueOf(i)));
        }
        return tasks;
    }
}

MapReduceJob

@Component
public class MapReduceJob implements MapReduceJob<Long, Void> {
    
    @Override
    public List<Long> map(MapJobContext context) {
        // Map 阶段
        return generateTasks();
    }
    
    @Override
    public Void reduce(ReduceJobContext<Long> context) {
        // Reduce 阶段
        List<Long> results = context.getSubTaskResults();
        processResults(results);
        return null;
    }
}

3. 任务编排

工作流配置

@Configuration
public class WorkflowConfig {
    
    @Bean
    public WorkflowDefinition workflowDefinition() {
        WorkflowDefinition definition = new WorkflowDefinition();
        definition.setName("orderWorkflow");
        definition.setNodes(Arrays.asList(
            createNode("createOrder", "创建订单"),
            createNode("payOrder", "支付订单"),
            createNode("shipOrder", "发货")
        ));
        
        // 配置依赖关系
        definition.addDependency("payOrder", "createOrder");
        definition.addDependency("shipOrder", "payOrder");
        
        return definition;
    }
    
    private WorkflowNode createNode(String name, String displayName) {
        WorkflowNode node = new WorkflowNode();
        node.setName(name);
        node.setDisplayName(displayName);
        node.setType(WorkflowNodeType.JOB);
        return node;
    }
}

任务调度最佳实践

1. 任务设计

单一职责

幂等性

可中断性

2. 性能优化

分片策略

@XxlJob("dataProcessJob")
public void dataProcessJob() {
    int shardIndex = XxlJobHelper.getShardIndex();
    int shardTotal = XxlJobHelper.getShardTotal();
    
    // 根据分片处理数据
    int pageSize = 1000;
    int pageNum = 0;
    
    while (true) {
        List<Data> dataList = dataMapper.selectPage(
            pageNum, pageSize, shardIndex, shardTotal
        );
        
        if (dataList.isEmpty()) {
            break;
        }
        
        processData(dataList);
        pageNum++;
    }
}

批量处理

@XxlJob("batchJob")
public void batchJob() {
    int batchSize = 100;
    List<Long> ids = getPendingIds();
    
    for (int i = 0; i < ids.size(); i += batchSize) {
        List<Long> batch = ids.subList(
            i, Math.min(i + batchSize, ids.size())
        );
        processBatch(batch);
    }
}

异步处理

@XxlJob("asyncJob")
public void asyncJob() {
    List<Task> tasks = getTasks();
    
    // 异步执行
    List<CompletableFuture<Void>> futures = tasks.stream()
        .map(task -> CompletableFuture.runAsync(() -> {
            processTask(task);
        }))
        .collect(Collectors.toList());
    
    // 等待所有任务完成
    CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[0])
    ).join();
}

3. 容错处理

重试机制

@XxlJob("retryJob")
public void retryJob() {
    int maxRetries = 3;
    int retryCount = 0;
    
    while (retryCount < maxRetries) {
        try {
            doWork();
            break;
        } catch (Exception e) {
            retryCount++;
            if (retryCount >= maxRetries) {
                throw e;
            }
            
            // 等待后重试
            Thread.sleep(1000 * retryCount);
        }
    }
}

故障转移

xxl:
  job:
    executor:
      # 故障转移
      failover: true
      # 自动注册
      auto-register: true

降级处理

@XxlJob("degradationJob")
public void degradationJob() {
    try {
        doWork();
    } catch (Exception e) {
        log.error("任务执行失败", e);
        
        // 降级处理
        doDegradationWork();
    }
}

4. 监控告警

任务监控

@Component
public class JobMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    @Scheduled(fixedRate = 60000)
    public void monitorJobs() {
        // 统计任务执行次数
        Counter counter = meterRegistry.counter("job.execution.count");
        
        // 统计任务执行时间
        Timer timer = meterRegistry.timer("job.execution.time");
        
        // 统计任务失败次数
        Counter failCounter = meterRegistry.counter("job.execution.fail");
    }
}

告警配置

@Component
public class JobAlertHandler {
    
    @Autowired
    private AlertService alertService;
    
    @EventListener
    public void onJobFail(JobFailEvent event) {
        alertService.sendAlert(
            String.format("任务 %s 执行失败:%s",
                event.getJobName(),
                event.getErrorMessage())
        );
    }
    
    @EventListener
    public void onJobTimeout(JobTimeoutEvent event) {
        alertService.sendAlert(
            String.format("任务 %s 执行超时:%d 秒",
                event.getJobName(),
                event.getTimeout())
        );
    }
}

常见问题

1. 任务丢失

问题:任务未执行

解决方案

2. 任务重复执行

问题:任务被多次执行

解决方案

3. 任务性能问题

问题:任务执行慢

解决方案

总结

分布式任务调度是微服务架构中的重要组件,支持定时任务的分布式执行、任务分片、故障转移等功能。

XXL-JOB、Elastic-Job、PowerJob 是主流的分布式任务调度框架,各有特点,可根据业务场景选择。

在生产环境中,建议建立完善的任务监控和告警机制,确保任务可靠执行。


分享这篇文章到:

上一篇文章
Redis 核心配置详解
下一篇文章
Go 代码规范与测试