Skip to content
清晨的一缕阳光
返回

Spring Cloud LoadBalancer

Spring Cloud LoadBalancer

负载均衡原理

架构设计

┌─────────────┐
│   Client    │
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────────┐
│     Spring Cloud LoadBalancer           │
│  ┌─────────────────────────────────┐   │
│  │    LoadBalancerClient           │   │
│  │    - 获取服务实例列表            │   │
│  │    - 应用负载均衡策略            │   │
│  └──────────────┬──────────────────┘   │
│                 │                       │
│  ┌──────────────▼──────────────────┐   │
│  │    ServiceInstanceListSupplier  │   │
│  │    - 从注册中心获取实例          │   │
│  │    - 健康检查                    │   │
│  └──────────────┬──────────────────┘   │
└─────────────────┼───────────────────────┘
                  │
        ┌─────────┼─────────┐
        │         │         │
        ▼         ▼         ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ Instance 1│ │ Instance 2│ │ Instance 3│
│ :8081     │ │ :8082     │ │ :8083     │
└───────────┘ └───────────┘ └───────────┘

核心组件

快速开始

1. 添加依赖

<!-- Spring Cloud LoadBalancer starter. No <version> is declared in this snippet. -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-loadbalancer</artifactId>
</dependency>

<!-- Nacos service-discovery starter from Spring Cloud Alibaba. No <version> is declared in this snippet. -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

2. 基础配置

# Basic Spring Cloud LoadBalancer settings: the load balancer and its instance cache are both switched on.
spring:
  cloud:
    loadbalancer:
      enabled: true
      cache:
        enabled: true  # enable the cache
        ttl: 30000     # cache for 30 seconds (assumes a unit-less value is read as milliseconds - TODO confirm)

3. 使用 LoadBalanced RestTemplate

/**
 * Declares the application's {@link RestTemplate} bean and marks it {@code @LoadBalanced}.
 */
@Configuration
public class RestTemplateConfig {

    /**
     * Builds a default-constructed {@link RestTemplate}; no further customisation is applied here.
     *
     * @return the new template
     */
    @LoadBalanced
    @Bean
    public RestTemplate restTemplate() {
        final RestTemplate template = new RestTemplate();
        return template;
    }
}

/**
 * Fetches users from {@code user-service} through the injected {@link RestTemplate}.
 */
@Service
public class UserService {

    /** URL prefix whose host part is the logical service name rather than a concrete address. */
    private static final String USERS_URL_PREFIX = "http://user-service/users/";

    @Autowired
    private RestTemplate restTemplate;

    /**
     * Loads one user by id.
     *
     * @param id identifier appended to the request path
     * @return the body converted to {@code User}, as returned by {@code getForObject}
     */
    public User getUser(Long id) {
        // The host name is expected to be resolved to a real instance by the load-balanced template.
        final String endpoint = USERS_URL_PREFIX + id;
        final User user = restTemplate.getForObject(endpoint, User.class);
        return user;
    }
}

4. 使用 LoadBalanced WebClient

/**
 * Declares a {@code @LoadBalanced} {@link WebClient.Builder} bean.
 */
@Configuration
public class WebClientConfig {

    /**
     * Creates a fresh builder via {@code WebClient.builder()}.
     *
     * @return the builder
     */
    @LoadBalanced
    @Bean
    public WebClient.Builder loadBalancedWebClientBuilder() {
        final WebClient.Builder builder = WebClient.builder();
        return builder;
    }
}

/**
 * Reactive variant of the user lookup, built on the injected {@link WebClient.Builder}.
 */
@Service
public class UserService {

    @Autowired
    private WebClient.Builder webClientBuilder;

    /**
     * Issues a GET for the given user id.
     *
     * @param id value substituted for the {@code {id}} URI variable
     * @return a {@code Mono} that decodes the response body as {@code User}
     */
    public Mono<User> getUser(Long id) {
        // A client is built from the shared builder on every call, exactly as the request is issued.
        final WebClient client = webClientBuilder.build();
        final WebClient.ResponseSpec response = client
            .get()
            .uri("http://user-service/users/{id}", id)
            .retrieve();
        return response.bodyToMono(User.class);
    }
}

负载均衡策略

1. 轮询策略(默认)

/**
 * Supplies a {@link RoundRobinLoadBalancer} as the {@link ReactorServiceInstanceLoadBalancer} bean.
 *
 * <p>NOTE(review): the service name is read from {@code LoadBalancerClientFactory.PROPERTY_NAME};
 * presumably that property is only populated inside a per-client load-balancer context, so this
 * class is likely meant to be referenced from {@code @LoadBalancerClient(configuration = ...)}
 * rather than picked up by component scanning - confirm against the Spring Cloud documentation.
 */
@Configuration
public class LoadBalancerConfig {

    /**
     * Builds the round-robin balancer for the service named in the environment.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return the round-robin load balancer
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        final String serviceName =
            environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);

        final RoundRobinLoadBalancer balancer = new RoundRobinLoadBalancer(
            loadBalancerClientFactory.getLazyProvider(serviceName, ServiceInstanceListSupplier.class),
            serviceName
        );
        return balancer;
    }
}

工作原理

2. 随机策略

/**
 * Supplies a {@link RandomLoadBalancer} as the {@link ReactorServiceInstanceLoadBalancer} bean.
 *
 * <p>NOTE(review): the service name is read from {@code LoadBalancerClientFactory.PROPERTY_NAME};
 * presumably that property is only populated inside a per-client load-balancer context - confirm
 * how this configuration class is expected to be registered.
 */
@Configuration
public class RandomLoadBalancerConfig {

    /**
     * Builds the random balancer for the service named in the environment.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return the random load balancer
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        final String serviceName =
            environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);

        final RandomLoadBalancer balancer = new RandomLoadBalancer(
            loadBalancerClientFactory.getLazyProvider(serviceName, ServiceInstanceListSupplier.class),
            serviceName
        );
        return balancer;
    }
}

3. 加权轮询策略

/**
 * Weighted round-robin load balancer.
 *
 * <p>Each instance's weight is read from its {@code weight} metadata entry (default 1). A shared
 * counter is mapped onto the cumulative weight ranges, so within every cycle of
 * {@code totalWeight} selections an instance with weight {@code w} is chosen {@code w} times
 * (consecutively - the picks are not interleaved).
 *
 * <p>The selection is fully non-blocking: the first instance list emitted by the supplier is used
 * and nothing calls {@code block()}, which is not allowed on non-blocking Reactor threads.
 *
 * <p>NOTE(review): the upstream {@code ReactorLoadBalancer} contract is believed to declare
 * {@code Mono<Response<ServiceInstance>> choose(Request)}; the return type is kept as written in
 * this article - confirm against the Spring Cloud version in use.
 * NOTE(review): a {@code @Component} with a plain {@code String} constructor argument presumably
 * cannot be autowired as-is; the instance is more likely created from a {@code @Bean} method.
 */
@Component
public class WeightedRoundRobinLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private static final String WEIGHT_KEY = "weight";
    private static final int DEFAULT_WEIGHT = 1;

    private final String serviceId;
    private final ObjectProvider<ServiceInstanceListSupplier> supplier;
    private final AtomicLong position = new AtomicLong(0);

    /**
     * @param supplier  lazy provider of the instance-list supplier
     * @param serviceId logical name of the target service
     */
    public WeightedRoundRobinLoadBalancer(
        ObjectProvider<ServiceInstanceListSupplier> supplier,
        String serviceId
    ) {
        this.supplier = supplier;
        this.serviceId = serviceId;
    }

    /**
     * Picks the next instance according to the configured weights.
     *
     * @param request the load-balancer request, forwarded to the instance-list supplier
     * @return the chosen instance, or an empty {@code Mono} when no supplier or instance is available
     */
    @Override
    public Mono<ServiceInstance> choose(Request request) {
        ServiceInstanceListSupplier instanceSupplier = supplier.getIfAvailable();
        if (instanceSupplier == null) {
            return Mono.empty();
        }
        // next(): the supplier's Flux may never complete, so take only the first emitted list.
        return instanceSupplier.get(request)
            .next()
            .flatMap(instances -> Mono.justOrEmpty(select(instances)));
    }

    /** Maps the running counter onto the cumulative weight ranges; returns null for an empty list. */
    private ServiceInstance select(List<ServiceInstance> instances) {
        if (instances == null || instances.isEmpty()) {
            return null;
        }

        int[] weights = new int[instances.size()];
        long totalWeight = 0;
        for (int i = 0; i < weights.length; i++) {
            weights[i] = weightOf(instances.get(i));
            totalWeight += weights[i];
        }

        long ticket = position.getAndIncrement();
        if (totalWeight <= 0) {
            // Every weight is zero: fall back to plain round robin instead of dividing by zero.
            return instances.get((int) Math.floorMod(ticket, (long) instances.size()));
        }

        // floorMod keeps the offset non-negative even after the counter wraps around.
        long offset = Math.floorMod(ticket, totalWeight);
        for (int i = 0; i < weights.length; i++) {
            if (offset < weights[i]) {
                return instances.get(i);
            }
            offset -= weights[i];
        }
        return instances.get(0);
    }

    /** Reads the weight from metadata; missing or malformed values count as 1, negatives as 0. */
    private int weightOf(ServiceInstance instance) {
        Map<String, String> metadata = instance.getMetadata();
        String raw = metadata != null ? metadata.get(WEIGHT_KEY) : null;
        if (raw == null) {
            return DEFAULT_WEIGHT;
        }
        try {
            return Math.max(0, Integer.parseInt(raw.trim()));
        } catch (NumberFormatException ignored) {
            // A malformed weight must not break instance selection.
            return DEFAULT_WEIGHT;
        }
    }
}

4. 最少连接数策略

/**
 * Least-connections load balancer: chooses the instance with the fewest connections currently
 * recorded through {@link #recordConnectionOpened(String)} / {@link #recordConnectionClosed(String)}.
 *
 * <p>Callers must report connection open/close events; without them every instance counts as
 * zero and the first instance in the list is always chosen.
 *
 * <p>NOTE(review): the upstream {@code ReactorLoadBalancer} contract is believed to declare
 * {@code Mono<Response<ServiceInstance>> choose(Request)}; the return type is kept as written in
 * this article - confirm against the Spring Cloud version in use.
 */
@Component
public class LeastConnectionLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private final ObjectProvider<ServiceInstanceListSupplier> supplier;
    private final Map<String, AtomicInteger> connections = new ConcurrentHashMap<>();

    /**
     * @param supplier  lazy provider of the instance-list supplier
     * @param serviceId logical name of the target service
     */
    public LeastConnectionLoadBalancer(
        ObjectProvider<ServiceInstanceListSupplier> supplier,
        String serviceId
    ) {
        this.supplier = supplier;
        this.serviceId = serviceId;
    }

    /**
     * Picks the instance with the lowest recorded connection count.
     *
     * @param request the load-balancer request, forwarded to the instance-list supplier
     * @return the chosen instance, or an empty {@code Mono} when no supplier or instance is available
     */
    @Override
    public Mono<ServiceInstance> choose(Request request) {
        ServiceInstanceListSupplier instanceSupplier = supplier.getIfAvailable();
        if (instanceSupplier == null) {
            return Mono.empty();
        }
        // Non-blocking: use the first emitted list instead of collectList().block().
        return instanceSupplier.get(request)
            .next()
            .flatMap(instances -> Mono.justOrEmpty(
                instances.stream().min(Comparator.comparingInt(this::activeConnections))));
    }

    /**
     * Records that a connection to the given instance has been opened.
     *
     * @param instanceId id of the instance; {@code null} is ignored
     */
    public void recordConnectionOpened(String instanceId) {
        if (instanceId != null) {
            connections.computeIfAbsent(instanceId, key -> new AtomicInteger()).incrementAndGet();
        }
    }

    /**
     * Records that a connection to the given instance has been closed; never drops below zero.
     *
     * @param instanceId id of the instance; {@code null} or unknown ids are ignored
     */
    public void recordConnectionClosed(String instanceId) {
        AtomicInteger counter = instanceId != null ? connections.get(instanceId) : null;
        if (counter != null) {
            counter.updateAndGet(current -> current > 0 ? current - 1 : 0);
        }
    }

    /** Current connection count for an instance; unknown instances count as zero. */
    private int activeConnections(ServiceInstance instance) {
        String instanceId = instance.getInstanceId();
        AtomicInteger counter = instanceId != null ? connections.get(instanceId) : null;
        return counter != null ? counter.get() : 0;
    }
}

5. 响应时间策略

/**
 * Response-time load balancer: chooses the instance with the lowest mean response time recorded
 * through {@link #recordResponseTime(String, long)}.
 *
 * <p>Instances without any sample are scored as 0 so that they are tried first. Scoring them as
 * {@code Double.MAX_VALUE} would starve them forever: once a single instance has a sample it
 * would win every comparison and no other instance could ever collect one.
 *
 * <p>NOTE(review): the upstream {@code ReactorLoadBalancer} contract is believed to declare
 * {@code Mono<Response<ServiceInstance>> choose(Request)}; the return type is kept as written in
 * this article - confirm against the Spring Cloud version in use.
 */
@Component
public class ResponseTimeLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private final ObjectProvider<ServiceInstanceListSupplier> supplier;
    private final Map<String, Average> responseTimes = new ConcurrentHashMap<>();

    /**
     * @param supplier  lazy provider of the instance-list supplier
     * @param serviceId logical name of the target service
     */
    public ResponseTimeLoadBalancer(
        ObjectProvider<ServiceInstanceListSupplier> supplier,
        String serviceId
    ) {
        this.supplier = supplier;
        this.serviceId = serviceId;
    }

    /**
     * Picks the instance with the lowest mean response time.
     *
     * @param request the load-balancer request, forwarded to the instance-list supplier
     * @return the chosen instance, or an empty {@code Mono} when no supplier or instance is available
     */
    @Override
    public Mono<ServiceInstance> choose(Request request) {
        ServiceInstanceListSupplier instanceSupplier = supplier.getIfAvailable();
        if (instanceSupplier == null) {
            return Mono.empty();
        }
        // Non-blocking: use the first emitted list instead of collectList().block().
        return instanceSupplier.get(request)
            .next()
            .flatMap(instances -> Mono.justOrEmpty(
                instances.stream().min(Comparator.comparingDouble(this::meanResponseTime))));
    }

    /**
     * Adds one response-time sample for an instance.
     *
     * @param instanceId   id of the instance the sample belongs to; {@code null} is ignored
     * @param responseTime observed response time (the unit is whatever the caller uses consistently)
     */
    public void recordResponseTime(String instanceId, long responseTime) {
        if (instanceId == null) {
            return;
        }
        responseTimes.computeIfAbsent(instanceId, k -> new Average())
            .add(responseTime);
    }

    /** Mean response time of an instance; instances without samples score 0 (see class comment). */
    private double meanResponseTime(ServiceInstance instance) {
        String instanceId = instance.getInstanceId();
        Average avg = instanceId != null ? responseTimes.get(instanceId) : null;
        return avg != null ? avg.getMean() : 0.0;
    }

    /** Running arithmetic mean; both methods synchronise on the instance. */
    static class Average {
        private double sum = 0;
        private long count = 0;

        public synchronized void add(long value) {
            sum += value;
            count++;
        }

        // Synchronized so that sum and count are read as a consistent pair.
        public synchronized double getMean() {
            return count > 0 ? sum / count : 0;
        }
    }
}

自定义实例选择

1. 基于区域的选择

/**
 * Zone-aware instance selection: prefers instances whose {@code zone} metadata equals the local
 * zone and falls back to all instances when the local zone has none.
 *
 * <p>NOTE(review): the upstream {@code ReactorLoadBalancer} contract is believed to declare
 * {@code Mono<Response<ServiceInstance>> choose(Request)}; the lambda keeps the
 * {@code Mono<ServiceInstance>} shape used throughout this article - confirm against the Spring
 * Cloud version in use.
 */
@Configuration
public class ZoneAwareLoadBalancerConfig {

    @Value("${spring.cloud.loadbalancer.zone:default}")
    private String currentZone;

    /**
     * Builds the zone-aware load balancer.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return a balancer that emits a random same-zone instance, a random instance of any zone as
     *         fallback, or nothing when no instance is available
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        return request -> {
            String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            ServiceInstanceListSupplier supplier = loadBalancerClientFactory
                .getLazyProvider(serviceId, ServiceInstanceListSupplier.class)
                .getIfAvailable();
            if (supplier == null) {
                return Mono.empty();
            }

            // next(): take the first emitted list; the supplier's Flux may never complete.
            // filter(): an empty list completes empty - a Reactor mapper must never return null.
            return supplier.get(request)
                .next()
                .filter(instances -> !instances.isEmpty())
                .map(this::pickPreferringCurrentZone);
        };
    }

    /** Picks a random same-zone instance, or a random instance overall; the list must be non-empty. */
    private ServiceInstance pickPreferringCurrentZone(List<ServiceInstance> instances) {
        List<ServiceInstance> sameZoneInstances = instances.stream()
            .filter(instance -> currentZone.equals(instance.getMetadata().get("zone")))
            .collect(Collectors.toList());

        List<ServiceInstance> candidates = sameZoneInstances.isEmpty() ? instances : sameZoneInstances;
        // ThreadLocalRandom avoids allocating and seeding a new Random on every request.
        int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(candidates.size());
        return candidates.get(index);
    }
}

2. 基于元数据的选择

/**
 * Metadata-aware instance selection: when the request carries a {@code version}, a random
 * instance whose {@code version} metadata matches is chosen; otherwise, or when nothing matches,
 * instances are handed out round-robin.
 *
 * <p>NOTE(review): {@code request.getContext().get("version", String.class)} is kept exactly as
 * written in this article; the concrete context type that offers this accessor is not visible
 * here - confirm.
 * NOTE(review): the upstream {@code ReactorLoadBalancer} contract is believed to declare
 * {@code Mono<Response<ServiceInstance>> choose(Request)} - confirm against the Spring Cloud
 * version in use.
 */
@Configuration
public class MetadataAwareLoadBalancerConfig {

    /** Shared cursor for the round-robin fallback. */
    private final AtomicInteger position = new AtomicInteger();

    /**
     * Builds the metadata-aware load balancer.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return a balancer that emits the chosen instance, or nothing when no instance is available
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        return request -> {
            String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
            ServiceInstanceListSupplier supplier = loadBalancerClientFactory
                .getLazyProvider(serviceId, ServiceInstanceListSupplier.class)
                .getIfAvailable();
            if (supplier == null) {
                return Mono.empty();
            }

            // next(): take the first emitted list; the supplier's Flux may never complete.
            // filter(): an empty list completes empty - a Reactor mapper must never return null.
            return supplier.get(request)
                .next()
                .filter(instances -> !instances.isEmpty())
                .map(instances -> pickByVersion(
                    instances,
                    request.getContext().get("version", String.class)));
        };
    }

    /** Chooses a random instance of the requested version, else the next one round-robin. */
    private ServiceInstance pickByVersion(List<ServiceInstance> instances, String version) {
        if (version != null) {
            List<ServiceInstance> versionInstances = instances.stream()
                .filter(instance -> version.equals(instance.getMetadata().get("version")))
                .collect(Collectors.toList());

            if (!versionInstances.isEmpty()) {
                int index = java.util.concurrent.ThreadLocalRandom.current()
                    .nextInt(versionInstances.size());
                return versionInstances.get(index);
            }
        }

        // Real round robin; floorMod keeps the index valid after the counter overflows.
        return instances.get(Math.floorMod(position.getAndIncrement(), instances.size()));
    }
}

健康检查

1. 启用健康检查

# Enables the built-in health-check instance supplier.
# The health-check properties have no "enabled" switch; the supplier is selected through
# spring.cloud.loadbalancer.configurations.
spring:
  cloud:
    loadbalancer:
      configurations: health-check  # use the health-check based ServiceInstanceListSupplier
      cache:
        enabled: true
      health-check:
        interval: 30s  # interval between health-check rounds

2. 自定义健康检查

/**
 * {@link ServiceInstanceListSupplier} decorator that keeps only the instances whose
 * {@code /actuator/health} endpoint answers with a 2xx status and a body whose {@code status}
 * is {@code "UP"}.
 *
 * <p>The probe uses a blocking {@link RestTemplate}, so filtering is moved onto the bounded
 * elastic scheduler instead of running on the thread that emits the instance list.
 *
 * <p>NOTE(review): the injected {@code RestTemplate} should presumably NOT be the
 * {@code @LoadBalanced} one, because the probe addresses a concrete host and port - confirm.
 */
@Component
public class CustomHealthCheck implements ServiceInstanceListSupplier {

    private final ServiceInstanceListSupplier delegate;
    private final RestTemplate restTemplate;

    /**
     * @param delegate     supplier whose instance lists are filtered
     * @param restTemplate client used to call each instance's health endpoint
     */
    public CustomHealthCheck(
        @Qualifier("healthCheckDelegate") ServiceInstanceListSupplier delegate,
        RestTemplate restTemplate
    ) {
        this.delegate = delegate;
        this.restTemplate = restTemplate;
    }

    @Override
    public String getServiceId() {
        return delegate.getServiceId();
    }

    /** @return the delegate's instance lists with unhealthy instances removed */
    @Override
    public Flux<List<ServiceInstance>> get() {
        return keepHealthy(delegate.get());
    }

    /**
     * @param request forwarded to the delegate so that request-aware suppliers still see it
     * @return the delegate's instance lists with unhealthy instances removed
     */
    @Override
    public Flux<List<ServiceInstance>> get(Request request) {
        return keepHealthy(delegate.get(request));
    }

    /** Filters every emitted list through {@link #isHealthy(ServiceInstance)} off the emitting thread. */
    private Flux<List<ServiceInstance>> keepHealthy(Flux<List<ServiceInstance>> source) {
        return source
            .publishOn(Schedulers.boundedElastic())
            .map(instances -> instances.stream()
                .filter(this::isHealthy)
                .collect(Collectors.toList())
            );
    }

    /** Probes one instance; any failure (timeout, refused connection, bad payload) counts as unhealthy. */
    private boolean isHealthy(ServiceInstance instance) {
        try {
            String healthUrl = "http://" + instance.getHost() + ":" + instance.getPort() + "/actuator/health";
            ResponseEntity<Map> response = restTemplate.getForEntity(healthUrl, Map.class);

            Map<?, ?> body = response.getBody();
            return response.getStatusCode().is2xxSuccessful()
                && body != null
                && "UP".equals(body.get("status"));
        } catch (Exception e) {
            // Deliberate best effort: an unreachable or misbehaving instance is simply filtered out.
            return false;
        }
    }
}

性能优化

1. 缓存配置

# Instance-list cache tuning for Spring Cloud LoadBalancer.
spring:
  cloud:
    loadbalancer:
      cache:
        enabled: true
        ttl: 30000  # cache for 30 seconds (assumes a unit-less value is read as milliseconds - TODO confirm)
        capacity: 256  # cache capacity

2. 异步加载

/**
 * Registers a {@link ServiceInstanceListSupplier} that performs the {@link DiscoveryClient}
 * lookup on the bounded elastic scheduler instead of the subscribing thread.
 */
@Configuration
public class AsyncLoadBalancerConfig {

    /**
     * Builds the supplier for the service named in the environment.
     *
     * @param discoveryClient client used to look up the instances
     * @param environment     source of the service-name property
     * @return a supplier that emits one instance list per subscription
     */
    @Bean
    public ServiceInstanceListSupplier serviceInstanceListSupplier(
        DiscoveryClient discoveryClient,
        Environment environment
    ) {
        return new ServiceInstanceListSupplier() {
            private final String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);

            @Override
            public String getServiceId() {
                return serviceId;
            }

            @Override
            public Flux<List<ServiceInstance>> get() {
                // Reactor has no Flux.fromCallable; wrap the lookup in a Mono and widen it to a
                // Flux. The callable runs on subscription, on the bounded elastic scheduler.
                return Mono.fromCallable(() -> discoveryClient.getInstances(serviceId))
                    .subscribeOn(Schedulers.boundedElastic())
                    .flux();
            }

            @Override
            public Flux<List<ServiceInstance>> get(Request request) {
                return get();
            }
        };
    }
}

3. 连接池配置

# Connection-pool limits for the HTTP client used by Feign.
# NOTE(review): these keys configure Feign clients, not the RestTemplate/WebClient examples shown
# earlier; presumably newer OpenFeign releases use a different property prefix - confirm for the
# version in use.
feign:
  httpclient:
    enabled: true
    max-connections: 200
    max-connections-per-route: 50

监控与指标

1. 集成 Actuator

<!-- Spring Boot Actuator starter. No <version> is declared in this snippet. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

2. 自定义指标

@Component
public class LoadBalancerMetrics {
    
    private final MeterRegistry meterRegistry;
    private final Counter chooseCounter;
    private final Timer chooseTimer;
    
    public LoadBalancerMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
        this.chooseCounter = Counter.builder("loadbalancer.choose.count")
            .description("负载均衡选择次数")
            .register(meterRegistry);
        this.chooseTimer = Timer.builder("loadbalancer.choose.time")
            .description("负载均衡选择耗时")
            .register(meterRegistry);
    }
    
    public void recordChoose(String serviceId, String instanceId, long duration) {
        chooseCounter.increment();
        chooseTimer.record(duration, TimeUnit.MILLISECONDS);
        
        meterRegistry.counter("loadbalancer.choose.service", "service", serviceId)
            .increment();
        meterRegistry.counter("loadbalancer.choose.instance", "instance", instanceId)
            .increment();
    }
}

最佳实践

1. 多策略组合

/**
 * Load-balancer configuration for the {@code prod} profile.
 */
@Configuration
@Profile("prod")
public class ProductionLoadBalancerConfig {

    /**
     * Production: weighted round robin.
     *
     * <p>{@code WeightedRoundRobinLoadBalancer} declares only a (supplier, serviceId) constructor,
     * so both arguments are resolved here in the same way as in the other configuration classes.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return the weighted round-robin load balancer
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new WeightedRoundRobinLoadBalancer(
            loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class),
            name
        );
    }
}

/**
 * Load-balancer configuration for the {@code dev} profile.
 */
@Configuration
@Profile("dev")
public class DevelopmentLoadBalancerConfig {

    /**
     * Development: random strategy.
     *
     * <p>{@code RandomLoadBalancer} is constructed with a supplier provider and a service name,
     * matching its use in {@code RandomLoadBalancerConfig}; it is not created without arguments.
     *
     * @param environment               source of the service-name property
     * @param loadBalancerClientFactory factory that hands out the lazy instance-list supplier
     * @return the random load balancer
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer(
        Environment environment,
        LoadBalancerClientFactory loadBalancerClientFactory
    ) {
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new RandomLoadBalancer(
            loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class),
            name
        );
    }
}

2. 灰度发布

/**
 * Gray-release (canary) routing: a fixed share of users, chosen by user-id hash, is sent to the
 * {@code gray} version and everyone else to {@code prod}.
 *
 * <p>NOTE(review): {@code chooseByVersion} is not defined in this snippet, and
 * {@code request.getContext().get("userId", String.class)} is kept exactly as written; the
 * concrete context type that offers this accessor is not visible here - confirm both.
 */
@Configuration
public class GrayLoadBalancerConfig {

    /**
     * Builds the gray-release load balancer.
     *
     * @return a balancer that selects instances by the version derived from the user id
     */
    @Bean
    public ReactorServiceInstanceLoadBalancer loadBalancer() {
        return request -> {
            // Decide between the gray and the production version from the user id.
            String userId = request.getContext().get("userId", String.class);
            String version = isGrayUser(userId) ? "gray" : "prod";

            // Select an instance of that version.
            return chooseByVersion(version);
        };
    }

    /**
     * Tells whether a user belongs to the gray group (10% of the hash space).
     *
     * @param userId user identifier; {@code null} (no user on the request) is never gray
     * @return {@code true} when the user falls into the gray bucket
     */
    private boolean isGrayUser(String userId) {
        if (userId == null) {
            return false;
        }
        // Reduce before taking the absolute value: Math.abs(Integer.MIN_VALUE) is still negative,
        // which would always classify such a user as gray.
        int bucket = Math.abs(userId.hashCode() % 100);
        return bucket < 10;  // 10% of users are gray users
    }
}

3. 故障转移

@Component
public class FailoverLoadBalancer implements ReactorServiceInstanceLoadBalancer {
    
    private final ReactorServiceInstanceLoadBalancer delegate;
    
    public FailoverLoadBalancer(ReactorServiceInstanceLoadBalancer delegate) {
        this.delegate = delegate;
    }
    
    @Override
    public Mono<ServiceInstance> choose(Request request) {
        return delegate.choose(request)
            .onErrorResume(e -> {
                log.error("负载均衡失败,使用备用策略", e);
                // 故障转移:直接返回第一个实例
                return getFallbackInstance(request);
            });
    }
    
    private Mono<ServiceInstance> getFallbackInstance(Request request) {
        // 备用实例选择逻辑
        return Mono.empty();
    }
}

常见问题

1. 负载均衡不生效

问题:服务调用没有负载均衡

排查步骤

2. 实例选择不均匀

问题:某些实例请求多,某些请求少

解决方案

3. 性能问题

问题:负载均衡影响性能

解决方案

总结

Spring Cloud LoadBalancer 提供了灵活的客户端负载均衡能力,支持多种负载均衡策略和自定义扩展。

合理选择负载均衡策略可以提高系统的吞吐量和可用性。

在生产环境中,建议结合健康检查和缓存机制,并建立完善的监控指标。


分享这篇文章到:

上一篇文章
Go 代码规范与测试
下一篇文章
InnoDB 存储引擎详解