
Reading data from a Kafka message queue with Storm

Copyright notice: "假装、是个有灵魂的程序员" — Go Big Or Go Home. https://blog.csdn.net/u011663149/article/details/85625136

Business scenario:

Storm and Kafka are a classic pairing. Storm's job is stream computation, and it works very well on a steady, continuous inflow of data. Most real-world traffic, however, is not steady: data arrives in bursts, sometimes heavy and sometimes light. Batch processing is clearly a poor fit for this, and feeding the bursts straight into Storm can cause data to pile up and bring servers down. Kafka as a message queue is the natural choice here: it absorbs the uneven inflow and hands Storm a smooth stream of messages, and only with that combination do you get stable stream processing.
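The smoothing role Kafka plays here can be illustrated with a plain bounded queue (a minimal sketch, not the Storm/Kafka API; `BurstBuffer` and its numbers are made up for illustration). A bursty producer blocks when the buffer is full, so the consumer sees a flow it can keep up with:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BurstBuffer {
    static final int TOTAL = 100;

    public static int run() {
        // small buffer standing in for the message queue
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < TOTAL; i++) {
                    queue.put(i); // blocks when full: back-pressure on the burst
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < TOTAL; i++) {
                    queue.take(); // drains at its own steady pace
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + run() + " of " + TOTAL + " messages");
    }
}
```

No message is lost and none overwhelms the consumer; Kafka does the same job durably and across machines.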

Enough talk; here is a small demo:

Topology:

```java
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class KafkaTopology {
    private String topic;
    private String zkRoot;
    private String spoutId;

    public KafkaTopology() {}

    public KafkaTopology(String topic, String zkRoot, String spoutId) {
        this.topic = topic;
        this.zkRoot = zkRoot;
        this.spoutId = spoutId;
    }

    public void run() {
        // ZooKeeper addresses used to locate the Kafka brokers
        BrokerHosts brokerHosts = new ZkHosts("storm1:2181,storm2:2181,storm3:2181");
        // configure the spout
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, spoutId);
        spoutConfig.forceFromStart = true; // where to start reading: from the beginning of the topic
        // MessageScheme is a custom Scheme for deserializing messages (not shown in this post)
        spoutConfig.scheme = new SchemeAsMultiScheme(new MessageScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // the spout reads from the Kafka message queue and emits to the next bolt;
        // it is not custom-written here but the ready-made KafkaSpout that ships with Storm
        builder.setSpout(spoutId, new KafkaSpout(spoutConfig));
        builder.setBolt("wordSplit", new WordSplit()).shuffleGrouping(spoutId);
        builder.setBolt("writer", new WriterBolt(), 4).fieldsGrouping("wordSplit", new Fields("word"));

        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0);
        conf.setDebug(false);

        // LocalCluster runs the topology in an in-process simulator, handy for development and debugging
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("WordCount", conf, builder.createTopology());
        // to submit the topology to a real Storm cluster instead:
        // StormSubmitter.submitTopology("wordCount", conf, builder.createTopology());
    }

    public static void main(String[] args) {
        new KafkaTopology("test", "/kafka-storm", "KafkaSpout").run();
    }
}
```

WordSplit:

```java
import org.apache.commons.lang.StringUtils;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSplit extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            // emit only tokens that are non-null, non-empty, and not made up of whitespace
            if (StringUtils.isNotBlank(word.trim())) {
                word = word.toUpperCase();
                collector.emit(new Values(word));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
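The split/filter/uppercase logic inside `execute()` can be exercised outside Storm as a pure function (a sketch; `SplitDemo` and `splitWords` are illustrative names, not part of the original code):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitDemo {
    // mirrors WordSplit.execute(): split on spaces, drop blank tokens, uppercase the rest
    public static List<String> splitWords(String line) {
        List<String> words = new ArrayList<>();
        for (String word : line.split(" ")) {
            if (!word.trim().isEmpty()) { // same check as StringUtils.isNotBlank
                words.add(word.toUpperCase());
            }
        }
        return words;
    }

    public static void main(String[] args) {
        // double spaces produce empty tokens, which the blank check filters out
        System.out.println(splitWords("hello  storm kafka "));
        // → [HELLO, STORM, KAFKA]
    }
}
```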

WriterBolt:

```java
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

// writes incoming words to a file
public class WriterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 1L;
    private FileWriter writer = null;

    @Override
    public void prepare(Map conf, TopologyContext context) {
        try {
            writer = new FileWriter("e:/" + "wordcount-" + UUID.randomUUID().toString());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        try {
            writer.write(line);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: emits nothing downstream
    }
}
```
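Because the topology wires WriterBolt with `fieldsGrouping("wordSplit", new Fields("word"))`, every tuple carrying the same "word" value lands on the same one of the 4 WriterBolt tasks, so each output file sees a consistent subset of words. The underlying idea is routing by a hash of the grouped field (a sketch of the concept; `GroupingDemo` is illustrative and not Storm's actual internal code):

```java
public class GroupingDemo {
    // route a word to one of numTasks bolt instances by hashing the field value;
    // floorMod keeps the result in [0, numTasks) even for negative hash codes
    public static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 4; // matches builder.setBolt("writer", new WriterBolt(), 4)
        // the same word always maps to the same task index
        System.out.println("HELLO -> task " + taskFor("HELLO", tasks));
        System.out.println("HELLO -> task " + taskFor("HELLO", tasks)); // same task again
    }
}
```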

This demo runs in Storm's local mode only; production tuning of the parameters above is not covered here.

