四种常用限流算法对比

菜皮日记 / 104 / 2023-09-09 11:52:29

ChatGPT 可用网址,仅供交流学习使用,如对您有所帮助,请收藏并推荐给需要的朋友。
https://ckai.xyz

Leaky Bucket 漏桶

漏桶可理解为是一个限定容量的请求队列。

想象有一个桶,有水(指请求或数据)从上面流进来,水从桶下面的一个孔流出来。水流进桶的速度可以是随机的,但是水流出桶的速度是恒定的。
当水流进桶的速度较慢,桶不会被填满,请求就可以被处理。
当水流进桶的速度过快时,桶会逐渐被填满,当水超过桶的容量就会溢出,即被丢弃。

class LeakyBucketRateLimiter(object):
    def __init__(self, capacity, leak_rate):
        # 桶容量
        self.capacity = capacity
        # 水流出的速率 每秒n个单位
        self.rate = leak_rate
        # 上一次漏水的时间,用来跟本次时间做差值,乘以水流速率计算出这段时间内流出了多少水
        self.last_leak_time = int(time.time())
        # 桶中当前剩余的水
        self.remain_water = 0

    def allow_request(self, require_units=1):
        now = int(time.time())
        leaked_water = (now - self.last_leak_time) * self.rate
        self.remain_water = max(0, self.remain_water - leaked_water)
        self.last_leak_time = now

        print(f"刚刚流出{leaked_water}, 桶容量已达{self.remain_water}")

        if self.remain_water + require_units <= self.capacity:
            self.remain_water += require_units
            return True

        return False

if __name__ == "__main__":
    leaky_bucket = LeakyBucketRateLimiter(3, 1)
    for i in range(10):
        print(f"限流结果:", leaky_bucket.allow_request())
        print("----")
        s = random.randint(1, 5) / 10
        time.sleep(s)

结果

刚刚流出0, 桶容量已达0
限流结果: True
----
刚刚流出0, 桶容量已达1
限流结果: True
----
刚刚流出1, 桶容量已达1
限流结果: True
----
刚刚流出0, 桶容量已达2
限流结果: True
----
刚刚流出1, 桶容量已达2
限流结果: True
----
刚刚流出0, 桶容量已达3
限流结果: False
----
刚刚流出0, 桶容量已达3
限流结果: False
----
刚刚流出1, 桶容量已达2
限流结果: True
----
刚刚流出0, 桶容量已达3
限流结果: False
----
刚刚流出0, 桶容量已达3
限流结果: False
----

Token Bucket 令牌桶

在令牌桶算法中,系统会以一个固定的速率向桶中添加令牌。

当有请求(或数据包)到来时,会从桶中删除一定数量的令牌。如果桶中有足够的令牌,请求就可以立即处理;
如果桶中没有足够的令牌,请求就会被阻塞或丢弃,具体行为取决于具体的实现。
桶有容量限制,如果添加令牌时桶已满,新的令牌就会被丢弃。

class TokenBucketRateLimiter(object):
    def __init__(self, capacity, refill_rate):
        # 容量
        self.capacity = capacity
        # 产生令牌的速率
        self.refill_rate = refill_rate
        # 当前已有的可用令牌数
        self.current_tokens = capacity
        # 上一次产生令牌的时间,用来与当前时间计算并乘以速率,得出这段时间内产生了多少令牌
        self.last_refill_time = int(time.time())

    def allow_request(self, token_needs=1):
        now = int(time.time())
        new_tokens = (now - self.last_refill_time) * self.refill_rate
        self.current_tokens += min(self.capacity, new_tokens)
        self.last_refill_time = now

        print(f"刚刚新产生{new_tokens}, 剩余{self.current_tokens}")

        if self.current_tokens >= token_needs:
            self.current_tokens -= token_needs
            return True

        return False

if __name__ == "__main__":
    token_bucket = TokenBucketRateLimiter(3, 1)
    for i in range(10):
        print(f"限流结果:", token_bucket.allow_request())
        print("----")
        s = random.randint(1, 5) / 10
        time.sleep(s)

结果

刚刚新产生0, 剩余3
限流结果: True
----
刚刚新产生0, 剩余2
限流结果: True
----
刚刚新产生0, 剩余1
限流结果: True
----
刚刚新产生1, 剩余1
限流结果: True
----
刚刚新产生0, 剩余0
限流结果: False
----
刚刚新产生0, 剩余0
限流结果: False
----
刚刚新产生1, 剩余1
限流结果: True
----
刚刚新产生0, 剩余0
限流结果: False
----
刚刚新产生1, 剩余1
限流结果: True
----
刚刚新产生0, 剩余0
限流结果: False
----

Fixed Window 固定窗口

在一个固定的时间窗口内,只允许一定数量的请求。

如果在这个时间窗口内的请求已经达到了限制,那么新的请求就会被拒绝,过了当前时间窗口后,会进入下一个时间窗口,并重置窗口内的请求数量,重新计算。

class FixedWindowRateLimiter(object):
    def __init__(self, max_requests, window_size):
        # 单个窗口大小 单位秒
        self.window_size = window_size
        # 单个窗口中最大可处理的请求数量
        self.max_requests = max_requests
        # 当前窗口已处理请求
        self.current_request = 0
        # 窗口计算起始点
        self.last_window_start = int(time.time())

    def allow_request(self):
        now = int(time.time())

        if now - self.last_window_start >= self.window_size:
            print("窗口刷新")
            self.last_window_start = now
            self.current_request = 0

        print(f"当前窗口已处理请求 {self.current_request} 个")

        if self.current_request < self.max_requests:
            self.current_request += 1
            return True

        return False

if __name__ == "__main__":
    limiter = FixedWindowRateLimiter(3, 2)
    for i in range(10):
        print(f"限流结果:", limiter.allow_request())
        print("----")
        s = random.randint(1, 5) / 10
        time.sleep(s)

结果

当前窗口已处理请求 0 个
限流结果: True
----
当前窗口已处理请求 1 个
限流结果: True
----
当前窗口已处理请求 2 个
限流结果: True
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
窗口刷新
当前窗口已处理请求 0 个
限流结果: True
----

Sliding Window 滑动窗口

在固定窗口限流算法中,如果大量请求在一个时间窗口的边界附近到达,可能会造成瞬时的流量突增。
滑动窗口随着时间的推移,动态统计请求量,避免了在窗口边界附近的流量突增。

class SlidingWindowRateLimiter(object):
    def __init__(self, max_requests, window_size):
        # 最大请求量
        self.max_requests = max_requests
        # 窗口大小
        self.window_size = window_size
        # 存放每个请求的时间
        self.requests_list = collections.deque()

    def allow_request(self):
        now = int(time.time())
        while self.requests_list and self.requests_list[0] <= (now - self.window_size):
            self.requests_list.popleft()

        print(f"当前窗口已处理请求 {len(self.requests_list)} 个")

        if len(self.requests_list) < self.max_requests:
            self.requests_list.append(now)
            return True

        return False

if __name__ == "__main__":
    limiter = SlidingWindowRateLimiter(3, 2)
    for i in range(10):
        print(f"限流结果:", limiter.allow_request())
        print('----')
        s = random.randint(1, 5) / 10
        time.sleep(s)

结果

当前窗口已处理请求 0 个
限流结果: True
----
当前窗口已处理请求 1 个
限流结果: True
----
当前窗口已处理请求 2 个
限流结果: True
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 2 个
限流结果: True
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 3 个
限流结果: False
----
当前窗口已处理请求 1 个
限流结果: True
----
当前窗口已处理请求 2 个
限流结果: True
----

还有一种方式,不需要记录具体每个请求的时间点,而通过计算滑动窗口与固定窗口之间时间的偏移,估算出滑动窗口中请求量,这个方式不太准确,但节省了请求时间点的存储成本。

图中所示,已知设窗口大小为 1 小时即 60 分钟,每个窗口内可处理 100 请求,当前时间为 1:15。

在 [12:00–1:00) 的整个绿色窗口中一共有 84 个请求,在 [1:00 to 1:15) 的黄色窗口中,15 分钟内已经处理了 36 个请求,如何计算当前窗口剩余容量?

通过计算滑动窗口与前一窗口重叠部分占比,来估算前一窗口中被占用的容量,(60分钟-15分钟)/60分钟 表示滑动窗口减去黄色当前窗口后,与绿色窗口的重叠,占绿色窗口整体的百分比,也就估算出重叠部分的请求量在总共 84 个请求的占比,得出 63 个请求,再加上当前窗口的 36 请求,一共是 99 请求,那么当前滑动窗口的剩余容量就是 100 - 99 = 1 个请求容量。

class SlidingWindowRateLimiterByEstimate(object):
    def __init__(self, max_requests, window_size):
        self.max_requests = max_requests
        self.window_size = window_size

        # 当前窗口起始时间
        self.current_window_start = time.time()
        # 前一个窗口请求量
        self.pre_count = 0
        # 当前窗口请求量
        self.current_count = 0

    def allow_request(self):
        now = time.time()
        if (now - self.current_window_start) > self.window_size:
            # 滑过完整的一个窗口,重置
            self.current_window_start = now
            self.pre_count = self.current_count
            self.current_count = 0

        # 通过计算滑动窗口与前一个窗口重叠部分,占整个窗口的占比,估算重叠部分的请求量
        # 再加上当前窗口的请求量
        # 得出滑动窗口中的请求量
        estimate_count = (self.pre_count * (self.window_size - (now - self.current_window_start)) / self.window_size) + self.current_count
        print(f"估算请求量:{estimate_count}")

        if estimate_count > self.max_requests:
            return False

        self.current_count += 1
        return True

if __name__ == "__main__":
    limiter = SlidingWindowRateLimiterByEstimate(3, 2)
    for i in range(10):
        print(f"限流结果:", limiter.allow_request())
        print('----')
        s = random.randint(1, 5) / 10
        time.sleep(s)

结果

估算请求量:0.0
限流结果: True
----
估算请求量:1.0
限流结果: True
----
估算请求量:2.0
限流结果: True
----
估算请求量:3.0
限流结果: True
----
估算请求量:4.0
限流结果: False
----
估算请求量:4.0
限流结果: False
----
估算请求量:4.0
限流结果: False
----
估算请求量:3.3901939392089844
限流结果: False
----
估算请求量:2.775090217590332
限流结果: True
----
估算请求量:3.366755485534668
限流结果: False
----

参考:

本文由mdnice多平台发布


作者
菜皮日记
许可协议
CC BY 4.0
发布于
2023-09-09
修改于
2024-07-14
Bonnie image
尚未登录