前言
一個業務型的服務,被open接口后,遭遇并發掃數據,于是要做限流操作。一直固執的認為,業務API和OpenAPI要分開處理,或許因為起初接入其他企業ERP系統都是走較為規范的OpenAPI,始終對于這種開發系統業務API的做法感覺不好。
窗口限流
需求是要在Django的一個工程里做限流,倘若是rest_framework的View也好辦,直接就提供了限流 rest_framework throttling
可參照文檔設置。不能直接使用設置的原因是,面對是Django做的一個服務,然后proxy至別的服務,工程僅僅承擔一個轉發的職責。如果在LB上限流,無法區分來源IP,只能是總量限流,很可能導致一旦被限流,正常平臺訪問被拒絕。所以我需要的限流需求非常清晰,首先限流的粒度是需要先知道訪問來源的真實IP,在一定窗口時間內的訪問次數,諸如 100/min。
rest_framework 提供了比錯的實現思路,類似實現一套打點記錄的,片段存儲,打點記錄為需要限制的實時條件。就以上述 100/min為例,首先一分鐘之內,IP1沒有任何訪問,則沒有任何限制數據,redis的過期時間,滿足了此數據設置,再有,1分鐘之內,要滿足次數不超過100次,維護一個數組,長度超過100則意味超過訪問限制,數組中記錄請求每次訪問的時刻值,窗口滑動就是淘汰掉連續訪問中,以當前時刻后置一分鐘之前的訪問打點,保證了數組窗口永遠都是以當前最近請求進入1min之內的記錄點。
# throttle setting
THROTTLE_RATES
=
{
'resource1'
:
'100/min'
,
'resource2'
:
'20/second'
}
# throttle class
class
WindowAccessThrottle
:
cache
=
Cache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
self
.
rate
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
parse_rate
(
self
)
:
num
,
period
=
self
.
rate
.
split
(
'/'
)
num_requests
=
int
(
num
)
duration
=
{
's'
:
1
,
'm'
:
60
,
'h'
:
3600
,
'd'
:
86400
}
[
period
[
0
]
]
return
num_requests
,
duration
def
get_cache_key
(
self
)
:
host
=
self
.
request
.
META
[
'HTTP_X_FORWARDED_FOR'
]
\
if
self
.
request
.
META
.
get
(
'HTTP_X_FORWARDED_FOR'
,
None
)
else
\
self
.
request
.
META
[
'REMOTE_ADDR'
]
return
'throttle:{}:{}'
.
format
(
host
,
self
.
view
.
__name__
)
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
num_requests
,
duration
=
self
.
parse_rate
(
)
while
history
and
history
[
-
1
]
<=
now
-
duration
:
history
.
pop
(
)
if
len
(
history
)
>=
num_requests
:
return
False
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
duration
)
return
True
注意
1,上述示例可根據實際需求修改
2,在做IP級別限定是,如果直接調用request.META[‘REMOTE_ADDR’]獲取的是請求直接過來的IP,實際部署服務多數是經過LB,或者nginx反向代理的,REMOTE_ADDR多數就是前置LB的IP,所以取用HTTP_X_FORWARDED_FOR獲取發起請求的遠端IP。
3,
cache = Cache()
就是一個redis的封裝,稍微實現下
cache.get_value(self.key, [])
對獲取支持默認值
4,使用時類似原生的throttle,在view函數中設置 scope
4,配合Django的中間件,調用判定,大致如下:
from
django
.
urls
import
resolve
'''
實際下面中間件需要根據需求自定義調試,如果只是rest_framework的View可以直接用原生的設定,因為筆者是自己封裝的轉發View,
相當于重新自定義一個完全新的通用視圖,需要重新實現限流
'''
class
ThrottleMiddleware
(
MiddlewareMixin
)
:
def
process_request
(
self
,
request
)
:
resolver
=
resolve
(
request
.
path
)
throttle_scope
=
getattr
(
resolver
.
func
,
'throttle_scope'
,
None
)
throttle
=
WindowAccessThrottle
(
request
,
resolver
.
func
,
throttle_scope
)
if
throttle
.
allow_request
(
)
:
return
else
:
return
HttpResponse
(
)
漏斗限流
上面窗口限流,一定程度上解決了流量猛增的問題,但是以上面 120/min的限流為例,用戶在1分鐘的某一瞬間,120的并發,此種場景,上面的限流器基本沒有作用了,設想能夠在短時間內,既限制訪問的總量,也能限制訪問的頻率至于過高,漏斗限流就非常理想,基本抽象模型:
1,漏斗參數:
- capacity:容量,漏斗大小
- rate:漏斗流出速率,可以用 total和duration計算,一段時間duration內允許通過的總量total
2,當漏斗為空漏斗時:
- 訪問進入的速率 < rate,此時漏斗無積壓,請求一律通過
- 訪問進入的速率 >= rate,此時漏斗中逐漸積壓,且漏斗以rate值不斷流出
3,當漏斗不為空時:
- 出水口以最大速率流出
- 漏斗未滿,會繼續納入
- 漏斗已滿,則會直接溢出,拒絕請求
用漏斗限流實現上述IP限流,示例如下:
THROTTLE_RATES
=
{
'funnel'
:
{
'capacity'
:
15
,
'duration'
:
60
,
# seconds
'total'
:
30
,
}
,
}
class
FunnelThrottle
:
cache
=
CusCache
(
)
timer
=
time
.
time
def
__init__
(
self
,
request
,
view
,
scope
)
:
config
=
settings
.
THROTTLE_RATES
[
scope
]
self
.
rate
=
config
[
'total'
]
/
config
[
'duration'
]
self
.
capacity
=
config
[
'capacity'
]
self
.
duration
=
config
[
'duration'
]
self
.
request
=
request
self
.
view
=
view
self
.
key
=
self
.
get_cache_key
(
)
def
get_cache_key
(
self
)
:
"""
same as WindowAccessThrottle
"""
pass
def
allow_request
(
self
)
:
history
=
self
.
cache
.
get_value
(
self
.
key
,
[
]
)
now
=
self
.
timer
(
)
if
not
history
:
# 空漏斗直接放行
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
latest_duration
=
now
-
history
[
0
]
# 距離最近的一次放行時間間隔
leak_count
=
int
(
latest_duration
*
self
.
rate
)
# 由間隔時間和漏斗流速計算此段時間漏斗騰出空間
for
i
in
range
(
leak_count
)
:
if
history
:
history
.
pop
(
)
else
:
break
# 在上述漏斗清理流出空間后,漏斗仍舊滿量,直接判定不可訪問
if
len
(
history
)
>=
self
.
capacity
:
return
False
# 如果可訪問,請求進入漏斗計量
history
.
insert
(
0
,
now
)
self
.
cache
.
set
(
self
.
key
,
history
,
self
.
duration
)
return
True
Note:
1,漏斗限流方式和之前窗口限流所用的數據結構在cache中基本一致,只因判定算法不同,所達到的限流效果,完全不同
2,漏斗限流,進入漏斗計量的點,表示一律放行通過了,只是,在漏斗中會根據下一次訪問進入時間判定該點是否由漏斗的rate失效,而達到容量合理,限制流速的效果
Redis 漏斗限流 (redis-cell)
上述的漏斗限流算法,在Redis的模塊中已經內置實現了一個,具體介紹請參見Github redis-cell詳細介紹 筆者安裝在MacOS上,基本沒有問題:
# 下載mac版本安裝包
https://github.com/brandur/redis-cell/releases
# 解壓
tar
-zxf redis-cell-*.tar.gz
# 復制可執行文件
cp
libredis_cell.dylib /your_redis_server_localtion
# 重啟redis-server,把libredis_cell.dylib加載上
redis-server --loadmodule /path/to/modules/libredis_cell.dylib
安裝重啟后,可以在redis中執行 CL.THROTTLE 命令:
# CL.THROTTLE user123 15 30 60 1和實現算法中的配置類似,user123表示限流key,15: capacity,30: total,60: duration,
127.0.0.1:6379> CL.THROTTLE user123 15 30 60 1
1) (integer) 0 # 0表示允許,1表示拒絕
2) (integer) 16 # 漏斗容量 max_burst + 1 = 15 +1 =16
3) (integer) 15 # 漏斗剩余容量
4) (integer) -1 # 如果被拒絕,多少秒后重試
5) (integer) 2 # 多長時間后漏斗完全漏空
但是redis-cell沒有找到對應的sdk
Python Bound method
# python 3.x
def
func
(
)
:
pass
class
A
:
@
classmethod
def
method_cls
(
cls
)
:
pass
def
method_a
(
self
)
:
pass
class
B
(
A
)
:
pass
a
,
b
=
A
(
)
,
B
(
)
print
(
func
)
#
print
(
a
.
method_a
)
#
<__main__.A object at 0x10ef11978>>
print
(
b
.
method_cls
)
#
>
對于上文中
func
就是一個函數對象,而
method_a
和
method_cls
是歸屬類A的所以,是一個
bound method
,那么如何查看一個
bound method
的歸屬呢?
Python 2.x中提供了 im_func,im_class,im_self三個屬性:
-
im_funcis the function object. -
im_classis the class the method comes from. -
im_selfis the self object the method is bound to.
Python3.x中
-
__func__replaceim_func -
__self__replaceim_self
2.x中的im_class取消
# python 3.x
print
(
a
.
method_a
.
__self__
)
print
(
b
.
method_cls
.
__self__
)
# print(func.__self__) error func 無 __self__
print
(
b
.
method_cls
.
__self__
.
__name__
)
# print(b.method_cls.__self__.__name__) error b.method_cls.__self__是一個實例,無__name__屬性
關于
__name__
和
__qualname__
請參見 PEP 3155
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

