欢迎来访,清水煮面
联系jacob95047@gmail.com

服务端限流算法

服务端限流算法是指一种用于控制服务端请求流量的技术方案,通过在服务端对请求进行限制和控制,避免因客户端请求过多而导致服务端资源的浪费和系统崩溃等问题。限流算法是分布式系统中常用的一种解决方案,可以有效提高系统的可用性和可靠性。常见的限流算法有:计数器、滑动窗口、漏桶、令牌桶。

计数器算法

计数器算法是限流算法中最简单且易于实现的一种算法。例如,我们需要限制接口A在1分钟内的访问次数不能超过100次。为了实现这个目标,我们可以在每一分钟重置计数器counter,当请求到来时,如果距离上次重置计数器的时间在1分钟以内,我们就将counter加1。如果1分钟内counter的值大于100,说明请求过多,需要进行限流控制。

这个算法虽然简单,但是存在临界问题。假设有一个用户在0:59时发送了100个请求,接着在1:00时又瞬间发送了100个请求。这意味着该用户在仅1秒的时间内瞬间发送了200个请求。换言之,该用户在1秒内请求量达到了限流上限的2倍,这将会对服务器产生瞬时压力。

滑动窗口

滑动窗口算法,又称为rolling window算法,可以解决计数算法中的临界问题。类似于TCP中的滑动窗口算法,我们在计算counter值时,并不是计算0:00~1:00这样固定的时间窗口内的请求总数,而是计算距离当前最近1分钟内的请求总量。这样,就可以避免请求量在短时间内突破限流上限的问题。
我们将1分钟的时间按照每10秒分割为6个格子,并为每个格子分别设置一个独立的计数器,组成一个count_lst。当新的请求到来时,首先根据上次请求时间与当前时间的差值移动窗口,此时新请求所属的时间格子一定是最后一个10秒的格子,因此可以直接将计数器count_lst[-1]加1。在滑动窗口中,我们只统计最近1分钟内的计数器之和,来进行请求的限流控制。
对应的python参考代码如下:

class SlidingWindow(object):
    def __init__(self, size, limit):
        self.size = size  # 时间窗口的大小
        self.limit = limit  # 时间窗口内允许通过的请求数量
        self.window = []  # 时间窗口
        self.last_timestamp = time.time()  # 上一次请求的时间戳

    def is_allowed(self):
        # 获取当前时间戳
        now = time.time()

        # 移除时间窗口中时间超过规定大小的记录
        self.window = [x for x in self.window if x > now - self.size]

        # 判断时间窗口内请求数是否超过限制
        if len(self.window) < self.limit:
            # 将当前请求的时间戳加入时间窗口
            self.window.append(now)
            # 更新上一次请求的时间戳
            self.last_timestamp = now
            return True
        else:
            return False

令牌桶算法

滑动窗口算法虽然能够解决瞬时请求过大的问题,但并不保证请求在时间窗口内均匀分布。相比之下,令牌桶算法可以限制请求速率。在令牌桶算法中,我们会为每个请求发放一个令牌,只有在获取到合法令牌的情况下,请求才不会被限流。而令牌桶算法会限制最大令牌数量即桶的容量,并且在令牌桶没有满的情况下,会定时生成一定数量的令牌加入到桶中。每个请求都会消耗一些令牌,如果桶中的令牌数量小于单次请求消耗的令牌数量,则请求会被限制。
该算法的缺点也是有的。假如系统上线时没有预热,那么可能会出现由于此时桶中还没有令牌,而导致请求被误杀的情况,而且,如果流量波动过于剧烈,可能需要调整令牌生成速率和桶的容量,才能更好地适应流量的变化。
对应的python参考代码如下:

class TokenBucket(object):
    def __init__(self, capacity, rate):
        self.capacity = float(capacity)     # 桶的容量
        self.rate = float(rate)             # 令牌生成速率(个/秒)
        self.tokens = self.capacity         # 当前桶中令牌数量
        self.last_time = time.time()        # 上一次取令牌的时间

    def get_tokens(self):
        """获取令牌"""
        now = time.time()
        if now > self.last_time:
            # 计算增加的令牌数量
            delta = (now - self.last_time) * self.rate
            # 令牌数量不得超过桶的容量
            self.tokens = min(self.capacity, self.tokens + delta)
            self.last_time = now
        # 是否有令牌可用
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        else:
            return False

漏桶算法

漏桶算法的基本思想是,将请求按照一定的速率流出(即底部请求被处理的速率),如果请求进入的速率高于流出速率,那么请求就会积累在漏桶中,当漏桶溢出时,请求就会被丢弃。漏桶中由于请求是暂存在桶中的,所以请求什么时候能被处理,则是有延时的。
对应的python参考代码如下:

class LeakyBucket(object):
    def __init__(self, capacity, rate):
        self.capacity = capacity  # 漏桶的容量
        self.rate = rate  # 水流出的速率(即桶底部请求被处理的速率)
        self.water_amount = 0  # 桶中当前的水量
        self.last_leak_time = time.time()  # 上一次漏水的时间点

    def is_allowed(self):
        now = time.time()
        # 先漏水,计算当前桶中的水量
        self.water_amount = max(0, self.water_amount - (now - self.last_leak_time) * self.rate)
        # 更新上一次漏水的时间点
        self.last_leak_time = now

        if self.water_amount < self.capacity:
            # 桶未满,允许通过
            self.water_amount += 1
            return True
        else:
            # 桶已满,限流
            return False

is_allowed() 方法判断当前请求是否允许通过。在该方法中,首先根据当前时间和上一次漏水的时间点,计算出当前桶中的水量。然后,如果桶未满,将桶中的水量加 1,并返回 True 表示允许通过;如果桶已满,直接返回 False 表示限流。
注意,当请求被限流时,桶中的水量不会减少,因为在该方法中只有在有请求通过时才会加水,而没有请求通过时不会漏水,也就不会减少水量。因此,如果请求一直被限流,桶中的水量会一直保持不变。

分布式集群下的限流

相比单一机器或者进程的限流服务,分布式服务器架构中的限流服务相对复杂一些。本质上单机限流和分布式限流的区别其实在于 “计数器” 存放的位置。针对单机限流,我们可以直接将计数器存放到内存中,分布式限流通常需要使用分布式缓存或分布式锁来实现,比如redis。
下面提供一个分布式的令牌桶限流算法,利用python和redis来实现:

import time
import redis

class TokenBucket(object):
    def __init__(self, capacity, fill_rate, redis_host, redis_port, redis_password):
        self.capacity = float(capacity)  # 桶的容量
        self.tokens = float(capacity)  # 当前令牌数量
        self.fill_rate = float(fill_rate)  # 令牌填充速率
        self.redis_conn = redis.Redis(host=redis_host, port=redis_port, password=redis_password)

    def get_tokens(self):
        # 获取当前时间戳
        now = time.time()

        # 计算需要添加的令牌数量
        delta = self.fill_rate * (now - self.redis_conn.get('last_time'))

        # 更新最后填充时间
        self.redis_conn.set('last_time', now)

        # 限制令牌数量不超过桶的容量
        self.tokens = min(self.capacity, self.tokens + delta)

        return self.tokens

    def reduce_tokens(self, count=1):
        # 获取当前令牌数量
        tokens = self.get_tokens()

        # 判断令牌数量是否足够
        if tokens >= count:
            # 减少令牌数量
            self.tokens = tokens - count
            return True
        else:
            return False

赞(0)
未经允许不得转载(与我联系):清水面 » 服务端限流算法

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Warning: error_log(/usr/local/lighthouse/softwares/wordpress/wp-content/plugins/spider-analyser/#log/log-1915.txt): failed to open stream: No such file or directory in /usr/local/lighthouse/softwares/wordpress/wp-content/plugins/spider-analyser/spider.class.php on line 2900