
Building a Real-Time Log Processing Platform with flume-ng + kafka + storm + mysql



Copyright notice: this is an original post by the author and may not be reproduced without permission. Original article: https://blog.csdn.net/charry_a/article/details/79279077

1. Architecture Overview

The logs we need to collect are already in production, and we cannot touch the application code (the other teams are not going to modify and redeploy their services just for our log monitoring), so standardizing the log output is not an option; we have to analyze the legacy logs as they are. Different applications and different log types, such as nginx logs, tomcat logs, and application logs, all have to be collected separately. We evaluated Flume and Logstash, as well as the lighter-weight Filebeat, and settled on Flume for the following reasons:

1. Flume is written in Java, so the learning curve for our team is lower. Logstash is also a fine tool, but modifying its source would mean learning Ruby.

2. Flume integrates easily with the mainstream systems and frameworks we use, such as Hadoop, HBase, and Elasticsearch, so landing the collected data somewhere is straightforward.

Flume has three key components: source, channel, and sink, corresponding to the data source, the transport channel, and the data destination. Normally, to collect logs we deploy a Flume agent on each app server and ship the log events to a configured sink, but a flat flume -> kafka layout like that does not extend well, for two reasons:

1. Our logs span multiple project teams. With agents writing straight to Kafka, the Kafka connection details would have to be fully exposed to every team.

2. If the Kafka machines need to be taken down for maintenance, or a broker node dies, log collection on the Flume side is directly affected.

So we deploy Flume in two tiers: logs from the different projects pass through a first-tier agent and then into a buffering tier, with failover configured for that buffering tier. The rough architecture is shown below.

(Architecture diagram: app servers with first-tier Flume agents -> two second-tier Flume agents in a failover pair -> Kafka -> Storm -> MySQL)

First, an agent is deployed on each server. For low-latency delivery, the first-tier agents use an exec source (tail -F, so new log lines are picked up as they are written) backed by a memory channel; the second-tier agents use a file channel, since they act as the data buffer. We also use Flume's failover mechanism: each first-tier agent is configured with two sinks pointing at the two second-tier agents, with different priorities, so if the higher-priority second-tier agent goes down, the lower-priority one automatically takes over receiving the first tier's logs.

Taking one log from one of the projects as an example, first write the conf file for the first-tier agent:

    a1.sources = s1
    a1.sinks = k1 k2
    a1.channels = c1
    # Describe/configure the source: tail the application log in real time
    a1.sources.s1.type = exec
    a1.sources.s1.channels = c1
    a1.sources.s1.command = tail -F /home/esuser/flume/logs/flume.log
    # Configure interceptors: stamp each event with the host IP and a static project name
    a1.sources.s1.interceptors = i1 i2
    a1.sources.s1.interceptors.i1.type = host
    a1.sources.s1.interceptors.i1.useIP = true
    a1.sources.s1.interceptors.i1.preserveExisting = false
    a1.sources.s1.interceptors.i2.type = static
    a1.sources.s1.interceptors.i2.key = projectname
    a1.sources.s1.interceptors.i2.value = honeybee
    a1.sources.s1.interceptors.i2.preserveExisting = false
    # Describe the sinks: two avro sinks pointing at the two second-tier agents
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = 192.168.80.132
    a1.sinks.k1.port = 3333
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = 192.168.80.132
    a1.sinks.k2.port = 4444
    # Use a channel that buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sinks to the channel
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c1
    # Failover sink group: k1 (priority 10) is preferred over k2 (priority 5)
    a1.sinkgroups = g1
    a1.sinkgroups.g1.sinks = k1 k2
    a1.sinkgroups.g1.processor.type = failover
    a1.sinkgroups.g1.processor.priority.k1 = 10
    a1.sinkgroups.g1.processor.priority.k2 = 5
    a1.sinkgroups.g1.processor.maxpenalty = 1000

This defines one source and two avro sinks, one sending to the agent listening on port 3333 and the other to the agent on port 4444; the source is an exec source (tail -F) and the channel is a memory channel. Next, the second tier.

First, the agent listening on port 3333:


    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source: avro source receiving events from the first tier
    a1.sources.s1.type = avro
    a1.sources.s1.channels = c1
    a1.sources.s1.bind = 192.168.80.132
    a1.sources.s1.port = 3333
    a1.sources.s1.threads = 2
    # Describe the sink: forward events to the Kafka topic "test"
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList = 192.168.80.132:9092
    a1.sinks.k1.topic = test
    a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
    # Use a file channel that buffers events on disk
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/esuser/flume/checkpoint3333
    a1.channels.c1.dataDirs = /home/esuser/flume/data3333
    # Bind the source and sink to the channel
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

Now the agent on port 4444:

    a1.sources = s1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source: avro source receiving events from the first tier
    a1.sources.s1.type = avro
    a1.sources.s1.channels = c1
    a1.sources.s1.bind = 192.168.80.132
    a1.sources.s1.port = 4444
    a1.sources.s1.threads = 2
    # Describe the sink: forward events to the Kafka topic "test"
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.brokerList = 192.168.80.132:9092
    a1.sinks.k1.topic = test
    a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
    # Use a file channel that buffers events on disk
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/esuser/flume/checkpoint4444
    a1.channels.c1.dataDirs = /home/esuser/flume/data4444
    # Bind the source and sink to the channel
    a1.sources.s1.channels = c1
    a1.sinks.k1.channel = c1

According to the first-tier configuration, sink k1 (port 3333) has a higher priority than k2 (port 4444), so under normal conditions only the agent on port 3333 receives events; only when the process behind port 3333 dies does the agent on port 4444 take over its work.
Now let's verify that the Flume configuration is correct. First set up a scheduled task that appends one line every 5 seconds to the log file the first-tier agent is tailing; a minimal sketch of such a generator is shown below.

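A minimal sketch of that log generator, assuming the log path from the first-tier agent config above; a plain shell loop is used instead of cron, since cron cannot fire more often than once a minute:

    # Append one test line every 5 seconds to the file tailed by the first-tier agent
    while true; do
        echo "$(date '+%Y-%m-%d %H:%M:%S') test log line" >> /home/esuser/flume/logs/flume.log
        sleep 5
    done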
(Screenshot: the scheduled task running)

(Screenshot: the log file being appended to)

Then start the agents: start the two second-tier agents first, so the first tier's avro sinks have endpoints to connect to, then the first-tier agent. The command looks like this (shown here for the second-tier agent on port 4444):


   ./flume-ng agent -c ../conf -f ../myconf/flume-avro4444.conf -n a1 -Dflume.root.logger=INFO,console

Start a Kafka console consumer on the test topic (a possible command is sketched below):

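A consumer command that fits the setup above might look like the following; it assumes Kafka's bin directory is on the PATH and uses the ZooKeeper address and topic configured in the second-tier agents (with Kafka 0.9 the old console consumer still takes --zookeeper):

    kafka-console-consumer.sh --zookeeper 192.168.80.132:2181 --topic test --from-beginning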
The consumer is already receiving the data forwarded by Flume. Now I manually kill the agent listening on port 3333 to test Flume's failover.

After the kill, the first-tier agent reports that it can no longer connect to that sink.

Meanwhile the log file is still being written to, so let's check whether the consumer is still receiving messages.

It is still consuming data, so the failover works.

Now let's write a Storm topology that consumes the data from Kafka and stores it in MySQL.

pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>flume</groupId>
      <artifactId>storm</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
      <name>storm</name>
      <url>http://maven.apache.org</url>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.version>4.3.3.RELEASE</spring.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.curator</groupId>
          <artifactId>curator-framework</artifactId>
          <version>2.7.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <version>1.1.0</version>
          <scope>provided</scope>
          <exclusions>
            <exclusion>
              <groupId>ch.qos.logback</groupId>
              <artifactId>logback-classic</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>log4j-over-slf4j</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.clojure</groupId>
              <artifactId>clojure</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka_2.11</artifactId>
          <version>0.9.0.1</version>
          <exclusions>
            <exclusion>
              <groupId>org.apache.zookeeper</groupId>
              <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
              <groupId>log4j</groupId>
              <artifactId>log4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-kafka</artifactId>
          <version>1.0.0</version>
        </dependency>
        <!-- added to fix errors when running the packaged storm jar -->
        <dependency>
          <groupId>com.googlecode.json-simple</groupId>
          <artifactId>json-simple</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${spring.version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-beans</artifactId>
          <version>${spring.version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-aop</artifactId>
          <version>${spring.version}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${spring.version}</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.26</version>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-jdbc</artifactId>
          <version>1.0.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-redis</artifactId>
          <version>1.1.0</version>
        </dependency>
      </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
              <compilerVersion>1.8</compilerVersion>
            </configuration>
          </plugin>
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass></mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>

CounterTopology (spout and topology wiring)

    package com.log.storm;

    import java.util.Arrays;

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.topology.TopologyBuilder;

    public class CounterTopology {
        public static void main(String[] args) {
            try {
                // Kafka spout: read the "test" topic, track offsets under /kafka2storm in ZooKeeper
                String kafkaZookeeper = "192.168.80.132:2181";
                BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
                SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, "test", "/kafka2storm", "id");
                spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
                spoutConfig.zkServers = Arrays.asList(new String[] { "192.168.80.132" });
                spoutConfig.zkPort = 2181;
                KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

                // Wire the topology: kafka spout -> ParseBolt -> JDBC insert bolt
                TopologyBuilder builder = new TopologyBuilder();
                builder.setSpout("spout", kafkaSpout, 1);
                builder.setBolt("counter", new ParseBolt(), 2).shuffleGrouping("spout");
                builder.setBolt("insertbolt", PersistentBolt.getJdbcInsertBolt(), 1).shuffleGrouping("counter");

                Config config = new Config();
                config.setDebug(false);
                if (args != null && args.length > 0) {
                    // With arguments, submit to the cluster under the given topology name
                    config.setNumWorkers(1);
                    StormSubmitter.submitTopology(args[0], config, builder.createTopology());
                } else {
                    // Without arguments, run in local mode for 50 seconds, then shut down
                    config.setMaxTaskParallelism(3);
                    LocalCluster cluster = new LocalCluster();
                    cluster.submitTopology("special-topology", config, builder.createTopology());
                    Thread.sleep(50000);
                    cluster.killTopology("special-topology");
                    cluster.shutdown();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

ParseBolt

    package com.log.storm;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ParseBolt extends BaseRichBolt {

        private static final long serialVersionUID = -5508421065181891596L;
        private Logger logger = LoggerFactory.getLogger(ParseBolt.class);
        private OutputCollector collector;

        @SuppressWarnings("rawtypes")
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            // For now the bolt just logs the raw log line and passes it on unchanged
            String message = tuple.getString(0);
            logger.info("bolt receive message : " + message);
            collector.emit(tuple, new Values(message));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("rule_value"));
        }
    }
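
ParseBolt currently just passes the raw line through. If you later want to clean the input up before it reaches the JDBC bolt, the execute method above could be swapped for something along these lines; this is only a sketch, and it keeps emitting a single rule_value field so the SimpleJdbcMapper in PersistentBolt below still matches:

    @Override
    public void execute(Tuple tuple) {
        String message = tuple.getString(0);
        // Drop empty lines so they never reach MySQL; ack them so the spout does not replay them
        if (message == null || message.trim().isEmpty()) {
            collector.ack(tuple);
            return;
        }
        // Still one field named "rule_value", matching declareOutputFields and the JDBC mapper
        collector.emit(tuple, new Values(message.trim()));
        collector.ack(tuple);
    }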

PersistentBolt

    package com.log.storm;

    import java.sql.Types;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
    import org.apache.storm.jdbc.mapper.JdbcMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    import org.apache.storm.shade.com.google.common.collect.Lists;

    @SuppressWarnings("serial")
    public class PersistentBolt {

        private static Map<String, Object> hikariConfigMap = new HashMap<String, Object>() {{
            put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
            put("dataSource.url", "jdbc:mysql://192.168.80.132:3306/logmonitor");
            put("dataSource.user", "root");
            put("dataSource.password", "123456");
        }};

        public static ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        public static JdbcInsertBolt getJdbcInsertBolt() {
            JdbcInsertBolt jdbcInsertBolt = null;
            // Map the "rule_value" tuple field onto the rule_value VARCHAR column
            @SuppressWarnings("rawtypes")
            List<Column> schemaColumns = Lists.newArrayList(new Column("rule_value", Types.VARCHAR));
            if (null != schemaColumns && !schemaColumns.isEmpty()) {
                JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(schemaColumns);
                jdbcInsertBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                        .withInsertQuery("insert into t_rule_config(rule_value) values(?)")
                        .withQueryTimeoutSecs(50);
            }
            return jdbcInsertBolt;
        }
    }
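
The insert query above assumes a t_rule_config table with a single rule_value column. A minimal schema that matches it (the column length and charset here are my own assumptions, not from the original post) would be:

    -- assumed schema for the target table; only the column name and type follow from the code above
    CREATE TABLE t_rule_config (
        rule_value VARCHAR(2000)
    ) DEFAULT CHARSET = utf8;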

Then package the project and submit the topology to the Storm cluster; a sketch of the commands is shown below.

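A sketch of the packaging and submission step, assuming the pom above (artifact storm, version 0.0.1-SNAPSHOT, assembly jar-with-dependencies); the topology name logTopology is a hypothetical placeholder:

    mvn clean package
    storm jar target/storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.log.storm.CounterTopology logTopology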
Looking at the Storm worker logs, we can see that Storm is already consuming the data.

Checking the DB table, the log lines are there. No processing is done on the received message yet, so what gets stored is the raw log line; the garbled characters are simply because the connection encoding was not set, and can be ignored for now (a possible fix is sketched right after this paragraph).
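A minimal sketch of that encoding fix, assuming the logmonitor database and t_rule_config table use a utf8 charset: add the standard MySQL Connector/J URL parameters to the hikariConfigMap in PersistentBolt so the JDBC connection sends and receives UTF-8.

    // PersistentBolt, revised connection settings (sketch): the extra URL parameters
    // force UTF-8 on the JDBC connection so the stored log lines are not garbled
    private static Map<String, Object> hikariConfigMap = new HashMap<String, Object>() {{
        put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        put("dataSource.url",
            "jdbc:mysql://192.168.80.132:3306/logmonitor?useUnicode=true&characterEncoding=UTF-8");
        put("dataSource.user", "root");
        put("dataSource.password", "123456");
    }};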

At this point the whole pipeline, app server -> Flume -> Kafka -> Storm -> MySQL, is working end to end. The basic flow is clear; each component still has plenty of details worth tuning, but that is a topic for later.

