实现功能:
将一个特定数组中的数据每隔0.2秒随机取出一个然后将这些数据实时的写入到HDFS中
实现代码: package cn.ysjh.drpc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.hdfs.bolt.HdfsBolt; import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat; import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.apache.storm.redis.bolt.RedisStoreBolt; import org.apache.storm.redis.common.config.JedisPoolConfig; import org.apache.storm.redis.common.mapper.RedisDataTypeDescription; import org.apache.storm.redis.common.mapper.RedisStoreMapper; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.ITuple; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.HashMap; import java.util.Map; import java.util.Random; public class StormHDFS { private static class DataSourceSpout extends BaseRichSpout { private SpoutOutputCollector spoutOutputCollector; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.spoutOutputCollector = spoutOutputCollector; } public static final String[] words = new String[]{“apple”, “ysjh”, “shjkl”, “ueyowir”, “tiuyh”}; @Override public void nextTuple() { Random random = new Random(); String word = words[random.nextInt(words.length)]; this.spoutOutputCollector.emit(new Values(word)); System.out.println(“数据” + word); Utils.sleep(200); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“lines”)); } } //* 词频分割Bolt // private static class SplitBolt extends BaseRichBolt { private OutputCollector outputCollector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { this.outputCollector = outputCollector; } // 对lines按照逗号进行切分 /*/ @Override public void execute(Tuple tuple) { String lines = tuple.getStringByField(“lines”); this.outputCollector.emit(new Values(lines)); } @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields(“words”)); } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout(“DataSourceSpout”, new DataSourceSpout()); builder.setBolt(“SplitBolt”, new SplitBolt()).shuffleGrouping(“DataSourceSpout”); RecordFormat format = new DelimitedRecordFormat() .withFieldDelimiter("|"); SyncPolicy syncPolicy = new CountSyncPolicy(100); FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB); FileNameFormat fileNameFormat = new DefaultFileNameFormat() .withPath("/test/"); HdfsBolt bolt = new HdfsBolt() .withFsUrl(“hdfs:/cdh0:8020”) .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); builder.setBolt(“HdfsBolt”,bolt).shuffleGrouping(“SplitBolt”); LocalCluster cluster = new LocalCluster(); cluster.submitTopology(“StormHDFS”, new Config(), builder.createTopology()); } }
注意:
执行上述代码后可能会报没有写HDFS权限的错,这时候就需要修改HDFS相应目录的权限
最后修改于 2018-11-16

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。