Adhesive框架中是分布式組件客戶端首先實現的是基于Json序列化+二進制協議的Memcached客戶端。在本文中會介紹其中的實現細節。
我們先來看一下項目結構:
從這個結構大致可以看出:
1)Memcached只是其中的一個具體實現,這個組件期望提供一個ClientSocket-ClientNode-ClientCluster的基礎實現,以后可以有各種客戶端基于這種結構來實現
2)對于Memcached的實現,其中把協議部分放在的Protocol文件夾中,并且根據協議為每一個請求和響應封裝類型,也就是使用面向對象的方式而不是拼數據包的方式來封裝協議
那么現在首先來介紹基礎結構。從最底部的層次開始,最底部應該是對Socket進行一個封裝,在這里我們實現了一個ClientSocket,主要完成下面功能:
1)封裝Read、Write、Connect、Reset(因為我們實現的是Socket池,所以在Socket使用之后,歸還池之前需要重置)操作
2)封裝Socket基本狀態,包括創建時間、忙碌時間、閑置時間、發生錯誤時的回調方法
?
在ClientSocket之上的一層是ClientNode,也就是一個節點的客戶端,很明顯,這里需要做的是Socket連接池,具體完成的工作有:
1)進行連接池的維護,包括移除空閑超時的Socket、強制結束忙碌時間過長的Socket、補充新的Socket到連接池的下限
2)初始化池、結束池、從池獲取Socket、把使用后的Socket返回池、創建非池Socket
在正常使用的時候,所有Socket都從池中獲取,如果整個Node不可用,那么我們定時創建非池Socket來測試Node是否恢復
?
在ClientNode之上的是ClientCluster,也就是集群,對于需要客戶端進行一致性哈希分發節點的分布式組件來說,這層就很必要了,完成的功能主要有:
1)初始化集群、使用一致性哈希從集群獲得節點、直接獲得ClientSocket
2)在節點出錯的時候進行重新節點分配、嘗試恢復出錯的節點
?
ClientCluster是使用ClientNodeLocator來分配節點的,其中的算法也就是一致性哈希算法。
之前說過節點有權重的概念,在這里也就是通過虛擬節點的數量來設置節點權重,權重越高分配到Key的數量也就會越多。
?
在ClientCluster之上還封裝了一層AbstractClient,也就是直接面向用戶的API入口。
public
abstract
class
AbstractClient<T>
where
T : AbstractClient<T>,
new
()
完成的功能有:
1)保存所有的Cluster,初始化Cluster
2)獲取具體的XXXClient的實現,比如MemcachedClient
?
很明顯,我們的第一個實現MemcachedClient是繼承了AbstractClient:
public
partial
class
MemcachedClient : AbstractClient<MemcachedClient>
在這里使用了部分類,內部的實現都放在了MemcachedClient_Internal.cs中,而對外的API都放在了MemcachedClient.cs中。
?
對于Memcached的二進制協議,我們首先是實現一個頭的格式包:
[StructLayout(LayoutKind.Sequential, Pack = 1)]
internal
struct
Header
{
internal
byte
Magic;
internal
byte
Opcode;
internal
ushort
KeyLength;
internal
byte
ExtraLength;
internal
byte
DataType;
internal
ushort
Reserved;
internal
uint
TotalBodyLength;
internal
uint
Opaque;
internal
ulong
Version;
}
由于我們會直接把結構打包為字節數組,所以這里聲明了結構的內存布局。在Protocol.cs中,我們有一些實用的方法,比如結構和字節數組雙向轉換的實現:
internal
static
T BytesToStruct<T>(
this
byte
[] rawData)
{
T result =
default
(T);
RespectEndianness(
typeof
(T), rawData);
GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);
try
{
IntPtr rawDataPtr = handle.AddrOfPinnedObject();
result = (T)Marshal.PtrToStructure(rawDataPtr,
typeof
(T));
}
finally
{
handle.Free();
}
return
result;
}
internal
static
byte
[] StructToBytes<T>(
this
T data)
{
byte
[] rawData =
new
byte
[Marshal.SizeOf(data)];
GCHandle handle = GCHandle.Alloc(rawData, GCHandleType.Pinned);
try
{
IntPtr rawDataPtr = handle.AddrOfPinnedObject();
Marshal.StructureToPtr(data, rawDataPtr,
false
);
}
finally
{
handle.Free();
}
RespectEndianness(
typeof
(T), rawData);
return
rawData;
}
private
static
void
RespectEndianness(Type type,
byte
[] data)
{
var fields = type.GetFields(BindingFlags.NonPublic | BindingFlags.Instance).Select(field =>
new
{
Field = field,
Offset = Marshal.OffsetOf(type, field.Name).ToInt32()
}).ToList();
fields.ForEach(item => Array.Reverse(data, item.Offset, Marshal.SizeOf(item.Field.FieldType)));
}
在定義了頭之后,我們就可以封裝一個抽象的請求包了:
只要實現這個包,然后調用其GetBytes方法就可以直接獲得需要發送的請求數據包,它會在內部處理Header和Body數據的打包。
比如,我們來看一個Set操作的包實現:
internal
class
SetRequestPackage : AbstractRequestPackage
{
private
TimeSpan expireSpan;
private
byte
[] valueBytes;
private
ulong
version;
public
override
Opcode Opcode
{
get {
return
Opcode.Set; }
}
internal
SetRequestPackage(
string
key,
byte
[] valueBytes, TimeSpan expireSpan,
ulong
version)
:
base
(key)
{
if
(expireSpan > TimeSpan.FromDays(30))
throw
new
ArgumentOutOfRangeException(
"過期時間不能超過30天!"
);
this
.expireSpan = expireSpan;
this
.valueBytes = valueBytes;
this
.version = version;
}
internal
SetRequestPackage(
string
key,
string
value
, TimeSpan expireSpan,
ulong
version)
:
this
(key, Encoding.UTF8.GetBytes(
value
), expireSpan, version)
{
}
internal
SetRequestPackage(
string
key,
string
value
,
ulong
version)
:
this
(key, Encoding.UTF8.GetBytes(
value
), TimeSpan.FromDays(30), version)
{
}
internal
SetRequestPackage(
string
key,
byte
[] valueBytes,
ulong
version)
:
this
(key, valueBytes, TimeSpan.FromDays(30), version)
{
}
protected
override
ulong
GetVersion()
{
return
version;
}
protected
override
byte
[] GetExtraBytes()
{
var extraBytes =
new
List<
byte
>();
uint
flag = 0xdeadbeef;
extraBytes.AddRange(flag.GetBigEndianBytes());
uint
expire = Convert.ToUInt32(expireSpan.TotalSeconds);
extraBytes.AddRange(expire.GetBigEndianBytes());
return
extraBytes.ToArray();
}
protected
override
byte
[] GetValueBytes()
{
return
valueBytes;
}
}
在這里,我們只是實現了抽象方法來為基類提供沒有的數據,并不需要關心數據是如何打包的。那么,之后發送Set請求的操作就很簡單了:
private
bool
InternalSet(
string
key,
string
value
, TimeSpan expire,
ulong
version)
{
using
(var socket = GetCluster().AcquireSocket(key))
{
if
(socket !=
null
)
{
AbstractRequestPackage requestPackage = expire == TimeSpan.MaxValue ?
new
SetRequestPackage(key,
value
, version)
:
new
SetRequestPackage(key,
value
, expire, version);
var requestData = requestPackage.GetBytes();
if
(requestData !=
null
)
{
socket.Write(requestData);
var responsePackage = ResponsePackageCreator.GetPackage(socket);
if
(responsePackage !=
null
)
{
if
(responsePackage.ResponseStatus == ResponseStatus.NoError)
{
return
true
;
}
else
if
(responsePackage.ResponseStatus != ResponseStatus.KeyExists
&& responsePackage.ResponseStatus != ResponseStatus.KeyNotFound)
{
LocalLoggingService.Warning(
"在 {0} 上執行操作 {1} 得到了不正確的回復 Key : {2} -> {3}"
,
socket.Endpoint.ToString(),
requestPackage.Opcode,
key,
responsePackage.ResponseStatus);
}
}
else
{
LocalLoggingService.Error(
"在 {0} 上執行操作 {1} 沒有得到回復 Key : {2}"
,
socket.Endpoint.ToString(),
requestPackage.Opcode,
key);
}
}
}
}
return
false
;
}
1)首先是獲取到Cluster,再獲取到池中的Socket
2)然后初始化一個SetRequestPackage,再通過GetBytes獲得數據
3)直接把數據寫入Socket
4)通過ResponsePackageCreator來獲得返回的數據包
?
很明顯,ResponsePackageCreator和AbstractRequestPackage的意圖差不多,用來把響應的數據包封裝成我們需要的數據,其中有一個:
internal
static
GeneralResponsePackage GetPackage(ClientSocket socket)
獲得的是一個通用的響應數據包:
internal
class
GeneralResponsePackage
{
internal
Opcode Opcode { get; set; }
internal
ResponseStatus ResponseStatus { get; set; }
internal
string
Key { get; set; }
internal
byte
[] ValueBytes { get; set; }
internal
ulong
Version { get; set; }
internal
string
Value
{
get
{
if
(ValueBytes !=
null
)
{
return
Encoding.UTF8.GetString(ValueBytes);
}
else
{
return
null
;
}
}
}
}
在這里基本的信息都有了,比如操作代碼、響應狀態、Key、Value、版本號。正因為Memcached的協議比較簡單,所有的響應包都是這么一個格式,所以我們并沒有實現特殊的響應包。如果要實現的話,只需要在類頭部標記OpCode并且繼承GeneralResponsePackage,ResponsePackageCreator會自動返回相應的子類:
[AttributeUsage(AttributeTargets.Class)]
internal
class
ResponsePackageAttribute : Attribute
{
internal
Opcode Opcode { get;
private
set; }
internal
ResponsePackageAttribute(Opcode opcode)
{
this
.Opcode = opcode;
}
}
在獲得了響應之后,通過判斷ResponseStatus來知道響應是否正確,并且記錄相關日志即可。這么一來,數據一去一回以及協議如何實現的整個過程就介紹完了。下面,我們再介紹一下客戶端中幾個特色功能的實現。
?
1)獲取一組Key功能。由于一個集群會有多個節點,所以要獲取一組Key,我們首先需要把Key按照節點分類,然后對于不同的節點,采用并行的方式同時獲取,這樣速度會很快,代碼片段如下:
var nodeCache =
new
Dictionary<ClientNode, List<
string
>>();
foreach
(var key
in
keys)
{
var node = GetCluster().AcquireNode(key);
if
(!nodeCache.ContainsKey(node))
nodeCache.Add(node,
new
List<
string
> { key });
else
if
(!nodeCache[node].Contains(key))
nodeCache[node].Add(key);
}
var data =
new
Dictionary<
string
,
string
>();
Parallel.ForEach(nodeCache, node =>
2)List功能。Memcached只提供了Key、Value的存儲,有的時候我們的Value是一個列表,那么我們可以有兩種方式完成這個功能。第一種就是直接把列表序列化作為一個Value保存,優點是簡單,缺點是如果以后需要修改的話需要整個列表取出,修改后再把整個列表保存進去,并且由于Memcached Value大小的限制,這么做也不能保存大列表;第二種方式是一個Value保存列表中的一個項,再使用一個KeyValue來保存其中每一項的ID,這么優點是修改方便,獲取的數據可以是列表中的一部分,缺點是實現麻煩,要考慮并發問題、要維護另外一個KeyValue來保存所有的ID。在這里,我們封裝了后一種方式的實現。
3)Locker功能。使用Memcached完成鎖的功能其實很簡單,我們只需要在獲取鎖的時候判斷Add一個空值是否成功,如果不成功則表示占有,等待一段時間嘗試獲取,一直到超時,在返回鎖的時候刪除這個項即可。在這里,我們封裝了MemcachedLocker來完成這個功能。
更多文章、技術交流、商務合作、聯系博主
微信掃碼或搜索:z360901061
微信掃一掃加我為好友
QQ號聯系: 360901061
您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對您有幫助就好】元

