Junhc

岂止于博客

限流之漏桶算法与令牌桶算法

在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流

缓存:缓存的目的是提升系统访问速度和增大系统处理容量

降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行

限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理

漏桶算法

漏斗有一个进水口 和 一个出水口,出水口以一定速率出水,并且有一个最大出水速率:

在漏斗中没有水的时候,

  • 如果进水速率小于等于最大出水速率,那么,出水速率等于进水速率,此时,不会积水

  • 如果进水速率大于最大出水速率,那么,漏斗以最大速率出水,此时,多余的水会积在漏斗中

在漏斗中有水的时候,出水口以最大速率出水

  • 如果漏斗未满,且有进水的话,那么这些水会积在漏斗中

  • 如果漏斗已满,且有进水的话,那么这些水会溢出到漏斗之外

令牌桶算法

对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。

令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。

RateLimiter用法
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(100));
// 指定每秒放1个令牌
RateLimiter limiter = RateLimiter.create(1);
for (int i = 1; i < 50; i++) {
    // 请求RateLimiter, 超过permits会被阻塞
    //acquire(int permits)函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
    Double acquire = null;
    if (i == 1) {
        acquire = limiter.acquire(1);
    } else if (i == 2) {
        acquire = limiter.acquire(10);
    } else if (i == 3) {
        acquire = limiter.acquire(2);
    } else if (i == 4) {
        acquire = limiter.acquire(20);
    } else {
        acquire = limiter.acquire(2);
    }
    executorService.submit(new Task("获取令牌成功,获取耗:" + acquire + " 第 " + i + " 个任务执行"));
}
...
static class Task implements Runnable {
    String str;
    public Task(String str) {
        this.str = str;
    }

    @Override
    public void run() {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        System.out.println(sdf.format(new Date()) + " | " + Thread.currentThread().getName() + str);
    }
}
2019-07-06 16:48:32.495 | pool-1-thread-1获取令牌成功,获取耗:0.0 第 1 个任务执行
2019-07-06 16:48:33.437 | pool-1-thread-2获取令牌成功,获取耗:0.98403 第 2 个任务执行
2019-07-06 16:48:43.434 | pool-1-thread-3获取令牌成功,获取耗:9.993843 第 3 个任务执行
2019-07-06 16:48:45.434 | pool-1-thread-4获取令牌成功,获取耗:1.996439 第 4 个任务执行
2019-07-06 16:49:05.436 | pool-1-thread-5获取令牌成功,获取耗:19.997107 第 5 个任务执行
2019-07-06 16:49:07.434 | pool-1-thread-6获取令牌成功,获取耗:1.995649 第 6 个任务执行
2019-07-06 16:49:09.432 | pool-1-thread-7获取令牌成功,获取耗:1.997342 第 7 个任务执行
2019-07-06 16:49:11.432 | pool-1-thread-8获取令牌成功,获取耗:1.999978 第 8 个任务执行
2019-07-06 16:49:13.433 | pool-1-thread-9获取令牌成功,获取耗:1.999568 第 9 个任务执行
2019-07-06 16:49:15.439 | pool-1-thread-10获取令牌成功,获取耗:1.999212 第 10 个任务执行
2019-07-06 16:49:17.435 | pool-1-thread-11获取令牌成功,获取耗:1.993059 第 11 个任务执行
2019-07-06 16:49:19.434 | pool-1-thread-12获取令牌成功,获取耗:1.99708 第 12 个任务执行
分布式限流Redis+Lua
local key = "rate.limit:" .. KEYS[1] -- 限流KEY
local limit = tonumber(ARGV[1])      -- 限流大小
local current = tonumber(redis.call('get', key) or "0") + 1
if current > limit then              -- 如果超出限流大小
   return 0
else                                 -- 请求数+1,并设置2秒过期
   redis.call("INCRBY", key, "1")
   redis.call("EXPIRE", key, "2")
   return current
end
-- https://github.com/junhc/rate-limiter
-- 判断source_str 中是否contains pattern_str
-- @param source_str
-- @param patter_str
local function contains(source_str, sub_str)
    local start_pos, end_pos = string.find(source_str, sub_str);
    if start_pos == nil then
        return false;
    end
    local source_str_len = string.len(source_str);

    if source_str_len == end_pos then
        return true
    elseif string.sub(source_str, end_pos + 1, end_pos + 1) == "," then
        return true
    end
    return false;
end


-- 获取令牌
-- 返回码
-- 0 没有令牌桶配置
-- -1 表示取令牌失败,也就是桶里没有令牌
-- 1 表示取令牌成功
-- @param key 令牌的唯一标识
-- @param permits  请求令牌数量
-- @param curr_mill_second 当前毫秒数
-- @param context 使用令牌的应用标识
local function acquire(key, permits, curr_mill_second, context)
    local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate", "apps")
    local last_mill_second = rate_limit_info[1]
    local curr_permits = tonumber(rate_limit_info[2])
    local max_permits = tonumber(rate_limit_info[3])
    local rate = rate_limit_info[4]
    local apps = rate_limit_info[5]

    -- 标识没有配置令牌桶
    if type(apps) == 'boolean' or apps == nil or not contains(apps, context) then
        return 0
    end


    local local_curr_permits = max_permits;


    -- 令牌桶刚刚创建,上一次获取令牌的毫秒数为空
    -- 根据和上一次向桶里添加令牌的时间和当前时间差,触发式往桶里添加令牌,并且更新上一次向桶里添加令牌的时间
    -- 如果向桶里添加的令牌数不足一个,则不更新上一次向桶里添加令牌的时间
    if (type(last_mill_second) ~= 'boolean'  and last_mill_second ~= nil) then
        local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate)
        local expect_curr_permits = reverse_permits + curr_permits;
        local_curr_permits = math.min(expect_curr_permits, max_permits);

        -- 大于0表示不是第一次获取令牌,也没有向桶里添加令牌
        if (reverse_permits > 0) then
            redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
        end
    else
        redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
    end


    local result = -1
    if (local_curr_permits - permits >= 0) then
        result = 1
        redis.pcall("HSET", key, "curr_permits", local_curr_permits - permits)
    else
        redis.pcall("HSET", key, "curr_permits", local_curr_permits)
    end

    return result
end



-- 初始化令牌桶配置
-- @param key 令牌的唯一标识
-- @param max_permits 桶大小
-- @param rate  向桶里添加令牌的速率
-- @param apps  可以使用令牌桶的应用列表,应用之前用逗号分隔
local function init(key, max_permits, rate, apps)
    local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate", "apps")
    local org_max_permits = tonumber(rate_limit_info[3])
    local org_rate = rate_limit_info[4]
    local org_apps = rate_limit_info[5]

    if (org_max_permits == nil) or (apps ~= org_apps or rate ~= org_rate or max_permits ~= org_max_permits) then
        redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits, "apps", apps)
    end
    return 1;
end


-- 删除令牌桶
local function delete(key)
    redis.pcall("DEL", key)
    return 1;
end


local key = KEYS[1]
local method = ARGV[1]

if method == 'acquire' then
    return acquire(key, ARGV[2], ARGV[3], ARGV[4])
elseif method == 'init' then
    return init(key, ARGV[2], ARGV[3], ARGV[4])
elseif method == 'delete' then
    return delete(key)
else
    -- ignore
end
参考资料