微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Flume NG高可用集群搭建

Flume aide_941 28℃

Flume NG高可用集群搭建

软件版本:

  • CentOS 6.7
  • hadoop-2.7.4
  • apache-flume-1.6.0

一、Flume NG简述

  • Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。
  • Flume将采集到的文件,socket数据包等各种形式的数据源,输出到HDFS、Hbase、hive、kafka等众多外部存储系统中
  • Flume针对特殊场景也具备良好的自定义扩展能力,因此flume适用于大部分的日常数据采集场景
  • 一般的采集需求,通过对flume的简单配置即可实现
  • Flume分布式系统中最核心的角色时agent,flume采集系统就是由一个个agent所连接起来形成的
  • 每个agent相当于一个数据传递员(Source到Channel到Sink之间传递数据的形式时event事件,event事件是一个数据流单元)

Flume的架构图中有3个组件,分别是source、channel、sink

  • Source:采集数据源,用于和数据源对接,获取数据
  • Sink:下沉,采集数据传送目的地,用于往下一级agent传递数据或者往最终的存储系统传递数据
  • Channel:agent的内部传输管道,用于将数据源以事件event的形式,从Source到Sink

运行流程:

从外部系统(Web Server)中收集产生的日志,然后通过Agent的Source组件将数据发送到临时存储Channel组件,最后传递给Sink组件,Sink组件将满足预设值的临时文件,存储到HDFS文件系统中。

二、搭建单点Flume NG

1、解压软件包

tar -zxvf apache-flume-1.6.0-bin.tar.gz -C /export/servers/

2、配置环境变量

  1. export FLUME_HOME=/export/servers//flume-1.6.0
  2. export PATH=$PATH:$FLUME_HOME/bin

3、修改flume配置文件

$FLUME_HOME/conf/flume-env.sh(flume-env.sh.template修改成flume-env.sh)

export JAVA_HOME=/export/servers/jdk1.8.0_171

4、简单测试—采集指定文件到 HDFS

服务器会在指定目录下,会不断产生新的日志文件,每当有新的日志文件产生,flume自动将新产生的数据源采集到文件存储系统HDFS中

创建配置文件spooldir-hdfs.properties

  1. # Name the components on this agent
  2. a1.sources = r1 # agent的别名
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. # 采集数据的类型
  7. a1.sources.r1.type = exec
  8. # 指定执行命令(flume自动执行该命令)
  9. a1.sources.r1.command = tail -F /export/data/callLog.log
  10. # Describe the sink
  11. # 指定采集信息下沉到哪里
  12. a1.sinks.k1.type = hdfs
  13. # 指定采集到的数据存放在hdfs文件系统的哪个路径下
  14. a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/
  15. # 指定保存信息文件名的前缀
  16. a1.sinks.k1.hdfs.filePrefix = events-
  17. # 是否舍弃已下沉的文件,舍弃指的是:根据指定的时间间隔创建文件夹
  18. a1.sinks.k1.hdfs.round = true
  19. #指定每10分钟舍弃已下沉的文件
  20. a1.sinks.k1.hdfs.roundValue = 10
  21. a1.sinks.k1.hdfs.roundUnit = minute
  22. # 每30s,将临时文件,持久化到hdfs文件系统中;设置为0,不滚动,滚动即下沉
  23. a1.sinks.k1.hdfs.rollInterval = 30
  24. # 临时文件达到指定字节就滚动,默认1024;设置为0,不根据临时文件大小来滚动文件
  25. a1.sinks.k1.hdfs.rollSize = 1024
  26. # 临时文件的事件event个数达到指定值就滚动,默认10;如果设置成0,不根据events数据来滚动文件
  27. a1.sinks.k1.hdfs.rollCount = 10
  28. # 每个事件写入的行数,默认100
  29. a1.sinks.k1.hdfs.batchSize = 100
  30. # 是否使用本地的时间戳
  31. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  32. # 生成的文件类型,默认是 Sequencefile,可用 DataStream(普通文本)
  33. a1.sinks.k1.hdfs.fileType = DataStream
  34. # Use a channel which buffers events in memory
  35. # channels数据缓存类型
  36. a1.channels.c1.type = memory
  37. # 该通道中最大的可以存储的event数量
  38. a1.channels.c1.capacity = 1000
  39. # 每次最大可以从source中拿到或者送到sink中的event数量
  40. a1.channels.c1.transactionCapacity = 100
  41. # Bind the source and sink to the channel
  42. a1.sources.r1.channels = c1
  43. a1.sinks.k1.channel = c1

提示:配置文件中的注释,在虚拟机配置时尽可能删除,如果保留可能会报错

启动flume

flume-ng agent -c conf -f conf/spooldir-hdfs.properties -n a1 -Dflume.root.logger=INFO,console

命令行参数解释:

  • -c conf   指定flume自身的配置文件所在目录
  • -f conf/spooldir-hdfs.properties 指定我们所描述的采集方案
  • -n a1  指定我们这个agent的名字

下面截图是复制的节点启动发送数据源后,收到的数据源

复制该节点,产生数据,这里是自己编写的java代码,也可以使用下面shell命令测试

[root@node01 flume-1.6.0]# while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

 三、搭建高可用Flume NG

高可用的Flume NG集群,架构图如下所示:

由于电脑性能的限制,将agent减少到1个节点,Collector维持原来的2个节点。主要是为了后面测试负载均衡、以及容错考虑

1、节点分配

图中所示,Agent数据分别流入到Collector1和Collector2,Flume NG本身提供了Failover机制,可以自动切换和恢复。在上图中,有3个产生日志服务器分布在不同的机房,要把所有的日志都收集到一个集群中存储。下面我们开发配置Flume NG集群

2、Flume 的 load-balance

负载均衡是用于解决一台机器(一个进程)无法解决所有请求而产生的一种算法。Load balancing Sink Processor能够实现load balance功能,如上图 Agent1是一个路由节点,负责将Channel暂存的Event均衡到对应的Collector1和Collector2中

2.1、agent端的配置文件:exec-avro.properties

  1. #agent1 name
  2. agent1.channels = c1
  3. agent1.sources = r1
  4. agent1.sinks = k1 k2
  5. #set channel
  6. agent1.channels.c1.type = memory
  7. agent1.channels.c1.capacity = 1000
  8. agent1.channels.c1.transactionCapacity = 100
  9. agent1.sources.r1.channels = c1
  10. agent1.sources.r1.type = exec
  11. agent1.sources.r1.command = tail -F /export/data/callLog.log
  12. # set sink1
  13. agent1.sinks.k1.channel = c1
  14. agent1.sinks.k1.type = avro
  15. agent1.sinks.k1.hostname = node02
  16. agent1.sinks.k1.port = 52020
  17. # set sink2
  18. agent1.sinks.k2.channel = c1
  19. agent1.sinks.k2.type = avro
  20. agent1.sinks.k2.hostname = node03
  21. agent1.sinks.k2.port = 52020
  22. #set sink group
  23. agent1.sinkgroups = g1
  24. agent1.sinkgroups.g1.sinks = k1 k2
  25. #set load_balance
  26. agent1.sinkgroups.g1.processor.type = load_balance
  27. agent1.sinkgroups.g1.processor.backoff = true
  28. agent1.sinkgroups.g1.processor.selector = round_robin
  29. agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

2.2、Collector1端的配置文件:avro-logger.properties

  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = avro
  7. a1.sources.r1.bind = node02
  8. a1.sources.r1.port = 52020
  9. # Describe the sink
  10. a1.sinks.k1.type = logger
  11. # Use a channel which buffers events in memory
  12. a1.channels.c1.type = memory
  13. a1.channels.c1.capacity = 1000
  14. a1.channels.c1.transactionCapacity = 100
  15. # Bind the source and sink to the channel
  16. a1.sources.r1.channels = c1
  17. a1.sinks.k1.channel = c1

提示:Collector2的配置文件与Collector1的基本相同,只需要修改a1.sources.r1.bind

2.3、启动测试

  1. 分别启动Collector1、Collector2、agent
  2. 复制agent虚拟机,执行shell命令:while true;do echo test >> /export/data/callLog.log;sleep 0.5;done

Collector1的运行结果

Collector2的运行结果

3、Flume 的 failover

3.1、故障转移机制

实现 failover 功能,具体流程类似 load balance,但是内部处理机制与load balance完全不同

  • Failover Sink Processor维护一个优先级Sink组件列表,只要有一个Sink 组件可用,Event就会被传递到下一个组件。
  • 故障转移机制的作用是将失败的Sink 降级到一个池,在这些池中它们被分配一个冷却时间,随着故障的连续,增加下次该Sink的重试时间。但只要失败的Sink成功发送一个Event,它将恢复到活动池。
  • Sink具有与之相关的优先级,数值越大,优先级越高。

例如,具有优先级为10的sink在优先级为8的Sink之前被激活。在激活过程中,如果发送事件时汇聚失败,则将尝试让优先级为8的Sink发送事件。如果没有指定优先级,则根据在配置中指定Sink的顺序来确定发送顺序

3.2、配置如下:

  1. #agent1 name
  2. agent1.channels = c1
  3. agent1.sources = r1
  4. agent1.sinks = k1 k2
  5. #set channel
  6. agent1.channels.c1.type = memory
  7. agent1.channels.c1.capacity = 1000
  8. agent1.channels.c1.transactionCapacity = 100
  9. agent1.sources.r1.channels = c1
  10. agent1.sources.r1.type = exec
  11. agent1.sources.r1.command = tail -F /export/data/callLog.log
  12. # set sink1
  13. agent1.sinks.k1.channel = c1
  14. agent1.sinks.k1.type = avro
  15. agent1.sinks.k1.hostname = node02
  16. agent1.sinks.k1.port = 52020
  17. # set sink2
  18. agent1.sinks.k2.channel = c1
  19. agent1.sinks.k2.type = avro
  20. agent1.sinks.k2.hostname = node03
  21. agent1.sinks.k2.port = 52020
  22. #set sink group
  23. agent1.sinkgroups = g1
  24. agent1.sinkgroups.g1.sinks = k1 k2
  25. #set failover
  26. a1.sinkgroups.g1.processor.type = failover
  27. # 如果开启,则将失败的 sink 放入黑名单
  28. a1.sinkgroups.g1.processor.backoff = true
  29. # 还支持random
  30. a1.sinkgroups.g1.processor.selector = round_robin
  31. #在黑名单放置的超时时间,超时结束时,若仍然无法接收,则超时时间呈指数增长
  32. a1.sinkgroups.g1.processor.selector.maxTimeOut=10000
  33. #优先级值, 绝对值越大表示优先级越高,若不设置,则按照sink的先后顺序
  34. a1.sinkgroups.g1.processor.priority.k1 = 5
  35. a1.sinkgroups.g1.processor.priority.k2 = 10
  36. a1.sinkgroups.g1.processor.priority.k3 = 6
  37. #失败的 Sink 的最大回退期(millis)
  38. a1.sinkgroups.g1.processor.maxpenalty = 20000

3.3、测试参考负载均衡,这里就不测试了

转载请注明:SuperIT » Flume NG高可用集群搭建

喜欢 (0)or分享 (0)