黄色网页视频 I 影音先锋日日狠狠久久 I 秋霞午夜毛片 I 秋霞一二三区 I 国产成人片无码视频 I 国产 精品 自在自线 I av免费观看网站 I 日本精品久久久久中文字幕5 I 91看视频 I 看全色黄大色黄女片18 I 精品不卡一区 I 亚洲最新精品 I 欧美 激情 在线 I 人妻少妇精品久久 I 国产99视频精品免费专区 I 欧美影院 I 欧美精品在欧美一区二区少妇 I av大片网站 I 国产精品黄色片 I 888久久 I 狠狠干最新 I 看看黄色一级片 I 黄色精品久久 I 三级av在线 I 69色综合 I 国产日韩欧美91 I 亚洲精品偷拍 I 激情小说亚洲图片 I 久久国产视频精品 I 国产综合精品一区二区三区 I 色婷婷国产 I 最新成人av在线 I 国产私拍精品 I 日韩成人影音 I 日日夜夜天天综合

Storm 中drpc調(diào)用

系統(tǒng) 2145 0
      package storm.starter;



import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.LocalDRPC;

import backtype.storm.StormSubmitter;

import backtype.storm.drpc.DRPCSpout;

import backtype.storm.task.ShellBolt;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.topology.base.BaseBasicBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

import storm.starter.spout.RandomSentenceSpout;



import java.lang.management.ManagementFactory;

import java.util.HashMap;

import java.util.Map;



import org.apache.log4j.Logger;

import org.apache.log4j.PropertyConfigurator;



/**

 * This topology demonstrates Storm's stream groupings and multilang

 * capabilities.

 */

public class Drpctest {

	public static final Logger logger = Logger.getLogger(Drpctest.class);

	public static class WordCount extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String word = tuple.getString(0);

			logger.error(this.toString() + "word = " + word);

			Integer count = counts.get(word);

			if (count == null)

				count = 0;

			count++;

			counts.put(word, count);

			logger.error(this.toString() + "count = " + count);

			collector.emit(new Values(word, count));

		}



		String str = Thread.currentThread().getName();



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			logger.error("declareOutputFields :");

			declarer.declare(new Fields("result", "count"));

		}

	}



	public static class DrpcBolt extends BaseBasicBolt {

		Map<String, Integer> counts = new HashMap<String, Integer>();



		@Override

		public void execute(Tuple tuple, BasicOutputCollector collector) {

			String logString = tuple.getString(0);

			logger.error("DrpcBolt recve :" + logString);

		}



		@Override

		public void declareOutputFields(OutputFieldsDeclarer declarer) {

			// 暫時(shí)沒(méi)用

			declarer.declare(new Fields("word1", "count1"));

		}

	}



	public static void main(String[] args) throws Exception {

		TopologyBuilder builder = new TopologyBuilder();



		// drpc

		LocalDRPC drpc = new LocalDRPC();

		DRPCSpout drpc_spout = new DRPCSpout("testdrpc", drpc);

		builder.setSpout("drpcspout", drpc_spout, 3);



		PropertyConfigurator

				.configure("/home/hadoop/code1/Kafka/src/Log4j.properties");



		// 接入drpc

		builder.setBolt("DrpcBolt", new DrpcBolt(), 1).shuffleGrouping(

				"drpcspout");



		Config conf = new Config();

		conf.setDebug(true);



		if (args != null && args.length > 0) {

			conf.setNumWorkers(3);



			StormSubmitter.submitTopology(args[0], conf,

					builder.createTopology());

		} else {

			conf.setMaxTaskParallelism(3);

			conf.setDebug(true);



			LocalCluster cluster = new LocalCluster();

			cluster.submitTopology("word-count", conf, builder.createTopology());



			String str = "send test drpc"; // 和 DRPCSpout 名字對(duì)應(yīng)

			drpc.execute("testdrpc", str);



			Thread.sleep(10000);



			cluster.shutdown();

		}

	}

}


    

?

Storm 中drpc調(diào)用


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論