Integrating Storm with HBase

As with the earlier Storm integrations, here is the code directly:

package cn.ysjh.drpc;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class StormHBase {

    // Spout that emits one random word every 1.5 seconds
    private static class DataSourceSpout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;

        @Override
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        public static final String[] words = new String[]{"apple", "ysjh", "shjkl", "ueyowir", "tiuyh"};

        @Override
        public void nextTuple() {
            Random random = new Random();
            String word = words[random.nextInt(words.length)];
            this.spoutOutputCollector.emit(new Values(word));
            System.out.println("data: " + word);
            Utils.sleep(1500);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("lines"));
        }
    }

    // Word-splitting bolt
    private static class SplitBolt extends BaseRichBolt {
        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        // The spout already emits single words, so each "lines" value is forwarded as one word
        @Override
        public void execute(Tuple tuple) {
            String lines = tuple.getStringByField("lines");
            this.outputCollector.emit(new Values(lines));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("words"));
        }
    }

    // Word-counting bolt
    private static class CountBolt extends BaseRichBolt {
        private OutputCollector outputCollector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.outputCollector = outputCollector;
        }

        Map<String, Integer> map = new HashMap<>();

        @Override
        public void execute(Tuple tuple) {
            String words = tuple.getStringByField("words");
            Integer count = map.get(words);
            if (count == null) {
                count = 0;
            }
            count++;
            map.put(words, count);
            // emit the word together with its running count
            this.outputCollector.emit(new Values(words, map.get(words)));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) {
        Config config = new Config();

        // Tell Storm where HBase lives; HBaseBolt reads this map via withConfigKey("hbase.conf")
        Map<String, Object> hbasemap = new HashMap<>();
        hbasemap.put("hbase.rootdir", "hdfs://cdh0:8020/hbase");
        hbasemap.put("hbase.zookeeper.quorum", "cdh0:2181");
        config.put("hbase.conf", hbasemap);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout", new DataSourceSpout());
        builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
        builder.setBolt("CountBolt", new CountBolt()).shuffleGrouping("SplitBolt");

        // "word" becomes the row key; "count" is written as a counter column in family "cf"
        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
                .withRowKeyField("word")
                .withColumnFields(new Fields("word"))
                .withCounterFields(new Fields("count"))
                .withColumnFamily("cf");

        HBaseBolt hbase = new HBaseBolt("WordCount", mapper).withConfigKey("hbase.conf");
        builder.setBolt("HBaseBolt", hbase).shuffleGrouping("CountBolt");

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("StormHBase", config, builder.createTopology());
    }
}

As you can see, this is not just the official example with a few additions: you also have to point the topology at HBase, otherwise Storm has no way of finding it. This step does not appear in the official docs: create a Config, write the HBase connection settings into a map, and put that map into the Config under a key that is later handed to the HBaseBolt via withConfigKey.
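To isolate that step, here is the wiring by itself, pulled from the listing above (the key name "hbase.conf" is arbitrary as long as the same string is passed to withConfigKey; note also that storm-hbase expects the target table, here WordCount with column family cf, to already exist):

Config config = new Config();

// Connection settings the HBase client needs; adjust the hosts to your cluster
Map<String, Object> hbaseConf = new HashMap<>();
hbaseConf.put("hbase.rootdir", "hdfs://cdh0:8020/hbase");
hbaseConf.put("hbase.zookeeper.quorum", "cdh0:2181");

// Store the map in the topology config under a key of your choosing...
config.put("hbase.conf", hbaseConf);

// ...and hand the same key to the bolt, which looks the map up when it prepares
HBaseBolt hbase = new HBaseBolt("WordCount", mapper).withConfigKey("hbase.conf");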

This runs in local mode; it definitely will not work under Windows, so you can package it and run it on the cluster instead (a cluster-mode submission sketch follows the pom snippets below). After running it you may hit a jar conflict on com.google.guava; you can drop that jar by adding the following exclusion to both the hadoop-client and storm-hbase dependencies in the pom:

<exclusions>
    <exclusion>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
    </exclusion>
</exclusions>

Then add the jar back as an explicit dependency:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>16.0.1</version>
</dependency>
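For the cluster run mentioned above, the LocalCluster lines at the end of main also have to be swapped for a real submission. A minimal sketch, assuming the same config and builder variables from the listing (the jar name in the comment is just a placeholder for whatever your build produces):

// extra imports needed for cluster submission
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;

// replaces the LocalCluster block; launch with the storm CLI, e.g.
//   storm jar StormHBase.jar cn.ysjh.drpc.StormHBase
try {
    StormSubmitter.submitTopology("StormHBase", config, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
    e.printStackTrace();
}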


Last modified 2018-11-21

This work is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.