黄色网页视频 I 影音先锋日日狠狠久久 I 秋霞午夜毛片 I 秋霞一二三区 I 国产成人片无码视频 I 国产 精品 自在自线 I av免费观看网站 I 日本精品久久久久中文字幕5 I 91看视频 I 看全色黄大色黄女片18 I 精品不卡一区 I 亚洲最新精品 I 欧美 激情 在线 I 人妻少妇精品久久 I 国产99视频精品免费专区 I 欧美影院 I 欧美精品在欧美一区二区少妇 I av大片网站 I 国产精品黄色片 I 888久久 I 狠狠干最新 I 看看黄色一级片 I 黄色精品久久 I 三级av在线 I 69色综合 I 国产日韩欧美91 I 亚洲精品偷拍 I 激情小说亚洲图片 I 久久国产视频精品 I 国产综合精品一区二区三区 I 色婷婷国产 I 最新成人av在线 I 国产私拍精品 I 日韩成人影音 I 日日夜夜天天综合

示例:python模擬日志生成+Flume+Kafka+Spark

系統(tǒng) 2112 0

生成模擬數(shù)據(jù)

  1. 編寫 generate_log.py
            
              
                #coding=UTF-8
              
              
                import
              
               random

              
                import
              
               time

url_paths
              
                =
              
              
                [
              
              
                "class/112.html"
              
              
                ,
              
              
                "class/128.html"
              
              
                ,
              
              
                "class/145.html"
              
              
                ,
              
              
                "class/130.html"
              
              
                ,
              
              
                "class/146.html"
              
              
                ,
              
              
                "class/131.html"
              
              
                ,
              
              
                "learn/821"
              
              
                ,
              
              
                "course/list"
              
              
                ]
              
              

ip_slices
              
                =
              
              
                [
              
              
                132
              
              
                ,
              
              
                156
              
              
                ,
              
              
                124
              
              
                ,
              
              
                10
              
              
                ,
              
              
                29
              
              
                ,
              
              
                167
              
              
                ,
              
              
                143
              
              
                ,
              
              
                187
              
              
                ,
              
              
                30
              
              
                ,
              
              
                46
              
              
                ,
              
              
                55
              
              
                ,
              
              
                63
              
              
                ,
              
              
                72
              
              
                ,
              
              
                87
              
              
                ,
              
              
                98
              
              
                ,
              
              
                168
              
              
                ]
              
              

http_referers
              
                =
              
              
                [
              
              
                "https://www.baidu.com/s?wd={query}"
              
              
                ,
              
              
                "https://www.sogou.com/web?query={query}"
              
              
                ,
              
              
                "https://cn.bing.com/search?q={query}"
              
              
                ,
              
              
                "https://www.so.com/s?q={query}"
              
              
                ]
              
              

search_keyword
              
                =
              
              
                [
              
              
                "spark sql實(shí)戰(zhàn)"
              
              
                ,
              
              
                "hadoop 基礎(chǔ)"
              
              
                ,
              
              
                "storm實(shí)戰(zhàn)"
              
              
                ,
              
              
                "spark streaming實(shí)戰(zhàn)"
              
              
                ]
              
              

status_code
              
                =
              
              
                [
              
              
                "200"
              
              
                ,
              
              
                "404"
              
              
                ,
              
              
                "500"
              
              
                ]
              
              
                def
              
              
                sample_status_code
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              status_code
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_referer
              
              
                (
              
              
                )
              
              
                :
              
              
                if
              
               random
              
                .
              
              uniform
              
                (
              
              
                0
              
              
                ,
              
              
                1
              
              
                )
              
              
                >
              
              
                0.2
              
              
                :
              
              
                return
              
              
                "-"
              
              
    refer_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              http_referers
              
                ,
              
              
                1
              
              
                )
              
              
    query_str
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              search_keyword
              
                ,
              
              
                1
              
              
                )
              
              
                return
              
               refer_str
              
                [
              
              
                0
              
              
                ]
              
              
                .
              
              
                format
              
              
                (
              
              query
              
                =
              
              query_str
              
                [
              
              
                0
              
              
                ]
              
              
                )
              
              
                def
              
              
                sample_url
              
              
                (
              
              
                )
              
              
                :
              
              
                return
              
               random
              
                .
              
              sample
              
                (
              
              url_paths
              
                ,
              
              
                1
              
              
                )
              
              
                [
              
              
                0
              
              
                ]
              
              
                def
              
              
                sample_ip
              
              
                (
              
              
                )
              
              
                :
              
              
                slice
              
              
                =
              
              random
              
                .
              
              sample
              
                (
              
              ip_slices
              
                ,
              
              
                4
              
              
                )
              
              
                return
              
              
                "."
              
              
                .
              
              join
              
                (
              
              
                [
              
              
                str
              
              
                (
              
              item
              
                )
              
              
                for
              
               item 
              
                in
              
              
                slice
              
              
                ]
              
              
                )
              
              
                def
              
              
                generate_log
              
              
                (
              
              count
              
                =
              
              
                10
              
              
                )
              
              
                :
              
              
    time_str
              
                =
              
              time
              
                .
              
              strftime
              
                (
              
              
                "%Y-%m-%d %H:%M:%S"
              
              
                ,
              
              time
              
                .
              
              localtime
              
                (
              
              
                )
              
              
                )
              
              

    f
              
                =
              
              
                open
              
              
                (
              
              
                "C:/Users/DaiRenLong/Desktop/streaming_access.log"
              
              
                ,
              
              
                "w+"
              
              
                )
              
              
                while
              
               count 
              
                >=
              
              
                1
              
              
                :
              
              
        query_log
              
                =
              
              
                "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}"
              
              
                .
              
              
                format
              
              
                (
              
              url
              
                =
              
              sample_url
              
                (
              
              
                )
              
              
                ,
              
              ip
              
                =
              
              sample_ip
              
                (
              
              
                )
              
              
                ,
              
              refer
              
                =
              
              sample_referer
              
                (
              
              
                )
              
              
                ,
              
              status_code
              
                =
              
              sample_status_code
              
                (
              
              
                )
              
              
                ,
              
              local_time
              
                =
              
              time_str
              
                )
              
              
                print
              
              
                (
              
              query_log
              
                )
              
              
        f
              
                .
              
              write
              
                (
              
              query_log
              
                +
              
              
                "\n"
              
              
                )
              
              
        count
              
                =
              
              count
              
                -
              
              
                1
              
              
                if
              
               __name__ 
              
                ==
              
              
                '__main__'
              
              
                :
              
              
                # 每一分鐘生成一次日志信息
              
              
                while
              
              
                True
              
              
                :
              
              
        generate_log
              
                (
              
              
                )
              
              
        time
              
                .
              
              sleep
              
                (
              
              
                60
              
              
                )
              
            
          
  1. 日志文件對(duì)接flume==>kafka
    Flume配置文件: https://blog.csdn.net/drl_blogs/article/details/95192574#execkafkaconf_1
  2. 運(yùn)行flume
            
              flume-ng agent \
--name exec-memory-logger \
--conf conf 
              
                $FLUME_HOME
              
              /conf \
--conf-file 
              
                $FLUME_HOME
              
              /conf/streaming_project.conf \
-Dflume.root.logger
              
                =
              
              INFO,console 
              
                &
              
            
          
  1. 運(yùn)行kafka消費(fèi)者
            
               kafka-console-consumer.sh \
 --zookeeper hadoop01:2181 \
 --topic kafka_streaming_topic

            
          
  1. 運(yùn)行python文件測(cè)試
            
               python generate_log.py

            
          
  1. 查看kafka消費(fèi)者消費(fèi)者是否有信息

  2. 編寫代碼打通通道

            
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              log4j
              
                .
              
              
                {
              
              Level
              
                ,
              
               Logger
              
                }
              
              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              SparkConf

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              kafka
              
                .
              
              KafkaUtils

              
                import
              
               org
              
                .
              
              apache
              
                .
              
              spark
              
                .
              
              streaming
              
                .
              
              
                {
              
              Seconds
              
                ,
              
               StreamingContext
              
                }
              
              

object kafka_Receiver_streaming 
              
                {
              
              
  Logger
              
                .
              
              
                getLogger
              
              
                (
              
              
                "org"
              
              
                )
              
              
                .
              
              
                setLevel
              
              
                (
              
              Level
              
                .
              
              WARN
              
                )
              
              
  def 
              
                main
              
              
                (
              
              args
              
                :
              
               Array
              
                [
              
              String
              
                ]
              
              
                )
              
              
                :
              
               Unit 
              
                =
              
              
                {
              
              
    val sparkConf 
              
                =
              
              
                new
              
              
                SparkConf
              
              
                (
              
              
                )
              
              
                .
              
              
                setAppName
              
              
                (
              
              
                "kafka_Receiver_streaming"
              
              
                )
              
              
                .
              
              
                setMaster
              
              
                (
              
              
                "local[*]"
              
              
                )
              
              
                .
              
              
                set
              
              
                (
              
              
                "spark.port.maxRetries"
              
              
                ,
              
              
                "100"
              
              
                )
              
              

    val ssc 
              
                =
              
              
                new
              
              
                StreamingContext
              
              
                (
              
              sparkConf
              
                ,
              
              
                Seconds
              
              
                (
              
              
                60
              
              
                )
              
              
                )
              
              

    val messages 
              
                =
              
               KafkaUtils
              
                .
              
              
                createStream
              
              
                (
              
              ssc
              
                ,
              
              
                "hadoop01:2181"
              
              
                ,
              
              
                "test"
              
              
                ,
              
              
                Map
              
              
                (
              
              
                "kafka_streaming_topic"
              
              
                -
              
              
                >
              
              
                1
              
              
                )
              
              
                )
              
              
    messages
              
                .
              
              
                map
              
              
                (
              
              _
              
                .
              
              _2
              
                )
              
              
                .
              
              
                count
              
              
                (
              
              
                )
              
              
                .
              
              
                print
              
              
                (
              
              
                )
              
              

    ssc
              
                .
              
              
                start
              
              
                (
              
              
                )
              
              
    ssc
              
                .
              
              
                awaitTermination
              
              
                (
              
              
                )
              
              
                }
              
              
                }
              
            
          
  1. 運(yùn)行代碼查看結(jié)果

更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦?。?!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論