1.案例需求
实现自增数字相加的和 1+2+3+4+5+6+……..
2.需求分析
Spout来发送数字作为input
使用Bolt来实现求和逻辑
将结果输出到控制台
3.导入Storm的pom依赖
4.具体代码实现
package cn.ysjh; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Map; //* 使用Storm实现求和功能 // public class SumStorm { // Spout需要继承Base // public static class DataSourceSpout extends BaseRichSpout{ private SpoutOutputCollector spoutOutputCollector; // 初始化方法,只会被调用一次 // @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector=spoutOutputCollector; } int num=0; // 会产生数据,在实际生产中肯定是从消息队列中获取数据 这个方法是一个死循环,会一直运行 // @Override public void nextTuple() { this.spoutOutputCollector.emit(new Values(++num)); System.out.println(“数据:"+num); //防止数据产生太快 Utils.sleep(1000); } // 声明输出字段 // @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //这里要和上边nextTuple()方法中的Values中的对应 outputFieldsDeclarer.declare(new Fields(“number”)); } } // 数据的累计求和Bolt:接收数据并处理 // private static class SumBolt extends BaseRichBolt { // 初始化方法,会被执行一次 // @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { } int sum=0; // 获取Spout发送过来的数据 Bolt中获取值可以根据index获取,也可以根据上一个环节中定义的fields名称获取,建议使用后一种方法 // @Override public void execute(Tuple tuple) { Integer Num = tuple.getIntegerByField(“number”); sum+=Num; System.out.println(“Sum:"+sum); } // /*/ @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } } public static void main(String[] args){ //TopologyBuilder根据Spout和Bolt来构建topology,Storm中任何一个作业都是通过Topology的方式进行提交的,Topology中需要指定Spout和Bolt执行的顺序 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“DataSourceSpout”,new DataSourceSpout()); builder.setBolt(“SumBolt”,new SumBolt()).shuffleGrouping(“DataSourceSpout”); //创建本地模式,只需使用LocalCluster类 LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“SumStorm”,new Config(),builder.createTopology()); } }
可以看出先实现Spout和Bolt,最后实现Topology进行本地运行
注意:
这里是本地运行模式,不需要Storm集群环境,如果要提交到集群上运行,最后的LocalCluster需要修改为StormSubmitter将Topology提交到集群上运行
5.运行截图
最后修改于 2018-11-12

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。