🌀 技术人生
凡事有交代,件件有着落,事事有回音
Storm整合Redis

实现功能:

将之前的词频统计案例改编:每隔1秒从一个数组中随机取出一个单词,经过Storm的Topology处理之后,将统计出的词频写入到Redis中

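首先要记得在 pom.xml 中导入 storm-redis 依赖:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>1.1.1</version>
</dependency>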

代码实现:

package cn.ysjh.drpc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.redis.bolt.RedisStoreBolt; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util./; public class StormRedis { private static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } public static final String[] words = new String[]{“apple”, “ysjh”, “shjkl”, “ueyowir”, “tiuyh”}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.spoutOutputCollector.emit(new Values(word)); System.out.println(“数据” + word); Utils.sleep(1000); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“lines”)); } } // 词频分割Bolt // private static class SplitBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } // 对lines按照逗号进行切分 // @Override public void execute(Tuple tuple) { String lines 
= tuple.getStringByField(“lines”); this.outputCollector.emit(new Values(lines)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“words”)); } } // 词频统计Bolt /*/ private static class CountBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple tuple) { String words = tuple.getStringByField(“words”); Integer count = map.get(words); if (count == null) { count = 0; } count++; map.put(words, count); //输出 this.outputCollector.emit(new Values(words,map.get(words))); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“word”,“count”)); } } public static class WordCountStoreMapper implements RedisStoreMapper { private RedisDataTypeDescription description; private final String hashKey = “wc”; public WordCountStoreMapper() { description = new RedisDataTypeDescription( RedisDataTypeDescription.RedisDataType.HASH, hashKey); } @Override public RedisDataTypeDescription getDataTypeDescription() { return description; } @Override public String getKeyFromTuple(ITuple tuple) { return tuple.getStringByField(“word”); } @Override public String getValueFromTuple(ITuple tuple) { return tuple.getIntegerByField(“count”)+""; } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“DataSourceSpout”, new DataSourceSpout()); builder.setBolt(“SplitBolt”, new SplitBolt()).shuffleGrouping(“DataSourceSpout”); builder.setBolt(“CountBolt”, new CountBolt()).shuffleGrouping(“SplitBolt”); JedisPoolConfig poolConfig = new JedisPoolConfig.Builder() .setHost(“118.89.108.116”).setPort(6379).build(); RedisStoreMapper storeMapper = new WordCountStoreMapper(); 
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper); builder.setBolt(“RedisStoreBolt”,storeBolt).shuffleGrouping(“CountBolt”); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“StormRedis”, new Config(), builder.createTopology()); } }

运行截图:

然后在Redis的图形连接软件中不断刷新来查看db0数据库中的键值对的变化


最后修改于 2018-11-15

知识共享许可协议
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。