欧美三区_成人在线免费观看视频_欧美极品少妇xxxxⅹ免费视频_a级毛片免费播放_鲁一鲁中文字幕久久_亚洲一级特黄

Spark學習實例(Python):窗口操作 Window

系統 1646 0

說到流處理,Spark為我們提供了窗口函數,允許在滑動數據窗口上應用轉換,常用場景如每五分鐘商場人流密度、每分鐘流量等等,接下來我們通過畫圖來了解Spark Streaming的窗口函數如何工作的,處理過程圖如下所示:

Spark學習實例(Python):窗口操作 Window Operations_第1張圖片

上圖中綠色的小框框是一批一批的數據流,虛線框和實線框分別是前一個窗口和后一個窗口,從圖中可以看出后一個窗口在前一個窗口基礎上移動了兩個批次的數據流,而我們真正通過算子操作的數據其實就是窗口內所有的數據流。

在代碼實現前了解下窗口操作常用的函數有:

  • window
  • countByWindow
  • reduceByWindow
  • reduceByKeyAndWindow
  • reduceByKeyAndWindow
  • countByValueAndWindow

window最原始的窗口,提供兩個參數,第一個參數是窗口長度,第二個參數是滑動間隔,返回一個新的DStream, 返回的結果可以進行算子操作,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.window(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-13 19:46:45
    # -------------------------------------------
    # hello
    # world
    ssc.start()
    ssc.awaitTermination()
            
          

現在終端使用nc發送數據

root@root:~$ nc -lk 9999
hello
world

countByWindow統計每個滑動窗口內數據條數,要注意的是使用該函數要加上checkpoint機制,代碼實現如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒
    dstream = lines.countByWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 18:56:40
    # -------------------------------------------
    # 2
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByWindow聚合每個鍵的值,底層執行的是reduceByKeyAndWindow,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x):
    return x

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行指定函數, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒
    dstream = lines.reduceByWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # hello
    ssc.start()
    ssc.awaitTermination()
            
          

reduceByKeyAndWindow是對(K,V)窗口數據相同的K執行對應的fun,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def fun(x,y):
    return x+y

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數執行的功能函數fun, 第二個參數是窗口長度,這里是60秒, 第三個參數是滑動間隔,這里是10秒,
    # 第四個參數設定并行度
    dstream = lines.map(lambda x:(x,1)).reduceByKeyAndWindow(fun, 60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 2)
    ssc.start()
    ssc.awaitTermination()
            
          

countByValueAndWindow是對窗口數據進行單詞統計,實現代碼如下

            
              from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == '__main__':
    sc = SparkContext(appName="windowStream", master="local[*]")
    # 第二個參數指統計多長時間的數據
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/tmp/window")
    lines = ssc.socketTextStream("localhost", 9999)
    # 第一個參數是窗口長度,這里是60秒, 第二個參數是滑動間隔,這里是10秒, 第三個參數任務并行度
    dstream = lines.countByValueAndWindow(60, 10)
    dstream.pprint()
    # -------------------------------------------
    # Time: 2019-08-14 19:23:30
    # -------------------------------------------
    # ('hello', 3)
    # ('world', 1)
    ssc.start()
    ssc.awaitTermination()
            
          

以上就是所有窗口函數的使用

?

Spark學習目錄:

  • Spark學習實例1(Python):單詞統計 Word Count
  • Spark學習實例2(Python):加載數據源Load Data Source
  • Spark學習實例3(Python):保存數據Save Data
  • Spark學習實例4(Python):RDD轉換 Transformations
  • Spark學習實例5(Python):RDD執行 Actions
  • Spark學習實例6(Python):共享變量Shared Variables
  • Spark學習實例7(Python):RDD、DataFrame、DataSet相互轉換
  • Spark學習實例8(Python):輸入源實時處理 Input Sources Streaming
  • Spark學習實例9(Python):窗口操作 Window Operations

?

?


更多文章、技術交流、商務合作、聯系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號聯系: 360901061

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點擊下面給點支持吧,站長非常感激您!手機微信長按不能支付解決辦法:請將微信支付二維碼保存到相冊,切換到微信,然后點擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對您有幫助就好】

您的支持是博主寫作最大的動力,如果您喜歡我的文章,感覺我的文章對您有幫助,請用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長會非常 感謝您的哦!!!

發表我的評論
最新評論 總共0條評論
主站蜘蛛池模板: 日韩一二 | 青娱乐手机免费视频 | 亚洲aⅴ天堂av在线电影软件 | 呦呦在线视频 | 欧美高清成人 | 精品在线一区二区三区 | 黑人性xxxⅹxxbbbbb | 亚洲一区在线播放 | 日韩精品一区二区三区在线播放 | 性国产精品 | 日韩视频在线一区二区三区 | 久久久久久久蜜桃 | 日韩免费福利视频 | 日韩精品视频在线 | 欧美日本一区 | 久久av影院| 日本一区二区三区中文字幕 | 日日网站 | 欧美综合在线观看 | 国产一区网址 | 久久久久国产亚洲日本 | 久久精品视频网站 | 一级一级一级一级毛片 | 亚洲精品乱码久久久久久蜜桃 | 欧美成人免费丝袜视频在线观看 | 久久久久日韩精品免费观看网 | 日韩二区精品 | 99国内精品 | 亚洲精品久久久久影院 | 久久秋霞理论电影 | 久久综合五月开心婷婷深深爱 | 在线不卡视频 | 国产伦精品一区二区三区精品视频 | 噜噜狠狠 | 欧美白人战黑吊 | 波多野吉衣一区二区三区四区 | 日韩欧美在线视频播放 | 日本一级成人毛片免费观看 | 成人福利在线视频 | 97在线碰碰观看免费高清 | 亚洲欧美日韩在线观看播放 |