微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

Flume Taildir Source监听实时追加内容的文件

Flume aide_941 39℃

Flume Taildir Source监听实时追加内容的文件

 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/wangpei1949/article/details/80472928
flume中有三种可监控文件或目录的source、分别是Exec Source、Spooling Directory Source和Taildir Source。
Taildir Source是1.7版本的新特性,综合了Spooling Directory Source和Exec Source的优点。
  • 1
  • 2

使用场景

Exec Source

Exec Source可通过tail -f命令去tail住一个文件,然后实时同步日志到sink。但存在的问题是,当agent进程挂掉重启后,会有重复消费的问题。可以通过增加UUID来解决,或通过改进ExecSource来解决。
  • 1

Spooling Directory Source

Spooling Directory Source可监听一个目录,同步目录中的新文件到sink,被同步完的文件可被立即删除或被打上标记。适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步。如果需要实时监听追加内容的文件,可对SpoolDirectorySource进行改进。
  • 1

Taildir Source

Taildir Source可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。
使用时建议用1.8.0版本的flume,1.8.0版本中解决了Taildir Source一个可能会丢数据的bug。
  • 1
  • 2

Taildir Source

agent配置

# source的名字
agent.sources = s1
# channels的名字
agent.channels = c1
# sink的名字
agent.sinks = r1

# 指定source使用的channel
agent.sources.s1.channels = c1
# 指定sink使用的channel
agent.sinks.r1.channel = c1

######## source相关配置 ########
# source类型
agent.sources.s1.type = TAILDIR
# 元数据位置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
# 监控的目录
agent.sources.s1.filegroups = f1
agent.sources.s1.filegroups.f1=/Users/wangpei/tempData/flume/data/.*log
agent.sources.s1.fileHeader = true

######## channel相关配置 ########
# channel类型
agent.channels.c1.type = file
# 数据存放路径
agent.channels.c1.dataDirs = /Users/wangpei/tempData/flume/filechannle/dataDirs
# 检查点路径
agent.channels.c1.checkpointDir = /Users/wangpei/tempData/flume/filechannle/checkpointDir
# channel中最多缓存多少
agent.channels.c1.capacity = 1000
# channel一次最多吐给sink多少
agent.channels.c1.transactionCapacity = 100

######## sink相关配置 ########
# sink类型
agent.sinks.r1.type = org.apache.flume.sink.kafka.KafkaSink
# brokers地址
agent.sinks.r1.kafka.bootstrap.servers = localhost:9092
# topic
agent.sinks.r1.kafka.topic = testTopic3
# 压缩
agent.sinks.r1.kafka.producer.compression.type = snappy
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

记录每个文件消费位置的元数据

#配置
agent.sources.s1.positionFile = /Users/wangpei/tempData/flume/taildir_position.json
#内容
[
{
    "inode":6028358,
    "pos":144,
    "file":"/Users/wangpei/tempData/flume/data/test.log"
},
{
    "inode":6028612,
    "pos":20,
    "file":"/Users/wangpei/tempData/flume/data/test_a.log"
}
]  

可以看到,在taildir_position.json文件中,通过json数组的方式,记录了每个文件最新的消费位置,每消费一次便去更新这个文件。

转载请注明:SuperIT » Flume Taildir Source监听实时追加内容的文件

喜欢 (0)or分享 (0)