实现功能:
跟前面整合Redis的案例一样,只不过这次是将结果写入到MySQL数据库中
运行环境跟前面的案例一样,只需要加上storm-jdbc的依赖包即可
实现代码:
package cn.ysjh.drpc; import com.google.common.collect.Maps; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.HikariCPConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.sql.Types; import java.util./; public class StormJDBC { private static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } public static final String[] words = new String[]{“apple”, “ysjh”, “shjkl”, “ueyowir”, “tiuyh”}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.spoutOutputCollector.emit(new Values(word)); System.out.println(“数据” + word); Utils.sleep(1500); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“lines”)); } } // 词频分割Bolt // private static class SplitBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } // 对lines按照逗号进行切分 // @Override public void execute(Tuple tuple) { String lines = 
tuple.getStringByField(“lines”); this.outputCollector.emit(new Values(lines)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“words”)); } } // 词频统计Bolt /*/ private static class CountBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple tuple) { String words = tuple.getStringByField(“words”); Integer count = map.get(words); if (count == null) { count = 0; } count++; map.put(words, count); //输出 this.outputCollector.emit(new Values(words,map.get(words))); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“word”,“count”)); } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“DataSourceSpout”, new DataSourceSpout()); builder.setBolt(“SplitBolt”, new SplitBolt()).shuffleGrouping(“DataSourceSpout”); builder.setBolt(“CountBolt”, new CountBolt()).shuffleGrouping(“SplitBolt”); Map hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put(“dataSourceClassName”,“com.mysql.jdbc.jdbc2.optional.MysqlDataSource”); hikariConfigMap.put(“dataSource.url”, “jdbc:mysql://118.89.108.116:3306/storm”); hikariConfigMap.put(“dataSource.user”,“root”); hikariConfigMap.put(“dataSource.password”,“root”); ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); String tableName = “wc”; JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider); JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper) .withTableName(tableName) .withQueryTimeoutSecs(10); builder.setBolt(“JdbcInsertBolt”,userPersistanceBolt).shuffleGrouping(“CountBolt”); 
builder.setBolt(“MysqlCountBolt”,new MysqlCountBolt()).shuffleGrouping(“CountBolt”); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“StormJDBC”, new Config(), builder.createTopology()); } }
运行上边代码你会发现在Mysql数据库中一直有数据写入,但是不会累加,相同的数据都会重复出现在数据库中,这样肯定是不行的,需要对代码进行改进
能累加的代码:
package cn.ysjh.drpc; import com.google.common.collect.Maps; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.jdbc.common.Column; import org.apache.storm.jdbc.common.ConnectionProvider; import org.apache.storm.jdbc.common.HikariCPConnectionProvider; import org.apache.storm.jdbc.common.JdbcClient; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.sql.Types; import java.util./; public class StormJDBC { private static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } public static final String[] words = new String[]{“apple”, “ysjh”, “shjkl”, “ueyowir”, “tiuyh”}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.spoutOutputCollector.emit(new Values(word)); System.out.println(“数据” + word); Utils.sleep(1500); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“lines”)); } } // 词频分割Bolt // private static class SplitBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } // 对lines按照逗号进行切分 // @Override public void execute(Tuple tuple) { String lines = 
tuple.getStringByField(“lines”); this.outputCollector.emit(new Values(lines)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“words”)); } } // 词频统计Bolt /*/ private static class CountBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector=outputCollector; } Map<String, Integer> map = new HashMap<>(); @Override public void execute(Tuple tuple) { String words = tuple.getStringByField(“words”); Integer count = map.get(words); if (count == null) { count = 0; } count++; map.put(words, count); //输出 this.outputCollector.emit(new Values(words,map.get(words))); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“word”,“count”)); } } public static class MysqlCountBolt extends BaseRichBolt{ private OutputCollector collector; private JdbcClient jdbcClient; private ConnectionProvider connectionProvider; public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; Map hikariConfigMap = Maps.newHashMap(); hikariConfigMap.put(“dataSourceClassName”,“com.mysql.jdbc.jdbc2.optional.MysqlDataSource”); hikariConfigMap.put(“dataSource.url”, “jdbc:mysql://localhost/storm”); hikariConfigMap.put(“dataSource.user”,“root”); hikariConfigMap.put(“dataSource.password”,“root”); connectionProvider = new HikariCPConnectionProvider(hikariConfigMap); //对数据库连接池进行初始化 connectionProvider.prepare(); jdbcClient = new JdbcClient(connectionProvider, 30); } Map<String,Integer> map = new HashMap<String,Integer>(); public void execute(Tuple input) { String word = input.getStringByField(“word”); Integer count = input.getIntegerByField(“count”); List
这里就可以实现累加了,跟前面的整合Redis数据库的运行结果一样
注意:
这里pom文件中的mysql-connector-java依赖必须是5.1.31。但是我的本地数据库是8.0版本的,使用这个驱动包连接时会报错,所以我使用的是远程服务器上的数据库。如果你在运行中出现一样的问题,可以考虑将本地数据库版本降低。
最后修改于 2018-11-16

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。