分布式任务调度
任务调度架构
核心概念
任务(Job):
- 需要定时执行的业务逻辑
- 可以是简单任务或复杂工作流
调度器(Scheduler):
- 负责触发任务执行
- 支持 Cron 表达式配置
执行器(Executor):
- 实际执行任务的节点
- 支持集群部署
任务分片(Sharding):
- 将大任务拆分为多个小任务
- 分布式并行执行
架构设计
┌─────────────────────────────────────────────┐
│ 调度中心 (Scheduler) │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ 任务管理 │ │ 调度引擎 │ │
│ └─────────────┘ └─────────────┘ │
└───────────────────┬─────────────────────────┘
│
┌───────────┼───────────┐
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 执行器 1 │ │ 执行器 2 │ │ 执行器 3 │
│ Instance 1│ │ Instance 2│ │ Instance 3│
└───────────┘ └───────────┘ └───────────┘
XXL-JOB
1. 快速开始
添加依赖:
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
配置调度器:
xxl:
job:
admin:
addresses: http://localhost:8080/xxl-job-admin
executor:
appname: xxl-job-executor
ip:
port: 9999
logpath: /data/applogs/xxl-job/jobhandler
logretentiondays: 30
实现任务处理器:
@XxlJob("demoJobHandler")
public void demoJobHandler() throws Exception {
XxlJobHelper.log("XXL-JOB, Hello World.");
for (int i = 0; i < 5; i++) {
XxlJobHelper.log("beat at:" + i);
TimeUnit.SECONDS.sleep(2);
}
}
2. 任务分片
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:index={}, total={}", shardIndex, shardTotal);
// 根据分片参数处理数据
List<User> users = getUserList();
for (int i = 0; i < users.size(); i++) {
if (i % shardTotal == shardIndex) {
processUser(users.get(i));
}
}
}
3. 任务配置
Cron 表达式:
0 0 2 * * ? # 每天凌晨 2 点执行
0 0/5 * * * ? # 每 5 分钟执行一次
0 0 1 1 * ? # 每月 1 号凌晨 1 点执行
任务参数:
- 运行模式:BEAN/GLUE
- JobHandler:执行器 Bean 名称
- 执行策略:单机/轮询/分片广播
- 故障转移:自动切换到可用节点
- 阻塞处理:单机串行/丢弃后续调度/覆盖之前调度
4. 任务管理
手动触发:
- 在管理控制台手动执行任务
- 用于测试和应急处理
暂停/恢复:
- 暂停任务执行
- 恢复任务执行
任务监控:
- 查看任务执行日志
- 监控任务执行状态
- 分析任务执行时间
Elastic-Job
1. 快速开始
添加依赖:
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>3.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-error-handler-email</artifactId>
<version>3.0.1</version>
</dependency>
配置 Zookeeper:
elasticjob:
reg-center:
server-lists: localhost:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: com.example.job.SimpleJob
cron: 0/5 * * * * ?
sharding-total-count: 3
sharding-item-parameters: 0=A,1=B,2=C
实现任务:
public class SimpleJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
System.out.println(String.format(
"任务名称:%s, 分片项:%d, 分片参数:%s",
context.getJobName(),
context.getShardingItem(),
context.getShardingParameter()
));
// 业务逻辑
doWork(context.getShardingItem());
}
private void doWork(int shardingItem) {
// 根据分片项处理数据
}
}
2. 分片策略
简单分片:
public class ShardingJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
int currentShardingItem = context.getShardingItem();
int shardingTotalCount = context.getShardingTotalCount();
// 获取所有数据
List<Data> allData = getAllData();
// 分片处理
for (int i = 0; i < allData.size(); i++) {
if (i % shardingTotalCount == currentShardingItem) {
processData(allData.get(i));
}
}
}
}
数据分片:
public class DataShardingJob implements DataflowJob<User> {
@Override
public List<User> fetchData(ShardingContext context) {
// 获取待处理数据
int shardingItem = context.getShardingItem();
int shardingTotalCount = context.getShardingTotalCount();
return userRepository.findPendingData(
shardingItem, shardingTotalCount
);
}
@Override
public void processData(ShardingContext context, List<User> data) {
// 处理数据
for (User user : data) {
processUser(user);
}
}
}
3. 任务监听
作业状态监听:
@Component
public class JobStatusListener implements ElasticJobStatusListener {
@Override
public void onLoad(ShardingContexts shardingContexts) {
log.info("任务加载:{}", shardingContexts.getJobName());
}
@Override
public void beforeJobExecuted(ShardingContexts shardingContexts) {
log.info("任务执行前:{}", shardingContexts.getJobName());
}
@Override
public void afterJobExecuted(ShardingContexts shardingContexts) {
log.info("任务执行后:{}", shardingContexts.getJobName());
}
}
作业异常监听:
@Component
public class JobExceptionListener implements ElasticJobExceptionListener {
@Override
public void onJobException(String jobName, Throwable cause) {
log.error("任务执行异常:{}", jobName, cause);
// 发送告警
alertService.sendAlert(
String.format("任务 %s 执行异常", jobName)
);
}
}
PowerJob
1. 快速开始
添加依赖:
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.0</version>
</dependency>
配置 PowerJob:
powerjob:
worker:
enabled: true
akka-port: 10086
server-address: localhost:7700
app-name: powerjob-worker
network-interface:
store-strategy: disk
实现任务:
@Component
public class DemoJob implements BasicJob {
@Override
public void process(BasicJobContext context) {
log.info("PowerJob 任务执行");
// 业务逻辑
doWork();
// 更新任务状态
context.setSuccess("任务执行成功");
}
}
2. 任务类型
BasicJob:
@Component
public class SimpleJob implements BasicJob {
@Override
public void process(BasicJobContext context) {
// 简单任务逻辑
}
}
MapJob:
@Component
public class MapJob implements MapJob {
@Override
public List<MapTask> process(MapJobContext context) {
// 生成子任务
List<MapTask> tasks = new ArrayList<>();
for (int i = 0; i < 100; i++) {
tasks.add(new MapTask(String.valueOf(i)));
}
return tasks;
}
}
MapReduceJob:
@Component
public class MapReduceJob implements MapReduceJob<Long, Void> {
@Override
public List<Long> map(MapJobContext context) {
// Map 阶段
return generateTasks();
}
@Override
public Void reduce(ReduceJobContext<Long> context) {
// Reduce 阶段
List<Long> results = context.getSubTaskResults();
processResults(results);
return null;
}
}
3. 任务编排
工作流配置:
@Configuration
public class WorkflowConfig {
@Bean
public WorkflowDefinition workflowDefinition() {
WorkflowDefinition definition = new WorkflowDefinition();
definition.setName("orderWorkflow");
definition.setNodes(Arrays.asList(
createNode("createOrder", "创建订单"),
createNode("payOrder", "支付订单"),
createNode("shipOrder", "发货")
));
// 配置依赖关系
definition.addDependency("payOrder", "createOrder");
definition.addDependency("shipOrder", "payOrder");
return definition;
}
private WorkflowNode createNode(String name, String displayName) {
WorkflowNode node = new WorkflowNode();
node.setName(name);
node.setDisplayName(displayName);
node.setType(WorkflowNodeType.JOB);
return node;
}
}
任务调度最佳实践
1. 任务设计
单一职责:
- 每个任务只做一件事
- 避免任务过于复杂
- 便于维护和监控
幂等性:
- 任务可重复执行
- 避免重复处理
- 支持故障恢复
可中断性:
- 支持任务取消
- 支持任务暂停
- 支持任务恢复
2. 性能优化
分片策略:
@XxlJob("dataProcessJob")
public void dataProcessJob() {
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
// 根据分片处理数据
int pageSize = 1000;
int pageNum = 0;
while (true) {
List<Data> dataList = dataMapper.selectPage(
pageNum, pageSize, shardIndex, shardTotal
);
if (dataList.isEmpty()) {
break;
}
processData(dataList);
pageNum++;
}
}
批量处理:
@XxlJob("batchJob")
public void batchJob() {
int batchSize = 100;
List<Long> ids = getPendingIds();
for (int i = 0; i < ids.size(); i += batchSize) {
List<Long> batch = ids.subList(
i, Math.min(i + batchSize, ids.size())
);
processBatch(batch);
}
}
异步处理:
@XxlJob("asyncJob")
public void asyncJob() {
List<Task> tasks = getTasks();
// 异步执行
List<CompletableFuture<Void>> futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(() -> {
processTask(task);
}))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
).join();
}
3. 容错处理
重试机制:
@XxlJob("retryJob")
public void retryJob() {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
doWork();
break;
} catch (Exception e) {
retryCount++;
if (retryCount >= maxRetries) {
throw e;
}
// 等待后重试
Thread.sleep(1000 * retryCount);
}
}
}
故障转移:
xxl:
job:
executor:
# 故障转移
failover: true
# 自动注册
auto-register: true
降级处理:
@XxlJob("degradationJob")
public void degradationJob() {
try {
doWork();
} catch (Exception e) {
log.error("任务执行失败", e);
// 降级处理
doDegradationWork();
}
}
4. 监控告警
任务监控:
@Component
public class JobMonitor {
@Autowired
private MeterRegistry meterRegistry;
@Scheduled(fixedRate = 60000)
public void monitorJobs() {
// 统计任务执行次数
Counter counter = meterRegistry.counter("job.execution.count");
// 统计任务执行时间
Timer timer = meterRegistry.timer("job.execution.time");
// 统计任务失败次数
Counter failCounter = meterRegistry.counter("job.execution.fail");
}
}
告警配置:
@Component
public class JobAlertHandler {
@Autowired
private AlertService alertService;
@EventListener
public void onJobFail(JobFailEvent event) {
alertService.sendAlert(
String.format("任务 %s 执行失败:%s",
event.getJobName(),
event.getErrorMessage())
);
}
@EventListener
public void onJobTimeout(JobTimeoutEvent event) {
alertService.sendAlert(
String.format("任务 %s 执行超时:%d 秒",
event.getJobName(),
event.getTimeout())
);
}
}
常见问题
1. 任务丢失
问题:任务未执行
解决方案:
- 检查调度器配置
- 检查执行器注册
- 查看调度日志
2. 任务重复执行
问题:任务被多次执行
解决方案:
- 实现幂等性
- 配置阻塞处理策略
- 使用分布式锁
3. 任务性能问题
问题:任务执行慢
解决方案:
- 优化任务逻辑
- 增加分片数量
- 调整线程池配置
总结
分布式任务调度是微服务架构中的重要组件,支持定时任务的分布式执行、任务分片、故障转移等功能。
XXL-JOB、Elastic-Job、PowerJob 是主流的分布式任务调度框架,各有特点,可根据业务场景选择。
在生产环境中,建议建立完善的任务监控和告警机制,确保任务可靠执行。