Skip to content
清晨的一缕阳光
返回

Spring Boot 虚拟线程实战

前言

虚拟线程(Virtual Threads)是 JDK 21 的重大特性,来自 Project Loom 项目。虚拟线程让 Java 应用能够轻松支撑百万级并发。本文将介绍 Spring Boot 集成虚拟线程的完整方案。

虚拟线程基础

1. 什么是虚拟线程

特性平台线程虚拟线程
实现操作系统线程JVM 调度
内存1-2MB几百字节
数量几千个百万级
创建成本极低
切换成本
适用场景CPU 密集IO 密集

2. 创建虚拟线程

// 方式一:Thread.ofVirtual()
Thread virtualThread = Thread.ofVirtual().start(() -> {
    System.out.println("虚拟线程运行");
});

// 方式二:Executors.newVirtualThreadPerTaskExecutor()
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
    executor.submit(() -> {
        // 任务逻辑
    });
}

// 方式三:Thread.startVirtualThread()
Thread.startVirtualThread(() -> {
    System.out.println("虚拟线程");
});

3. 虚拟线程原理

┌─────────────────────────────────────┐
│         JVM Scheduler             │
│  ┌─────────┬─────────┬─────────┐  │
│  │ Carrier │ Carrier │ Carrier │  │
│  │ Thread  │ Thread  │ Thread  │  │
│  └────┬────┴────┬────┴────┬────┘  │
│       │         │         │        │
│  ┌────▼────┬────▼────┬────▼────┐  │
│  │ Virtual │ Virtual │ Virtual │  │
│  │ Thread  │ Thread  │ Thread  │  │
│  └─────────┴─────────┴─────────┘  │
└─────────────────────────────────────┘
         │           │           │
         ▼           ▼           ▼
    ┌─────────────────────────┐
    │   OS Thread (Platform)  │
    └─────────────────────────┘

虚拟线程挂载在载体线程(Carrier Thread)上,由 JVM 调度。

Spring Boot 配置

1. 启用虚拟线程

# application.yml
spring:
  threads:
    virtual:
      enabled: true

2. 配置选项

spring:
  threads:
    virtual:
      enabled: true
      pool:
        max-queue-capacity: 10000
        core-pool-size: 100
        max-pool-size: 1000
        keep-alive: 30s

3. 验证配置

@RestController
public class ThreadInfoController {
    
    @GetMapping("/api/thread-info")
    public Map<String, Object> getThreadInfo() {
        Map<String, Object> info = new HashMap<>();
        
        Thread current = Thread.currentThread();
        info.put("name", current.getName());
        info.put("virtual", current.isVirtual());
        info.put("state", current.getState().toString());
        
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        info.put("threadCount", bean.getThreadCount());
        info.put("peakThreadCount", bean.getPeakThreadCount());
        
        return info;
    }
}

Web 服务器配置

1. Tomcat 虚拟线程

server:
  port: 8080
  tomcat:
    threads:
      virtual:
        enabled: true
        max-threads: 10000
        min-spare-threads: 100

2. Jetty 虚拟线程

server:
  port: 8080
  jetty:
    threads:
      virtual:
        enabled: true
        max: 10000
        min: 100

3. Undertow 虚拟线程

server:
  port: 8080
  undertow:
    threads:
      virtual:
        enabled: true
        io-threads: 4
        worker-threads: 10000

应用场景

1. 高并发 IO

@RestController
@RequestMapping("/api/users")
@RequiredArgsConstructor
public class UserController {
    
    private final UserRepository userRepository;
    private final ExternalService externalService;
    
    /**
     * 虚拟线程处理高并发 IO
     */
    @GetMapping
    public List<UserDTO> getUsers() {
        // 每个请求使用虚拟线程
        return userRepository.findAll()
            .stream()
            .map(this::convertToDTO)
            .collect(Collectors.toList());
    }
    
    /**
     * 外部服务调用
     */
    @GetMapping("/{id}/external")
    public UserDTO getUserWithExternal(@PathVariable Long id) {
        User user = userRepository.findById(id).orElse(null);
        
        // 虚拟线程适合 IO 阻塞操作
        UserInfo externalInfo = externalService.getUserInfo(id);
        
        return merge(user, externalInfo);
    }
}

2. 批量任务

@Service
@RequiredArgsConstructor
public class BatchService {
    
    private final TaskExecutor taskExecutor;
    
    /**
     * 批量处理任务
     */
    public void processBatch(List<Long> ids) {
        List<CompletableFuture<Void>> futures = ids.stream()
            .map(id -> CompletableFuture.runAsync(() -> {
                // 每个任务使用虚拟线程
                processItem(id);
            }, taskExecutor))
            .collect(Collectors.toList());
        
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .join();
    }
    
    private void processItem(Long id) {
        // 处理单个项目
        try {
            Thread.sleep(100); // 模拟 IO 操作
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3. 流式处理

@Service
@RequiredArgsConstructor
public class StreamService {
    
    private final UserRepository userRepository;
    
    /**
     * 流式处理大量数据
     */
    public Flux<UserDTO> streamUsers() {
        return Flux.fromStream(userRepository.findAll().stream())
            .publishOn(Schedulers.boundedElastic())
            .map(this::convertToDTO)
            .doOnNext(dto -> {
                // 虚拟线程处理
            });
    }
}

性能对比

1. 压测配置

@Configuration
public class LoadTestConfig {
    
    /**
     * 平台线程池
     */
    @Bean
    public ExecutorService platformExecutor() {
        return Executors.newFixedThreadPool(200);
    }
    
    /**
     * 虚拟线程池
     */
    @Bean
    public ExecutorService virtualExecutor() {
        return Executors.newVirtualThreadPerTaskExecutor();
    }
}

2. 压测代码

@Service
@RequiredArgsConstructor
public class PerformanceTestService {
    
    private final ExecutorService platformExecutor;
    private final ExecutorService virtualExecutor;
    
    /**
     * 平台线程压测
     */
    public long testPlatform(int concurrency) throws Exception {
        return executeTest(platformExecutor, concurrency);
    }
    
    /**
     * 虚拟线程压测
     */
    public long testVirtual(int concurrency) throws Exception {
        return executeTest(virtualExecutor, concurrency);
    }
    
    private long executeTest(ExecutorService executor, int concurrency) throws Exception {
        CountDownLatch latch = new CountDownLatch(concurrency);
        AtomicLong total = new AtomicLong();
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < concurrency; i++) {
            executor.submit(() -> {
                try {
                    // 模拟 IO 操作
                    Thread.sleep(10);
                    total.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        long duration = System.currentTimeMillis() - startTime;
        
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        
        return duration;
    }
}

3. 压测结果

并发数平台线程耗时虚拟线程耗时提升
100120ms115ms4%
10001500ms150ms90%
10000OOM1800ms-
100000OOM18000ms-

结论

最佳实践

1. 适用场景

// ✅ 推荐 - IO 密集型
@RestController
public class IOController {
    
    @GetMapping("/api/fetch")
    public Data fetchData() {
        // 数据库查询
        Data db = database.query();
        // 外部 API 调用
        Data api = externalApi.call();
        // 文件读写
        File file = fileSystem.read();
        return merge(db, api, file);
    }
}

// ❌ 不推荐 - CPU 密集型
@RestController
public class CPUController {
    
    @GetMapping("/api/calculate")
    public Result calculate() {
        // 大量计算不适合虚拟线程
        return heavyComputation();
    }
}

2. 避免阻塞

// ✅ 推荐 - 非阻塞
@Service
public class AsyncService {
    
    public CompletableFuture<Data> fetchData() {
        return CompletableFuture.supplyAsync(() -> {
            return database.query();
        }, Executors.newVirtualThreadPerTaskExecutor());
    }
}

// ❌ 不推荐 - 同步阻塞
@Service
public class SyncService {
    
    public Data fetchData() {
        // 阻塞调用
        return database.query();
    }
}

3. 线程池配置

spring:
  threads:
    virtual:
      enabled: true
      pool:
        # 虚拟线程不需要配置太大
        core-pool-size: 100
        max-pool-size: 1000
        keep-alive: 30s
        queue-capacity: 10000

4. 监控虚拟线程

@Component
public class VirtualThreadMonitor {
    
    private final MeterRegistry meterRegistry;
    
    @Scheduled(fixedRate = 5000)
    public void monitor() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        
        // 记录线程数
        meterRegistry.gauge("jvm.threads.live", bean.getThreadCount());
        meterRegistry.gauge("jvm.threads.peak", bean.getPeakThreadCount());
        
        // 虚拟线程数
        long virtualThreads = Stream.of(Thread.getAllStackTraces().keySet())
            .filter(Thread::isVirtual)
            .count();
        
        meterRegistry.gauge("jvm.threads.virtual", virtualThreads);
    }
}

5. 异常处理

@Service
public class VirtualThreadService {
    
    public void executeTask() {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<?>> futures = new ArrayList<>();
            
            for (int i = 0; i < 1000; i++) {
                futures.add(executor.submit(() -> {
                    try {
                        // 任务逻辑
                        processTask();
                    } catch (Exception e) {
                        // 虚拟线程异常处理
                        log.error("任务执行失败", e);
                    }
                }));
            }
            
            // 等待所有任务完成
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (Exception e) {
                    log.error("任务异常", e);
                }
            }
        }
    }
}

常见问题

1. 虚拟线程 pinned

// ❌ 不推荐 - synchronized 会导致 pinned
public synchronized void process() {
    // 虚拟线程会退化为平台线程
}

// ✅ 推荐 - ReentrantLock
private final ReentrantLock lock = new ReentrantLock();

public void process() {
    lock.lock();
    try {
        // 不会导致 pinned
    } finally {
        lock.unlock();
    }
}

2. 线程局部变量

// 虚拟线程支持 ThreadLocal
private static final ThreadLocal<Context> context = 
    ThreadLocal.withInitial(Context::new);

public void process() {
    context.get().setData("value");
    // 虚拟线程有独立的 ThreadLocal
}

3. 兼容性

// 虚拟线程兼容现有 API
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
executor.submit(() -> {});

// 兼容 Spring 的 TaskExecutor
TaskExecutor taskExecutor = new SimpleAsyncTaskExecutor(
    Thread.ofVirtual().factory()
);

总结

虚拟线程要点:

虚拟线程是 Java 并发编程的革命性特性。


分享这篇文章到:

上一篇文章
SkyWalking 链路追踪实战
下一篇文章
MySQL 主从延迟分析与解决