api 调用次数限制实现

通过计数器记录调用频率,结合时间窗口判断是否超限,拦截超额请求并返回错误码,可结合Redis分布式锁处理高并发

API 调用次数限制实现

API 调用次数限制是保护服务端资源、防止滥用的重要手段,通过限制单位时间内的请求次数,可以有效控制流量、避免过载,同时防止恶意攻击(如 DDoS、暴力破解等)。

api 调用次数限制实现


常见限流算法

令牌桶算法 (Token Bucket)

原理 描述
生成令牌 以固定速率生成令牌,存入容量固定的“桶”中
消耗令牌 每次请求需消耗令牌,无令牌则拒绝
突发处理 允许短时间内突发流量(桶容量决定)

优点:支持突发流量,灵活性高
缺点:实现复杂度较高,需维护状态

漏桶算法 (Leaky Bucket)

原理 描述
固定流出 请求按固定速率处理,多余请求排队
队列缓冲 超出处理能力的请求进入等待队列

优点:平滑流量,避免突发冲击
缺点:延迟较高,无法应对瞬时高峰

固定窗口计数 (Fixed Window)

原理 描述
时间窗口 将时间划分为固定窗口(如 1 分钟)
计数重置 每个窗口结束后重置请求计数

优点:实现简单
缺点:临界点可能集中触发拒绝

api 调用次数限制实现

滑动窗口计数 (Sliding Window)

原理 描述
时间分段 将时间划分为小段(如 1 秒)
滑动统计 统计最近 N 个小段内的总请求数

优点:更精确控制
缺点:需存储大量时间片段数据


实现方案

Python 实现令牌桶算法

import time
from threading import Lock
class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate  # 令牌生成速率(每秒)
        self.capacity = capacity  # 桶容量
        self.tokens = capacity  # 当前令牌数
        self.last_time = time.time()  # 上次更新时间
        self.lock = Lock()  # 线程安全
    def allow_request(self):
        with self.lock:
            current_time = time.time()
            # 计算新生成的令牌数
            elapsed = current_time self.last_time
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_time = current_time
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            else:
                return False

Redis 实现限流

(1) 固定窗口计数
-KEY: user_id 或 API_name
local key = KEYS[1]
local limit = tonumber(ARGV[1])  -限制次数
local window = tonumber(ARGV[2])  -窗口大小(秒)
local current = redis.call("INCR", key)
if current == 1 then
    redis.call("EXPIRE", key, window)
end
if current > limit then
    return 0  -拒绝请求
else
    return 1  -允许请求
end
(2) 滑动窗口计数(60秒窗口,1秒粒度)
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = 60  -总窗口时间(秒)
local interval = 1  -时间片段长度(秒)
-当前时间戳对应的桶索引
local current_bucket = math.floor(tonumber(ARGV[2]) / interval) % window
-删除过期的桶
for i=1, window do
    if i ~= current_bucket then
        redis.call("ZREM", key, i)
    end
end
-增加当前桶的计数
redis.call("ZINCRBY", key, 1, current_bucket)
-设置过期时间(清理旧数据)
redis.call("PEXPIRE", key, window * interval)
-计算最近窗口的总请求数
local total = 0
for i=1, window do
    local count = tonumber(redis.call("ZSCORE", key, i)) or 0
    total = total + count
    if total > limit then break end
end
if total > limit then
    return 0  -拒绝请求
else
    return 1  -允许请求
end

分布式环境实现

方案 特点 适用场景
Redis 共享内存状态,支持集群 多实例服务限流
Nginx 配置简单,高性能 单点入口限流
本地内存+分布式协调 低延迟,需 ZK/ETCD 高可用要求场景

相关问题与解答

Q1: 如何动态调整 API 限流策略?

A1:可通过以下方式实现动态调整:

  1. 配置中心:使用 etcd、Consul 或 Redis 存储限流参数,运行时动态读取。
  2. 自适应算法:根据历史流量自动调整速率(如漏桶算法动态调整流出速率)。
  3. 分级策略:针对不同用户/IP 设置不同限额(如普通用户 100次/分钟,VIP用户 1000次/分钟)。

Q2: 如何处理高并发下的限流冲突?

A2:优化方案包括:

api 调用次数限制实现

  1. 原子操作:使用 Redis INCR/ZINCRBY 等原子指令避免竞态条件。
  2. 异步队列:将限流判断前置到队列阶段(如使用 Kafka 削峰)。
  3. 本地缓存:结合本地令牌桶(如 Guava RateLimiter)

以上就是关于“api 调用次数限制实现”的问题,朋友们可以点击主页了解更多内容,希望可以够帮助大家!

【版权声明】:本站所有内容均来自网络,若无意侵犯到您的权利,请及时与我们联系将尽快删除相关内容!

(0)
热舞的头像热舞
上一篇 2025-05-09 07:03
下一篇 2025-05-09 07:09

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

QQ-14239236

在线咨询: QQ交谈

邮件:asy@cxas.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信