限流之漏桶算法与令牌桶算法
在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流
缓存:缓存的目的是提升系统访问速度和增大系统处理容量
降级:降级是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行
限流:限流的目的是通过对并发访问/请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务、排队或等待、降级等处理
漏桶算法
漏斗有一个进水口 和 一个出水口,出水口以一定速率出水,并且有一个最大出水速率:
在漏斗中没有水的时候,
-
如果进水速率小于等于最大出水速率,那么,出水速率等于进水速率,此时,不会积水
-
如果进水速率大于最大出水速率,那么,漏斗以最大速率出水,此时,多余的水会积在漏斗中
在漏斗中有水的时候,出水口以最大速率出水
-
如果漏斗未满,且有进水的话,那么这些水会积在漏斗中
-
如果漏斗已满,且有进水的话,那么这些水会溢出到漏斗之外
令牌桶算法
对于很多应用场景来说,除了要求能够限制数据的平均传输速率外,还要求允许某种程度的突发传输
。这时候漏桶算法可能就不合适了,令牌桶算法更为适合。
令牌桶算法的原理是系统以恒定的速率产生令牌,然后把令牌放到令牌桶中,令牌桶有一个容量,当令牌桶满了的时候,再向其中放令牌,那么多余的令牌会被丢弃;当想要处理一个请求的时候,需要从令牌桶中取出一个令牌,如果此时令牌桶中没有令牌,那么则拒绝该请求。
RateLimiter
用法
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(100));
// 指定每秒放1个令牌
RateLimiter limiter = RateLimiter.create(1);
for (int i = 1; i < 50; i++) {
// 请求RateLimiter, 超过permits会被阻塞
//acquire(int permits)函数主要用于获取permits个令牌,并计算需要等待多长时间,进而挂起等待,并将该值返回
Double acquire = null;
if (i == 1) {
acquire = limiter.acquire(1);
} else if (i == 2) {
acquire = limiter.acquire(10);
} else if (i == 3) {
acquire = limiter.acquire(2);
} else if (i == 4) {
acquire = limiter.acquire(20);
} else {
acquire = limiter.acquire(2);
}
executorService.submit(new Task("获取令牌成功,获取耗:" + acquire + " 第 " + i + " 个任务执行"));
}
...
static class Task implements Runnable {
String str;
public Task(String str) {
this.str = str;
}
@Override
public void run() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println(sdf.format(new Date()) + " | " + Thread.currentThread().getName() + str);
}
}
2019-07-06 16:48:32.495 | pool-1-thread-1获取令牌成功,获取耗:0.0 第 1 个任务执行
2019-07-06 16:48:33.437 | pool-1-thread-2获取令牌成功,获取耗:0.98403 第 2 个任务执行
2019-07-06 16:48:43.434 | pool-1-thread-3获取令牌成功,获取耗:9.993843 第 3 个任务执行
2019-07-06 16:48:45.434 | pool-1-thread-4获取令牌成功,获取耗:1.996439 第 4 个任务执行
2019-07-06 16:49:05.436 | pool-1-thread-5获取令牌成功,获取耗:19.997107 第 5 个任务执行
2019-07-06 16:49:07.434 | pool-1-thread-6获取令牌成功,获取耗:1.995649 第 6 个任务执行
2019-07-06 16:49:09.432 | pool-1-thread-7获取令牌成功,获取耗:1.997342 第 7 个任务执行
2019-07-06 16:49:11.432 | pool-1-thread-8获取令牌成功,获取耗:1.999978 第 8 个任务执行
2019-07-06 16:49:13.433 | pool-1-thread-9获取令牌成功,获取耗:1.999568 第 9 个任务执行
2019-07-06 16:49:15.439 | pool-1-thread-10获取令牌成功,获取耗:1.999212 第 10 个任务执行
2019-07-06 16:49:17.435 | pool-1-thread-11获取令牌成功,获取耗:1.993059 第 11 个任务执行
2019-07-06 16:49:19.434 | pool-1-thread-12获取令牌成功,获取耗:1.99708 第 12 个任务执行
分布式限流Redis+Lua
local key = "rate.limit:" .. KEYS[1] -- 限流KEY
local limit = tonumber(ARGV[1]) -- 限流大小
local current = tonumber(redis.call('get', key) or "0") + 1
if current > limit then -- 如果超出限流大小
return 0
else -- 请求数+1,并设置2秒过期
redis.call("INCRBY", key, "1")
redis.call("EXPIRE", key, "2")
return current
end
-- https://github.com/junhc/rate-limiter
-- 判断source_str 中是否contains pattern_str
-- @param source_str
-- @param patter_str
local function contains(source_str, sub_str)
local start_pos, end_pos = string.find(source_str, sub_str);
if start_pos == nil then
return false;
end
local source_str_len = string.len(source_str);
if source_str_len == end_pos then
return true
elseif string.sub(source_str, end_pos + 1, end_pos + 1) == "," then
return true
end
return false;
end
-- 获取令牌
-- 返回码
-- 0 没有令牌桶配置
-- -1 表示取令牌失败,也就是桶里没有令牌
-- 1 表示取令牌成功
-- @param key 令牌的唯一标识
-- @param permits 请求令牌数量
-- @param curr_mill_second 当前毫秒数
-- @param context 使用令牌的应用标识
local function acquire(key, permits, curr_mill_second, context)
local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate", "apps")
local last_mill_second = rate_limit_info[1]
local curr_permits = tonumber(rate_limit_info[2])
local max_permits = tonumber(rate_limit_info[3])
local rate = rate_limit_info[4]
local apps = rate_limit_info[5]
-- 标识没有配置令牌桶
if type(apps) == 'boolean' or apps == nil or not contains(apps, context) then
return 0
end
local local_curr_permits = max_permits;
-- 令牌桶刚刚创建,上一次获取令牌的毫秒数为空
-- 根据和上一次向桶里添加令牌的时间和当前时间差,触发式往桶里添加令牌,并且更新上一次向桶里添加令牌的时间
-- 如果向桶里添加的令牌数不足一个,则不更新上一次向桶里添加令牌的时间
if (type(last_mill_second) ~= 'boolean' and last_mill_second ~= nil) then
local reverse_permits = math.floor(((curr_mill_second - last_mill_second) / 1000) * rate)
local expect_curr_permits = reverse_permits + curr_permits;
local_curr_permits = math.min(expect_curr_permits, max_permits);
-- 大于0表示不是第一次获取令牌,也没有向桶里添加令牌
if (reverse_permits > 0) then
redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
end
else
redis.pcall("HSET", key, "last_mill_second", curr_mill_second)
end
local result = -1
if (local_curr_permits - permits >= 0) then
result = 1
redis.pcall("HSET", key, "curr_permits", local_curr_permits - permits)
else
redis.pcall("HSET", key, "curr_permits", local_curr_permits)
end
return result
end
-- 初始化令牌桶配置
-- @param key 令牌的唯一标识
-- @param max_permits 桶大小
-- @param rate 向桶里添加令牌的速率
-- @param apps 可以使用令牌桶的应用列表,应用之前用逗号分隔
local function init(key, max_permits, rate, apps)
local rate_limit_info = redis.pcall("HMGET", key, "last_mill_second", "curr_permits", "max_permits", "rate", "apps")
local org_max_permits = tonumber(rate_limit_info[3])
local org_rate = rate_limit_info[4]
local org_apps = rate_limit_info[5]
if (org_max_permits == nil) or (apps ~= org_apps or rate ~= org_rate or max_permits ~= org_max_permits) then
redis.pcall("HMSET", key, "max_permits", max_permits, "rate", rate, "curr_permits", max_permits, "apps", apps)
end
return 1;
end
-- 删除令牌桶
local function delete(key)
redis.pcall("DEL", key)
return 1;
end
local key = KEYS[1]
local method = ARGV[1]
if method == 'acquire' then
return acquire(key, ARGV[2], ARGV[3], ARGV[4])
elseif method == 'init' then
return init(key, ARGV[2], ARGV[3], ARGV[4])
elseif method == 'delete' then
return delete(key)
else
-- ignore
end