Skip to content
清晨的一缕阳光
返回

Spring Boot 分布式定时任务

前言

定时任务是应用中的常见需求。Spring Boot 提供了@Scheduled 注解支持定时任务,但在分布式环境下需要考虑任务重复执行问题。本文将介绍 Spring Boot 定时任务的完整方案。

基础使用

1. 启用定时任务

@SpringBootApplication
@EnableScheduling
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}

2. 创建定时任务

@Component
public class ScheduledTasks {
    
    /**
     * 固定间隔执行
     */
    @Scheduled(fixedRate = 5000) // 每 5 秒
    public void fixedRateTask() {
        log.info("固定间隔任务执行:{}", LocalDateTime.now());
    }
    
    /**
     * 固定延迟执行
     */
    @Scheduled(fixedDelay = 5000) // 上次执行完成后 5 秒
    public void fixedDelayTask() {
        log.info("固定延迟任务执行:{}", LocalDateTime.now());
        
        // 模拟耗时操作
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    /**
     * 初始延迟
     */
    @Scheduled(initialDelay = 10000, fixedRate = 5000)
    public void initialDelayTask() {
        log.info("初始延迟任务执行:{}", LocalDateTime.now());
    }
    
    /**
     * Cron 表达式
     */
    @Scheduled(cron = "0 0 12 * * ?") // 每天 12 点执行
    public void cronTask() {
        log.info("Cron 任务执行:{}", LocalDateTime.now());
    }
    
    /**
     * 从配置读取 Cron
     */
    @Scheduled(cron = "${task.cron.expression}")
    public void configCronTask() {
        log.info("配置 Cron 任务执行:{}", LocalDateTime.now());
    }
}

3. Cron 表达式

秒  分  时  日  月  周  年
*   *   *   *   *   *   *

常用示例:
0 0 12 * * ?      - 每天 12 点
0 15 10 ? * MON   - 每周一 10:15
0 0/5 14 * * ?    - 每天 14 点每 5 分钟
0 0-5 14 * * ?    - 每天 14 点到 14 点 05 分
0 0 14 ? * WED    - 每周三 14 点
0 0 12 1/1 * ?    - 每月 1 日 12 点
0 15 10 ? * MON-FRI - 工作日 10:15

4. 异步执行

@Configuration
@EnableScheduling
@EnableAsync
public class ScheduleConfig {
    
    @Bean(name = "taskScheduler")
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("scheduled-task-");
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(60);
        return scheduler;
    }
}
@Component
public class AsyncScheduledTasks {
    
    /**
     * 异步执行定时任务
     */
    @Async
    @Scheduled(fixedRate = 5000)
    public void asyncTask() {
        log.info("异步任务执行:{}", Thread.currentThread().getName());
        
        // 耗时操作
        processTask();
    }
}

分布式锁

1. Redis 分布式锁

@Component
public class DistributedScheduledTasks {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 使用 Redis 锁防止重复执行
     */
    @Scheduled(fixedRate = 5000)
    public void distributedTask() {
        String lockKey = "lock:distributed-task";
        String lockValue = UUID.randomUUID().toString();
        
        // 尝试获取锁
        Boolean acquired = redisTemplate.opsForValue()
            .setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
        
        if (Boolean.TRUE.equals(acquired)) {
            try {
                log.info("执行分布式任务:{}", LocalDateTime.now());
                
                // 业务逻辑
                processTask();
                
            } finally {
                // 释放锁(使用 Lua 脚本保证原子性)
                String script = """
                    if redis.call("get", KEYS[1]) == ARGV[1] then
                        return redis.call("del", KEYS[1])
                    else
                        return 0
                    end
                    """;
                
                redisTemplate.execute(
                    new DefaultRedisScript<>(script, Long.class),
                    Collections.singletonList(lockKey),
                    lockValue
                );
            }
        } else {
            log.debug("未获取到锁,跳过执行");
        }
    }
}

2. Redisson 分布式锁

@Component
public class RedissonScheduledTasks {
    
    @Autowired
    private RedissonClient redissonClient;
    
    /**
     * 使用 Redisson 锁
     */
    @Scheduled(fixedRate = 5000)
    public void redissonTask() {
        RLock lock = redissonClient.getLock("lock:redisson-task");
        
        boolean locked = false;
        try {
            locked = lock.tryLock(0, 10, TimeUnit.SECONDS);
            
            if (locked) {
                log.info("执行 Redisson 任务:{}", LocalDateTime.now());
                
                // 业务逻辑
                processTask();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

3. ShedLock

<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-jdbc-template</artifactId>
    <version>5.10.0</version>
</dependency>
@Configuration
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
public class ShedLockConfig {
    
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        return new JdbcTemplateLockProvider(dataSource);
    }
}
@Component
public class ShedLockScheduledTasks {
    
    /**
     * 使用 ShedLock
     */
    @Scheduled(fixedRate = 5000)
    @SchedulerLock(name = "shedlockTask", 
                   lockAtMostFor = "10s", 
                   lockAtLeastFor = "5s")
    public void shedlockTask() {
        log.info("执行 ShedLock 任务:{}", LocalDateTime.now());
        
        // 业务逻辑
        processTask();
    }
}

XXL-Job

1. 添加依赖

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.4.0</version>
</dependency>

2. 配置

xxl:
  job:
    admin:
      addresses: http://localhost:8080/xxl-job-admin
    executor:
      appname: demo-executor
      port: 9999
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30

3. 创建任务

@Component
public class XxlJobTasks {
    
    /**
     * 简单任务
     */
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        log.info("XXL-JOB 简单任务执行");
        
        // 业务逻辑
        processTask();
    }
    
    /**
     * 分片广播任务
     */
    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() throws Exception {
        // 获取分片参数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        
        log.info("分片参数:index={}, total={}", shardIndex, shardTotal);
        
        // 根据分片索引处理数据
        List<Order> orders = orderRepository.findOrdersByShard(shardIndex, shardTotal);
        
        for (Order order : orders) {
            processOrder(order);
            
            // 记录执行日志
            XxlJobHelper.log("处理订单:{}", order.getId());
        }
    }
    
    /**
     * 带参数任务
     */
    @XxlJob("paramJobHandler")
    public void paramJobHandler() throws Exception {
        // 获取任务参数
        String jobParam = XxlJobHelper.getJobParam();
        
        log.info("任务参数:{}", jobParam);
        
        // 解析参数并执行
        JobParams params = JSON.parseObject(jobParam, JobParams.class);
        processWithParams(params);
    }
    
    /**
     * 任务生命周期
     */
    @XxlJob(value = "lifecycleJobHandler", init = "init", destroy = "destroy")
    public void lifecycleJobHandler() throws Exception {
        log.info("任务执行");
        processTask();
    }
    
    public void init() {
        log.info("任务初始化");
    }
    
    public void destroy() {
        log.info("任务销毁");
    }
}

4. 任务管理

访问 XXL-Job 管理平台:

ElasticJob

1. 添加依赖

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.1</version>
</dependency>

2. 配置

elasticjob:
  reg-center:
    server-lists: localhost:2181
    namespace: elasticjob-lite-springboot
  jobs:
    demo-job:
      elasticJobClass: com.example.demo.job.DemoJob
      cron: 0/5 * * * * ?
      sharding-total-count: 3
      sharding-item-parameters: 0=A,1=B,2=C

3. 创建任务

@Component
public class DemoJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        log.info("执行分片项:{}, 参数:{}", 
            context.getShardingItem(), 
            context.getShardingParameter());
        
        // 根据分片项处理数据
        int shardItem = context.getShardingItem();
        int shardTotal = context.getShardingTotalCount();
        
        List<Data> dataList = getDataBySharding(shardItem, shardTotal);
        
        for (Data data : dataList) {
            processData(data);
        }
    }
}

最佳实践

1. 任务监控

@Component
public class TaskMonitor {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    /**
     * 记录任务执行指标
     */
    @Around("@annotation(org.springframework.scheduling.annotation.Scheduled)")
    public Object monitorTask(ProceedingJoinPoint pjp) throws Throwable {
        String taskName = pjp.getSignature().getName();
        
        long startTime = System.currentTimeMillis();
        
        try {
            Object result = pjp.proceed();
            
            long cost = System.currentTimeMillis() - startTime;
            
            // 记录成功指标
            meterRegistry.counter("scheduled.task.success",
                "task", taskName
            ).increment();
            
            meterRegistry.timer("scheduled.task.duration",
                "task", taskName
            ).record(cost, TimeUnit.MILLISECONDS);
            
            return result;
        } catch (Throwable e) {
            // 记录失败指标
            meterRegistry.counter("scheduled.task.failure",
                "task", taskName
            ).increment();
            
            throw e;
        }
    }
}

2. 错误处理

@Component
public class ScheduledTasks {
    
    /**
     * 带错误处理的定时任务
     */
    @Scheduled(fixedRate = 5000)
    public void taskWithErrorHandling() {
        try {
            processTask();
        } catch (Exception e) {
            log.error("任务执行失败", e);
            
            // 发送告警
            alertService.sendAlert("定时任务失败:" + e.getMessage());
            
            // 记录到数据库,便于后续处理
            taskErrorRepository.save(new TaskError(
                "taskWithErrorHandling",
                e.getMessage(),
                LocalDateTime.now()
            ));
        }
    }
}

3. 动态开关

@Component
public class ScheduledTasks {
    
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    
    /**
     * 支持动态开关的定时任务
     */
    @Scheduled(fixedRate = 5000)
    public void taskWithSwitch() {
        // 检查任务开关
        Boolean enabled = (Boolean) redisTemplate.opsForValue()
            .get("task:enabled:taskWithSwitch");
        
        if (Boolean.FALSE.equals(enabled)) {
            log.debug("任务已禁用,跳过执行");
            return;
        }
        
        // 执行任务
        processTask();
    }
}

4. 任务编排

@Component
public class TaskOrchestration {
    
    /**
     * 任务 A
     */
    @Scheduled(fixedRate = 60000)
    public void taskA() {
        log.info("执行任务 A");
        
        // 触发任务 B
        taskB();
    }
    
    /**
     * 任务 B(在任务 A 完成后执行)
     */
    public void taskB() {
        log.info("执行任务 B");
        
        // 业务逻辑
    }
    
    /**
     * 使用 CompletableFuture 编排
     */
    @Scheduled(fixedRate = 60000)
    public void orchestratedTasks() {
        CompletableFuture.runAsync(this::taskA)
            .thenRunAsync(this::taskB)
            .thenRunAsync(this::taskC)
            .exceptionally(ex -> {
                log.error("任务编排执行失败", ex);
                return null;
            });
    }
    
    public void taskC() {
        log.info("执行任务 C");
    }
}

5. 性能优化

# 线程池配置
spring:
  task:
    scheduling:
      pool:
        size: 10
      thread-name-prefix: scheduled-
@Configuration
public class ScheduleConfig {
    
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(20);
        scheduler.setThreadNamePrefix("scheduled-task-");
        scheduler.setWaitForTasksToCompleteOnShutdown(true);
        scheduler.setAwaitTerminationSeconds(60);
        scheduler.setErrorHandler(t -> log.error("任务执行异常", t));
        return scheduler;
    }
}

总结

定时任务要点:

定时任务是自动化处理的重要工具。


分享这篇文章到:

上一篇文章
Java 最佳实践总结
下一篇文章
Seata TCC 模式实战