Gateway Rate Limiting in Practice

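Rate Limiting Algorithms

1. Counter (fixed window) algorithm

Time window: [0s, 1s]
Request count: 100
Threshold: 100

Once the number of requests within [0s, 1s] reaches 100, all further requests in that window are rejected.

Drawback: the boundary problem. If 100 requests arrive at 0.9s and another 100 at 1.1s, each window stays within its threshold, yet 200 requests pass within 0.2s, twice the intended rate.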
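The boundary problem is easy to reproduce in memory. The following is a minimal sketch, not the Redis-backed filter shown later in this article: the class name `FixedWindowCounter` and its method are illustrative, and timestamps are passed in explicitly so the behavior is deterministic.

```java
/** Fixed-window counter; hypothetical helper used only to illustrate the algorithm. */
final class FixedWindowCounter {
    private final int threshold;
    private final long windowMillis;
    private long windowStart = 0;
    private int count = 0;

    FixedWindowCounter(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    /** Returns true if the request arriving at nowMillis is allowed. */
    synchronized boolean tryAcquire(long nowMillis) {
        // Align to the window containing nowMillis, e.g. [0, 1000), [1000, 2000)
        long currentWindow = nowMillis - (nowMillis % windowMillis);
        if (currentWindow != windowStart) {
            // A new window has started: the counter resets all at once
            windowStart = currentWindow;
            count = 0;
        }
        if (count >= threshold) {
            return false;
        }
        count++;
        return true;
    }
}
```

With a threshold of 100 and a 1000 ms window, 100 calls at t=900 and 100 calls at t=1100 are all accepted, so 200 requests pass within 200 ms.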

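2. Sliding window algorithm

Timeline:
┌─────┬─────┬─────┬─────┬─────┬─────┐
│  1  │  2  │  3  │  4  │  5  │  6  │
└─────┴─────┴─────┴─────┴─────┴─────┘
        ↑                     ↑
   window start           window end

The current window counts the total number of requests in slots 2-6.

Advantage: avoids the boundary problem, because the window advances one slot at a time instead of resetting all at once.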
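A slot-based sliding window can be sketched as follows. This is an illustrative in-memory version under stated assumptions: `SlidingWindowCounter` is a hypothetical name, `windowMillis` is assumed to be divisible by the slot count, and timestamps are passed in explicitly.

```java
import java.util.Arrays;

/** Sliding window made of fixed-size slots; hypothetical helper for illustration. */
final class SlidingWindowCounter {
    private final int threshold;
    private final long slotMillis;
    private final long[] slotIds; // absolute slot number currently stored in each bucket
    private final int[] counts;   // request count of each bucket

    SlidingWindowCounter(int threshold, long windowMillis, int slots) {
        this.threshold = threshold;
        this.slotMillis = windowMillis / slots;
        this.slotIds = new long[slots];
        this.counts = new int[slots];
        Arrays.fill(slotIds, -1);
    }

    synchronized boolean tryAcquire(long nowMillis) {
        long currentSlot = nowMillis / slotMillis;
        int idx = (int) (currentSlot % slotIds.length);
        if (slotIds[idx] != currentSlot) {
            // The bucket still holds an expired slot: recycle it
            slotIds[idx] = currentSlot;
            counts[idx] = 0;
        }
        int total = 0;
        for (int i = 0; i < counts.length; i++) {
            // Only the most recent `slots` slot numbers belong to the window
            if (slotIds[i] > currentSlot - slotIds.length) {
                total += counts[i];
            }
        }
        if (total >= threshold) {
            return false;
        }
        counts[idx]++;
        return true;
    }
}
```

With a threshold of 100, a 1000 ms window, and 5 slots, a burst of 100 requests at t=900 causes a request at t=1100 to be rejected, unlike the fixed-window counter. Precision is bounded by the slot width: more slots give a closer approximation at the cost of more state.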

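3. Token bucket algorithm

Token bucket:
┌─────┬─────┬─────┬─────┬─────┐
│  🪙  │  🪙  │  🪙  │  🪙  │  🪙  │  ← fixed capacity
└─────┴─────┴─────┴─────┴─────┘

  Tokens are added at a fixed rate

  Each request tries to take a token

  Token available: the request passes
  No token: the request is rejected or waits

Characteristic: allows bursts of traffic up to the bucket capacity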
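The refill-then-consume logic can be sketched in a few lines. This is an in-memory illustration only: `TokenBucket` and its parameters are hypothetical names, the bucket starts full, and timestamps are passed in explicitly.

```java
/** In-memory token bucket; hypothetical helper for illustration. */
final class TokenBucket {
    private final double capacity;
    private final double refillPerSecond;
    private double tokens;
    private long lastRefillMillis;

    TokenBucket(double capacity, double refillPerSecond, long nowMillis) {
        this.capacity = capacity;
        this.refillPerSecond = refillPerSecond;
        this.tokens = capacity; // assumption: the bucket starts full
        this.lastRefillMillis = nowMillis;
    }

    synchronized boolean tryAcquire(long nowMillis) {
        // Add the tokens earned since the last call, capped at the capacity
        double elapsedSeconds = Math.max(0, nowMillis - lastRefillMillis) / 1000.0;
        tokens = Math.min(capacity, tokens + elapsedSeconds * refillPerSecond);
        lastRefillMillis = Math.max(lastRefillMillis, nowMillis);

        if (tokens < 1) {
            return false; // no token available: reject
        }
        tokens -= 1;
        return true;
    }
}
```

A bucket with capacity 10 and a refill rate of 4 tokens per second accepts a burst of 10 requests at once, then two more after 500 ms, which is the burst tolerance described above.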

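4. Leaky bucket algorithm

Leaky bucket:
┌─────────────────┐
│  💧 💧 💧 💧 💧 │  ← requests enter
│  💧 💧 💧 💧 💧 │
└────────┬────────┘

         │ flows out at a fixed rate

      requests are processed

Characteristic: enforces a fixed outflow rate and smooths traffic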
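One way to sketch the fixed outflow rate is to assign each request a departure time spaced by a constant interval, and to reject it when the resulting wait would exceed a queueing limit. The class `LeakyBucketQueue` and its parameters are hypothetical; the caller is assumed to delay the request by the returned number of milliseconds.

```java
/** Leaky bucket as a queue with a bounded wait; hypothetical helper for illustration. */
final class LeakyBucketQueue {
    private final long intervalMillis;  // spacing between two processed requests
    private final long maxQueueMillis;  // longest wait a request may be assigned
    private long lastDepartureMillis = Long.MIN_VALUE / 2; // "long ago", without overflow

    LeakyBucketQueue(long intervalMillis, long maxQueueMillis) {
        this.intervalMillis = intervalMillis;
        this.maxQueueMillis = maxQueueMillis;
    }

    /** Returns how long the request must wait in ms, or -1 if it is rejected. */
    synchronized long acquireDelay(long nowMillis) {
        // The earliest slot that keeps the outflow at one request per interval
        long departure = Math.max(nowMillis, lastDepartureMillis + intervalMillis);
        long wait = departure - nowMillis;
        if (wait > maxQueueMillis) {
            return -1; // the bucket is full: queueing would take too long
        }
        lastDepartureMillis = departure;
        return wait;
    }
}
```

With a 100 ms interval and a 250 ms queueing limit, four simultaneous requests receive waits of 0, 100, and 200 ms, and the fourth is rejected, so the downstream never sees more than one request per 100 ms.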

Rate Limiting with Redis

1. Counter-based rate limiting with Redis

@Component
public class RedisRateLimiterFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private RateLimiterProperties properties;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
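        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        // getRemoteAddress() is declared @Nullable, so guard against a missing address
        java.net.InetSocketAddress remote = request.getRemoteAddress();
        String ip = remote != null ? remote.getAddress().getHostAddress() : "unknown";
        
        // Rate limit key: one counter per path and client IP
        String key = "rate_limit:" + path + ":" + ip;
        
        // Atomically increment the counter and read the new value.
        // Note: RedisTemplate is blocking; ReactiveStringRedisTemplate is the
        // non-blocking alternative for a reactive gateway.
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count != null && count == 1) {
            // First request in the window: start the expiry timer.
            // INCR and EXPIRE are two separate commands, so a crash in between
            // leaves the key without a TTL; a Lua script avoids this.
            redisTemplate.expire(key, properties.getWindowSize(), TimeUnit.SECONDS);
        }
        
        // Reject once the threshold is exceeded
        if (count != null && count > properties.getThreshold()) {
            return tooManyRequests(exchange);
        }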
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -50;
    }
    
    private Mono<Void> tooManyRequests(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
        response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
        
        Map<String, Object> data = new HashMap<>();
        data.put("code", 429);
        data.put("message", "Too many requests, please try again later");
        
        byte[] bytes = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
        DataBuffer buffer = response.bufferFactory().wrap(bytes);
        
        return response.writeWith(Mono.just(buffer));
    }
}

@Configuration
@ConfigurationProperties(prefix = "rate-limiter")
public class RateLimiterProperties {
    
    /**
     * Time window in seconds
     */
    private Integer windowSize = 1;
    
    /**
     * Maximum number of requests allowed per window
     */
    private Integer threshold = 100;
    
    // getters/setters omitted
}

2. Token bucket rate limiting with Redis

@Component
public class RedisTokenBucketFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String SCRIPT = 
        "local key = KEYS[1]\n" +
        "local capacity = tonumber(ARGV[1])\n" +
        "local rate = tonumber(ARGV[2])\n" +
        "local now = tonumber(ARGV[3])\n" +
        "\n" +
        "-- HGET yields false for a missing field, so fall back to the defaults\n" +
        "local last_time = tonumber(redis.call('hget', key, 'last_time') or now)\n" +
        "local tokens = tonumber(redis.call('hget', key, 'tokens') or capacity)\n" +
        "\n" +
        "-- Refill tokens for the elapsed seconds, capped at the capacity\n" +
        "local delta = math.max(0, now - last_time)\n" +
        "tokens = math.min(capacity, tokens + delta * rate)\n" +
        "\n" +
        "local allowed = 0\n" +
        "if tokens >= 1 then\n" +
        "    tokens = tokens - 1\n" +
        "    allowed = 1\n" +
        "end\n" +
        "\n" +
        "redis.call('hset', key, 'last_time', now)\n" +
        "redis.call('hset', key, 'tokens', tokens)\n" +
        "redis.call('expire', key, 60)\n" +
        "\n" +
        "return allowed";
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        String ip = request.getRemoteAddress().getAddress().getHostAddress();
        
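        String key = "token_bucket:" + path + ":" + ip;
        long now = System.currentTimeMillis() / 1000;
        
        // Run the Lua script. Arguments are passed as strings, assuming the template
        // uses a String value serializer (as StringRedisTemplate does).
        RedisScript<Long> script = RedisScript.of(SCRIPT, Long.class);
        Long allowed = redisTemplate.execute(
            script,
            Collections.singletonList(key),
            "100",               // bucket capacity
            "10",                // tokens added per second
            String.valueOf(now)  // current time in seconds
        );
        
        if (allowed != null && allowed == 0) {
            return tooManyRequests(exchange);
        }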
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -50;
    }
    
    private Mono<Void> tooManyRequests(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
        // ... same as above
    }
}

Gateway Rate Limiting with Sentinel

1. Add dependencies

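<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>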

2. Configure rate limiting rules

spring:
  cloud:
    sentinel:
      enabled: true
      transport:
        dashboard: localhost:8080
      datasource:
        gw-flow:
          nacos:
            server-addr: localhost:8848
            dataId: gateway-sentinel-flow.json
            groupId: DEFAULT_GROUP
            rule-type: gw-flow

3. Rate limiting rule definitions

[
  {
    "resource": "user-service",
    "resourceMode": 0,
    "grade": 1,
    "count": 100,
    "intervalSec": 1,
    "controlBehavior": 0
  },
  {
    "resource": "order-service",
    "resourceMode": 0,
    "grade": 1,
    "count": 50,
    "intervalSec": 1,
    "controlBehavior": 2,
    "maxQueueingTimeoutMs": 500
  }
]

4. Hot-parameter rate limiting

[
  {
    "resource": "user-service",
    "grade": 1,
    "count": 10,
    "paramIdx": 0,
    "durationInSec": 1,
    "paramFlowItemList": [
      {
        "object": "VIP_USER",
        "count": 100
      },
      {
        "object": "NORMAL_USER",
        "count": 10
      }
    ]
  }
]

Multi-dimensional Rate Limiting

1. IP-based rate limiting

@Component
public class IpRateLimiterFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String ip = request.getRemoteAddress().getAddress().getHostAddress();
        
        String key = "rate_limit:ip:" + ip;
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count == 1) {
            // 1-second window, matching the per-second limit below
            redisTemplate.expire(key, 1, TimeUnit.SECONDS);
        }
        
        // At most 10 requests per second per IP
        if (count > 10) {
            return tooManyRequests(exchange);
        }
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -60;
    }
}

2. Per-user rate limiting

@Component
public class UserRateLimiterFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        
        // Read the user ID from the request header; requests without it are not limited here
        String userId = request.getHeaders().getFirst("X-User-ID");
        if (userId == null) {
            return chain.filter(exchange);
        }
        
        String key = "rate_limit:user:" + userId;
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count == 1) {
            // 1-second window, matching the per-second limit below
            redisTemplate.expire(key, 1, TimeUnit.SECONDS);
        }
        
        // At most 100 requests per second per user
        if (count > 100) {
            return tooManyRequests(exchange);
        }
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -55;
    }
}

3. Per-API rate limiting

@Component
public class ApiRateLimiterFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    // Per-API rate limit thresholds (requests per second)
    private static final Map<String, Integer> API_LIMITS = new HashMap<>();
    
    static {
        API_LIMITS.put("/api/users", 100);
        API_LIMITS.put("/api/orders", 50);
        API_LIMITS.put("/api/search", 20);
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        
        // Look up the threshold for this API
        Integer threshold = getApiThreshold(path);
        if (threshold == null) {
            return chain.filter(exchange);
        }
        
        String key = "rate_limit:api:" + path;
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count == 1) {
            redisTemplate.expire(key, 1, TimeUnit.SECONDS);
        }
        
        if (count > threshold) {
            return tooManyRequests(exchange);
        }
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -50;
    }
    
    private Integer getApiThreshold(String path) {
        for (Map.Entry<String, Integer> entry : API_LIMITS.entrySet()) {
            if (path.startsWith(entry.getKey())) {
                return entry.getValue();
            }
        }
        return null;
    }
}
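
The `startsWith` lookup above depends on `HashMap` iteration order when prefixes overlap, and it also matches paths such as `/api/users-export`. A possible refinement, sketched here with the hypothetical class `ApiLimitTable`, matches on path-segment boundaries and prefers the longest matching prefix.

```java
import java.util.HashMap;
import java.util.Map;

/** Longest-prefix threshold lookup; hypothetical helper for illustration. */
final class ApiLimitTable {
    private final Map<String, Integer> limits;

    ApiLimitTable(Map<String, Integer> limits) {
        this.limits = new HashMap<>(limits); // defensive copy
    }

    /** Returns the threshold of the longest matching prefix, or null if none matches. */
    Integer thresholdFor(String path) {
        String best = null;
        for (String prefix : limits.keySet()) {
            // Match the prefix itself or anything below it, but not "/api/usersx"
            boolean matches = path.equals(prefix) || path.startsWith(prefix + "/");
            if (matches && (best == null || prefix.length() > best.length())) {
                best = prefix;
            }
        }
        return best == null ? null : limits.get(best);
    }
}
```

For example, with limits for `/api/users` (100) and `/api/users/export` (5), the path `/api/users/export/csv` resolves to 5 regardless of map order.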

Distributed Rate Limiting

1. Cluster rate limiting

@Component
public class ClusterRateLimiterFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Value("${server.instance-id:instance-1}")
    private String instanceId;
    
    @Value("${cluster.instances:3}")
    private Integer clusterSize;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getPath().value();
        
        // Threshold for the whole cluster
        int globalThreshold = 300;
        // Per-instance share = cluster threshold / number of instances
        int instanceThreshold = globalThreshold / clusterSize;
        
        String key = "rate_limit:cluster:" + path;
        Long count = redisTemplate.opsForValue().increment(key);
        
        if (count == 1) {
            redisTemplate.expire(key, 1, TimeUnit.SECONDS);
        }
        
        // Reject when the cluster-wide count exceeds the global threshold
        if (count > globalThreshold) {
            return tooManyRequests(exchange);
        }
        
        // Track this instance's share of the traffic
        String instanceKey = key + ":" + instanceId;
        Long instanceCount = redisTemplate.opsForValue().increment(instanceKey);
        
        if (instanceCount == 1) {
            redisTemplate.expire(instanceKey, 1, TimeUnit.SECONDS);
        }
        
        if (instanceCount > instanceThreshold) {
            // This instance is above its fair share while the cluster is still under
            // its limit. The request is still allowed: the filter cannot re-route it,
            // so this branch only marks a place for metrics or load-balancer feedback.
        }
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return -50;
    }
}

2. Rate limit response headers

@Component
public class RateLimitHeaderFilter implements GlobalFilter, Ordered {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpResponse response = exchange.getResponse();
        String key = "rate_limit:api:" + exchange.getRequest().getPath().value();
        
        // Headers become read-only once the response is committed, so register the
        // update with beforeCommit instead of running it after chain.filter()
        response.beforeCommit(() -> {
            // The counter is stored as a string; a missing key means no requests yet
            String value = redisTemplate.opsForValue().get(key);
            int used = value != null ? Integer.parseInt(value) : 0;
            int remaining = Math.max(0, 100 - used);
            
            // Add the rate limit headers
            response.getHeaders().set("X-RateLimit-Limit", "100");
            response.getHeaders().set("X-RateLimit-Remaining", String.valueOf(remaining));
            response.getHeaders().set("X-RateLimit-Reset", "1");
            return Mono.empty();
        });
        
        return chain.filter(exchange);
    }
    
    @Override
    public int getOrder() {
        return Integer.MAX_VALUE;
    }
}

Dynamic Rate Limit Configuration

1. Nacos configuration center

# gateway-rate-limit.yaml
rate-limit:
  enabled: true
  rules:
    - path: /api/users/**
      threshold: 100
      window-size: 1
    - path: /api/orders/**
      threshold: 50
      window-size: 1
    - path: /api/search/**
      threshold: 20
      window-size: 1

2. Listening for configuration changes

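@Component
public class RateLimitConfigListener {
    
    private static final Logger log = LoggerFactory.getLogger(RateLimitConfigListener.class);
    
    @Autowired
    private RateLimiterManager rateLimiterManager;
    
    @NacosConfigListener(dataId = "gateway-rate-limit.yaml")
    public void onConfigChange(String config) {
        // Parse the new configuration
        RateLimitConfig rateLimitConfig = parseConfig(config);
        
        // Swap in the updated rate limiting rules
        rateLimiterManager.updateRules(rateLimitConfig.getRules());
        
        log.info("Rate limit configuration updated");
    }
    
    private RateLimitConfig parseConfig(String config) {
        // Parse the YAML text with SnakeYAML; a Yaml instance is not thread-safe,
        // so a new one is created per call
        return new Yaml().loadAs(config, RateLimitConfig.class);
    }
}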
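
The article does not show `RateLimiterManager`. One possible shape, sketched here under the hypothetical names `RateLimitRule` and `RateLimiterManagerSketch`, keeps the rules in an `AtomicReference` so that a configuration update replaces the whole list atomically while request threads keep reading a consistent snapshot. Only a trailing `/**` wildcard is supported in this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** One rule from the YAML above; hypothetical type for illustration. */
final class RateLimitRule {
    final String path;
    final int threshold;
    final int windowSize;

    RateLimitRule(String path, int threshold, int windowSize) {
        this.path = path;
        this.threshold = threshold;
        this.windowSize = windowSize;
    }

    boolean matches(String requestPath) {
        if (path.endsWith("/**")) {
            // "/api/users/**" matches "/api/users" and everything below it
            String prefix = path.substring(0, path.length() - 3);
            return requestPath.equals(prefix) || requestPath.startsWith(prefix + "/");
        }
        return requestPath.equals(path);
    }
}

/** Holds the active rules and swaps them atomically on update. */
final class RateLimiterManagerSketch {
    private final AtomicReference<List<RateLimitRule>> rules =
        new AtomicReference<>(Collections.<RateLimitRule>emptyList());

    void updateRules(List<RateLimitRule> newRules) {
        // Publish an immutable copy so readers never see a half-updated list
        rules.set(Collections.unmodifiableList(new ArrayList<>(newRules)));
    }

    /** Returns the first rule matching the path, or null if no rule applies. */
    RateLimitRule find(String requestPath) {
        for (RateLimitRule rule : rules.get()) {
            if (rule.matches(requestPath)) {
                return rule;
            }
        }
        return null;
    }
}
```

A filter would call `find(path)` per request and use the returned threshold and window size; because the list is replaced as a whole, no locking is needed on the request path.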

Monitoring and Alerting

1. Collecting rate limit metrics

@Component
public class RateLimitMetrics {
    
    private static final Logger log = LoggerFactory.getLogger(RateLimitMetrics.class);
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private Counter triggerCounter;
    
    @PostConstruct
    public void init() {
        triggerCounter = Counter.builder("gateway.ratelimit.triggered")
            .description("Number of times rate limiting was triggered")
            .register(meterRegistry);
    }
    
    public void recordTrigger(String path, String ip) {
        triggerCounter.increment();
        log.warn("Rate limit triggered: path={}, ip={}", path, ip);
    }
}

2. Rate limit alerts

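@Component
public class RateLimitAlert {
    
    @Autowired
    private AlertService alertService;
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    // Counter value seen at the previous check; a Counter is cumulative
    private double lastCount = 0;
    
    @Scheduled(fixedRate = 60000)  // check once per minute
    public void checkRateLimitMetrics() {
        Counter counter = meterRegistry.find("gateway.ratelimit.triggered")
            .counter();
        
        if (counter != null) {
            double total = counter.count();
            double delta = total - lastCount;  // triggers since the previous check
            lastCount = total;
            if (delta > 100) {  // more than 100 triggers within 1 minute
                alertService.sendAlert(
                    String.format("Gateway rate limiting abnormal: triggered %d times in 1 minute", (int) delta)
                );
            }
        }
    }
}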

Best Practices

1. Tiered rate limiting

// Tier 1: IP rate limiting (strictest)
@Component
@Order(-60)
public class IpRateLimiterFilter { }

// Tier 2: user rate limiting
@Component
@Order(-55)
public class UserRateLimiterFilter { }

// Tier 3: API rate limiting
@Component
@Order(-50)
public class ApiRateLimiterFilter { }

2. Rate limit configuration

rate-limit:
  # IP rate limiting: 10 requests per second per IP
  ip:
    threshold: 10
    window-size: 1
  
  # User rate limiting: 100 requests per second per user
  user:
    threshold: 100
    window-size: 1
  
  # API rate limiting: configured per API
  api:
    - path: /api/users
      threshold: 100
    - path: /api/orders
      threshold: 50
    - path: /api/search
      threshold: 20

3. Rate limit response

private Mono<Void> tooManyRequests(ServerWebExchange exchange) {
    ServerHttpResponse response = exchange.getResponse();
    response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
    response.getHeaders().setContentType(MediaType.APPLICATION_JSON);
    response.getHeaders().set("Retry-After", "60");  // suggest retrying after 60 seconds
    
    Map<String, Object> data = new HashMap<>();
    data.put("code", 429);
    data.put("message", "Too many requests, please try again later");
    data.put("retry-after", 60);
    
    byte[] bytes = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
    DataBuffer buffer = response.bufferFactory().wrap(bytes);
    
    return response.writeWith(Mono.just(buffer));
}

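Summary

Rate limiting at the gateway is a key measure for protecting system stability: by controlling the request rate at the gateway layer, it keeps traffic bursts from overwhelming backend services.

A sound rate limiting strategy covers several dimensions, such as IP, user, and API, and picks the algorithm that fits the business scenario.

In production, it is advisable to make the rate limit configuration dynamic and to back it with thorough monitoring and alerting.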

