黄色网页视频 I 影音先锋日日狠狠久久 I 秋霞午夜毛片 I 秋霞一二三区 I 国产成人片无码视频 I 国产 精品 自在自线 I av免费观看网站 I 日本精品久久久久中文字幕5 I 91看视频 I 看全色黄大色黄女片18 I 精品不卡一区 I 亚洲最新精品 I 欧美 激情 在线 I 人妻少妇精品久久 I 国产99视频精品免费专区 I 欧美影院 I 欧美精品在欧美一区二区少妇 I av大片网站 I 国产精品黄色片 I 888久久 I 狠狠干最新 I 看看黄色一级片 I 黄色精品久久 I 三级av在线 I 69色综合 I 国产日韩欧美91 I 亚洲精品偷拍 I 激情小说亚洲图片 I 久久国产视频精品 I 国产综合精品一区二区三区 I 色婷婷国产 I 最新成人av在线 I 国产私拍精品 I 日韩成人影音 I 日日夜夜天天综合

流式計算之Storm簡介

系統(tǒng) 2322 0

Storm是一個分布式的、容錯的實時計算系統(tǒng),遵循Eclipse Public License 1.0,Storm可以方便地在一個計算機(jī)集群中編寫與擴(kuò)展復(fù)雜的實時計算,Storm之于實時處理,就好比Hadoop之于批處理。Storm保證每個消息都會得到處理,而且它很快——在一個小集群中,每秒可以處理數(shù)以百萬計的消息??梢允褂萌我饩幊陶Z言來做開發(fā)。
主要商業(yè)應(yīng)用及案例:Twitter
Storm的優(yōu)點
1. 簡單的編程模型。類似于MapReduce降低了并行批處理復(fù)雜性,Storm降低了進(jìn)行實時處理的復(fù)雜性。
2. 服務(wù)化,一個服務(wù)框架,支持熱部署,即時上線或下線App.
3. 可以使用各種編程語言。你可以在Storm之上使用各種編程語言。默認(rèn)支持Clojure、Java、Ruby和Python。要增加對其他語言的支持,只需實現(xiàn)一個簡單的Storm通信協(xié)議即可。
4. 容錯性。Storm會管理工作進(jìn)程和節(jié)點的故障。
5. 水平擴(kuò)展。計算是在多個線程、進(jìn)程和服務(wù)器之間并行進(jìn)行的。
6. 可靠的消息處理。Storm保證每個消息至少能得到一次完整處理。任務(wù)失敗時,它會負(fù)責(zé)從消息源重試消息。
7. 快速。系統(tǒng)的設(shè)計保證了消息能得到快速的處理,使用ZeroMQ作為其底層消息隊列。
8. 本地模式。Storm有一個“本地模式”,可以在處理過程中完全模擬Storm集群。這讓你可以快速進(jìn)行開發(fā)和單元測試。


Storm目前存在的問題

1. ? 目前的開源版本中只是單節(jié)點 Nimbus ,掛掉只能自動重啟,可以考慮實現(xiàn)一個雙 nimbus 的布局。
2. Clojure
是一個在 JVM 平臺運行的動態(tài)函數(shù)式編程語言 , 優(yōu)勢在于流程計算, ? Storm 的部分核心內(nèi)容由 Clojure 編寫,雖然性能上提高不少但同時也提升了維護(hù)成本。

Storm 架構(gòu)

Storm 集群由一個主節(jié)點和多個工作節(jié)點組成。主節(jié)點運行了一個名為 “Nimbus” 的守護(hù)進(jìn)程,用于分配代碼、布置任務(wù)及故障檢測。每個工作節(jié)點都運行了一個名為 “Supervisor” 的守護(hù)進(jìn)程,用于監(jiān)聽工作,開始并終止工作進(jìn)程。 Nimbus Supervisor 都能快速失敗,而且是無狀態(tài)的,這樣一來它們就變得十分健壯,兩者的協(xié)調(diào)工作是由 Zookeeper 來完成的。 ZooKeeper 用于管理集群中的不同組件, ZeroMQ 是內(nèi)部消息系統(tǒng), JZMQ ZeroMQMQ Java Binding 。有個名為 storm-deploy 的子項目,可以在 AWS 上一鍵部署 Storm 集群 .

流式計算之Storm簡介

Storm 術(shù)語解釋

Storm 的術(shù)語包括 Stream 、 Spout 、 Bolt 、 Task 、 Worker Stream Grouping Topology 。 Stream 是被處理的數(shù)據(jù)。 Sprout 是數(shù)據(jù)源。 Bolt 處理數(shù)據(jù)。 Task 是運行于 Spout Bolt 中的 ? 線程。 Worker 是運行這些線程的進(jìn)程。 Stream Grouping 規(guī)定了 Bolt 接收什么東西作為輸入數(shù)據(jù)。數(shù)據(jù)可以隨機(jī)分配(術(shù)語為 Shuffle ),或者根據(jù)字段值分配(術(shù)語為 Fields ),或者 ? 廣播(術(shù)語為 All ),或者總是發(fā)給一個 Task (術(shù)語為 Global ),也可以不關(guān)心該數(shù)據(jù)(術(shù)語為 None ),或者由自定義邏輯來決定(術(shù)語為 Direct )。 Topology 是由 Stream Grouping 連接起來的 Spout Bolt 節(jié)點網(wǎng)絡(luò) . 下面進(jìn)行詳細(xì)介紹:

  • Topologies ? 用于封裝一個實時計算應(yīng)用程序的邏輯,類似于 Hadoop MapReduce Job

流式計算之Storm簡介

  • Stream ? 消息流,是一個沒有邊界的 tuple 序列,這些 tuples 會被以一種分布式的方式并行地創(chuàng)建和處理
  • Spouts ? 消息源,是消息生產(chǎn)者,他會從一個外部源讀取數(shù)據(jù)并向 topology 里面面發(fā)出消息: tuple
  • Bolts ? 消息處理者,所有的消息處理邏輯被封裝在 bolts 里面,處理輸入的數(shù)據(jù)流并產(chǎn)生輸出的新數(shù)據(jù)流 , 可執(zhí)行過濾,聚合,查詢數(shù)據(jù)庫等操作

流式計算之Storm簡介

  • Task ? 每一個 Spout Bolt 會被當(dāng)作很多 task 在整個集群里面執(zhí)行 , 每一個 task 對應(yīng)到一個線程 .

流式計算之Storm簡介

  • Stream groupings ? 消息分發(fā)策略 , 定義一個 Topology 的其中一步是定義每個 tuple 接受什么樣的流作為輸入 ,stream grouping 就是用來定義一個 stream 應(yīng)該如果分配給 Bolts .

stream grouping 分類

1. Shuffle Grouping: ? 隨機(jī)分組, ? 隨機(jī)派發(fā) stream 里面的 tuple , ? 保證每個 bolt 接收到的 tuple 數(shù)目相同 .
2. Fields Grouping
:按字段分組, ? 比如按 userid 來分組, ? 具有同樣 userid tuple 會被分到相同的 Bolts , ? 而不同的 userid 則會被分配到不同的 Bolts.
3. All Grouping
? 廣播發(fā)送, ? 對于每一個 tuple ? 所有的 Bolts 都會收到 .
4. Global Grouping:
? 全局分組,這個 tuple 被分配到 storm 中的一個 bolt 的其中一個 task. 再具體一點就是分配給 id 值最低的那個 task.
5. Non Grouping:
? 不分組,意思是說 stream 不關(guān)心到底誰會收到它的 tuple. 目前他和 Shuffle grouping 是一樣的效果 , 有點不同的是 storm 會把這個 bolt 放到這個 bolt 的訂閱者同一個線程去執(zhí)行 .
6. Direct Grouping:
? 直接分組 , 這是一種比較特別的分組方法,用這種分組意味著消息的發(fā)送者舉鼎由消息接收者的哪個 task 處理這個消息 . 只有被聲明為 Direct Stream 的消息流可以聲明這種分組方法 . 而且這種消息 tuple 必須使用 emitDirect 方法來發(fā)射 . 消息處理者可以通過 TopologyContext 來或者處理它的消息的 taskid (OutputCollector.emit 方法也會返回 taskid)

Storm 如何保證消息被處理

storm 保證每個 tuple 會被 topology 完整的執(zhí)行。 storm 會追蹤由每個 spout tuple 所產(chǎn)生的 tuple ( 一個 bolt 處理一個 tuple 之后可能會發(fā)射別的 tuple 從而可以形成樹狀結(jié)構(gòu) ), ? 并且跟蹤這棵 tuple 樹什么時候成功處理完。每個 topology 都有一個消息超時的設(shè)置, ? 如果 storm 在這個超時的時間內(nèi)檢測不到某個 tuple 樹到底有沒有執(zhí)行成功, ? 那么 topology 會把這個 tuple 標(biāo)記為執(zhí)行失敗,并且過一會會重新發(fā)射這個 tuple

一個 tuple 能根據(jù)新獲取到的 spout 而觸發(fā)創(chuàng)建基于此的上千個 tuple

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout(1, new KestrelSpout("kestrel.backtype.com",

?????????????????????????????????????22133,

?????????????????????????????????????"sentence_queue",

?????????????????????????????????????new StringScheme()));

builder.setBolt(2, new SplitSentence(), 10)

????????.shuffleGrouping(1);

builder.setBolt(3, new WordCount(), 20)

????????.fieldsGrouping(2, new Fields("word"));

這個 topology kestrel queue 讀取句子 , 并把句子劃分成單詞 , 然后匯總每個單詞出現(xiàn)的次數(shù) , 一個 tuple 負(fù)責(zé)讀取句子 , 每一個 tuple 分別對應(yīng)計算每一個單詞出現(xiàn)的次數(shù) , 大概樣子如下所示 :

流式計算之Storm簡介

一個 tuple 的生命周期 :

public interface ISpout extends Serializable {

????void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

????void close();

????void nextTuple();

????void ack(Object msgId);

????void fail(Object msgId);

}

首先 storm 通過調(diào)用 spout nextTuple 方法來獲取下一個 tuple, Spout 通過 open 方法參數(shù)里面提供的 SpoutOutputCollector 來發(fā)射新 tuple 到它的其中一個輸出消息流 , ? 發(fā)射 tuple 的時候 spout 會提供一個 message-id, ? 后面我們通過這個 tuple-id 來追蹤這個 tuple 。舉例來說, ? KestrelSpout kestrel 隊列里面讀取一個消息,并且把 kestrel 提供的消息 id 作為 message-id, ? 看例子:

collector.emit(new Values("field1", "field2", 3) , msgId);

?

接下來, ? 這個發(fā)射的 tuple 被傳送到消息處理者 bolt 那里, ? storm 會跟蹤這個消息的樹形結(jié)構(gòu)是否創(chuàng)建 , 根據(jù) messageid 調(diào)用 Spout 里面的 ack 函數(shù)以確認(rèn) tuple 是否被完全處理。如果 tuple 超時就會調(diào)用 spout fail 方法。由此看出同一個 tuple 不管是 acked 還是 fail 都是由創(chuàng)建他的那個 spout 發(fā)出的 , 所以即使 spout 在集群環(huán)境中執(zhí)行了很多的 task, 這個 tule 也不會被不同的 task acked failed.
當(dāng) kestrelspout kestrel 隊列中得到一個消息后會打開這個他 , 這意味著他并不會把此消息拿走 , 消息的狀態(tài)會顯示為 pending, 直到等待確認(rèn)此消息已經(jīng)處理完成 , 處于 pending 狀態(tài)直到 ack 或者 fail 被調(diào)用 , 處于 "Pending" 的消息不會再被其他隊列消費者使用 . 如果在這過程中 spout 中處理此消息的 task 斷開連接或失去響應(yīng)則此 pending 的消息會回到 " 等待處理 " 狀態(tài) .

Storm 的一些常用應(yīng)用場景

1. 流聚合
流聚合把兩個或者多個數(shù)據(jù)流聚合成一個數(shù)據(jù)流 ? ? 基于一些共同的 tuple 字段。

builder.setBolt(5, new MyJoiner(), parallelism)

??.fieldsGrouping(1, new Fields("joinfield1", "joinfield2"))

??.fieldsGrouping(2, new Fields("joinfield1", "joinfield2"))

??.fieldsGrouping(3, new Fields("joinfield1", "joinfield2"))

?

2. 批處理
有時候為了性能或者一些別的原因, ? 你可能想把一組 tuple 一起處理, ? 而不是一個個單獨處理。

3.BasicBolt
1.
? 讀一個輸入 tuple
2.
? 根據(jù)這個輸入 tuple 發(fā)射一個或者多個 tuple
3.
? execute 的方法的最后 ack 那個輸入 tuple
遵循這類模式的 bolt 一般是函數(shù)或者是過濾器 , ? 這種模式太常見, storm 為這類模式單獨封裝了一個接口 : IbasicBolt

4. 內(nèi)存內(nèi)緩存 +Fields grouping 組合
bolt 的內(nèi)存里面緩存一些東西非常常見。緩存在和 fields grouping 結(jié)合起來之后就更有用了。比如,你有一個 bolt 把短鏈接變成長鏈接 (bit.ly, t.co 之類的 ) 。你可以把短鏈接到長鏈接的對應(yīng)關(guān)系利用 LRU 算法緩存在內(nèi)存里面以避免重復(fù)計算。比如組件一發(fā)射短鏈接,組件二把短鏈接轉(zhuǎn)化成長鏈接并緩存在內(nèi)存里面??匆幌孪旅鎯啥未a有什么不一樣:

builder.setBolt(2, new ExpandUrl(), parallelism)

??.shuffleGrouping(1);

builder.setBolt(2, new ExpandUrl(), parallelism)

??.fieldsGrouping(1, new Fields("url"));

5. 計算 top N
比如你有一個 bolt 發(fā)射這樣的 tuple: "value", "count" 并且你想一個 bolt 基于這些信息算出 top N tuple 。最簡單的辦法是有一個 bolt 可以做一個全局的 grouping 的動作并且在內(nèi)存里面保持這 top N 的值。
這個方式對于大數(shù)據(jù)量的流顯然是沒有擴(kuò)展性的, ? 因為所有的數(shù)據(jù)會被發(fā)到同一臺機(jī)器。一個更好的方法是在多臺機(jī)器上面并行的計算這個流每一部分的 top N, ? 然后再有一個 bolt 合并這些機(jī)器上面所算出來的 top N 以算出最后的 top N, ? 代碼大概是這樣的 :

builder.setBolt(2, new RankObjects(), parallellism)

??.fieldsGrouping(1, new Fields("value"));

builder.setBolt(3, new MergeObjects())

??.globalGrouping(2);

這個模式之所以可以成功是因為第一個 bolt fields grouping 使得這種并行算法在語義上是正確的。
TimeCacheMap 來高效地保存一個最近被更新的對象的緩存

6. TimeCacheMap 來高效地保存一個最近被更新的對象的緩存
有時候你想在內(nèi)存里面保存一些最近活躍的對象,以及那些不再活躍的對象。 ? TimeCacheMap ? 是一個非常高效的數(shù)據(jù)結(jié)構(gòu),它提供了一些 callback 函數(shù)使得我們在對象不再活躍的時候我們可以做一些事情 .

7. 分布式 RPC:CoordinatedBolt KeyedFairBolt
storm 做分布式 RPC 應(yīng)用的時候有兩種比較常見的模式 : 它們被封裝在 CoordinatedBolt KeyedFairBolt 里面 . CoordinatedBolt 包裝你的 bolt, 并且確定什么時候你的 bolt 已經(jīng)接收到所有的 tuple, 它主要使用 Direct Stream 來做這個 .
KeyedFairBolt
同樣包裝你的 bolt 并且保證你的 topology 同時處理多個 DRPC 調(diào)用,而不是串行地一次只執(zhí)行一個。

流式計算之Storm簡介


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯(lián)系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦?。?!

發(fā)表我的評論
最新評論 總共0條評論