微信搜索superit|邀请体验:大数据, 数据管理、OLAP分析与可视化平台 | 赞助作者:赞助作者

flume-ng命令

Flume aide_941 30℃

flume-ng命令

flume-ng命令帮助:

[root@hadoop01 apache-flume-1.6.0-bin]# ./bin/flume-ng help

Usage: ./flume-ng <command> [options]…commands:

help                      display this help text
agent                     run a Flume agent
avro-client               run an avro Flume client
version                   show Flume version info

global options:
–conf,-c <conf>          use configs in <conf> directory
–classpath,-C <cp>       append to the classpath
–dryrun,-d               do not actually start Flume, just print the command
–plugins-path <dirs>     colon-separated list of plugins.d directories. See the
plugins.d section in the user guide for more details.
Default: $FLUME_HOME/plugins.d
-Dproperty=value          sets a Java system property value
-Xproperty=value          sets a Java -X option

agent options:
–name,-n <name>          the name of this agent (required)
–conf-file,-f <file>     specify a config file (required if -z missing)
–zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
–zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
–no-reload-conf          do not reload config file if changed
–help,-h                 display help text

avro-client options:
–rpcProps,-P <file>   RPC client properties file with server connection params
–host,-H <host>       hostname to which events will be sent
–port,-p <port>       port of the avro source
–dirname <dir>        directory to stream to avro source
–filename,-F <file>   text file to stream to avro source (default: std input)
–headerFile,-R <file> File containing event headers as key/value pairs on each new line
–help,-h              display help text

Either –rpcProps or both –host and –port must be specified.

Note that if <conf> directory is specified, then it is always included first
in the classpath.

测试flume操作

测试服务器

ip:192.168.226.151 主机名:hadoop01

ip:192.168.226.152 主机名:hadoop02

ip:192.168.226.153 主机名:hadoop03

source测试

1、netcat source

配置文件:example01.conf

#配置agent a1
a1.sources=r1
a1.channels=c1

a1.sinks=k1

#配置对应的source
a1.sources.r1.type=netcat
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01)

…….

从window,cmd命令操作客户端连接

telnet 192.168.226.151 8888

agent端(hadoop01)接收到内容

2、avro source

配置文件:example02.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01)

…….

启动avro客户端(hadoop01)

……..

angent端接收到log1.txt文件内容

3、exec source

配置文件:example03.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=exec
a1.sources.r1.command=ls /usr/soft

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01),启动成功后,直接输出exec中执行的命令结果


……..


4、Spooling Directory Source

配置文件:example04.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/usr/soft/apache-flume-1.6.0-bin/mydata

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01)

……

通过另外的连接,在flume目录下的mydata目录,创建log2.txt文件(hadoop01)

angent监听进程自动扫描并处理文件

5、Sequence GeneratorSource

配置文件:example05.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=seq

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动angent命令(hadoop01),不断的+1输出

[root@hadoop01 conf]# ../bin/flume-ng agent -c ./ -f ./example05.conf -n a1 -Dflume.root.logger=INFO,console


6、 HTTP Source

配置文件:example06.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01)


…….


从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)

[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“a” : “a1″,”b” : “b1″},”body” : “hello~http~flume~”}]’ http://0.0.0.0:8888

angent端接收到http请求信息


Sink 测试

1、file roll sink

配置文件:example07.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=file_roll
a1.sinks.k1.sink.directory=/usr/soft/apache-flume-1.6.0-bin/mydata

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动angent命令(hadoop01)(与HTTP Source测试启动一样)

从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)(与HTTP Source测试一样)

查看接收到的信息,保存到指定目录

2、Avro Sink,多级流动

hadoop01(http,avro)->hadoop02(avro,logger)

hadoop01配置文件:example081.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop02
a1.sinks.k1.port=9988

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
hadoop02配置文件:example082.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动顺序,从后向前,

先启动hadoop02 agent

……

再启动hadoop01 agent


……


从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)

[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“a” : “a1″,”b” : “b1″},”body” : “hello~http~flume~hadoop01~hadoop02”}]’ http://0.0.0.0:8888

angent端接收到http请求信息(hadoop02)


3、Avro Sink,扇出流-复制

hadoop01(http,avro)

->hadoop02(avro,logger)

->hadoop03(avro,logger)

hadoop01配置文件:example091.conf

#配置agent a1
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop02
a1.sinks.k1.port=9988

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=hadoop03
a1.sinks.k2.port=9988

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

hadoop02配置文件:example092.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

hadoop03配置文件:example093.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动顺序,从后向前,

先启动hadoop02 agent、hadoop03 agent(与arvo多级流动类似)

再启动hadoop01 agent


……


从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)

[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“a” : “a1″,”b” : “b1″},”body” : “hello~http~flume”}]’ http://0.0.0.0:8888

angent端接收到http请求信息(hadoop02、hadoop03)


4、Avro Sink,扇出流-多路复用(路由)

hadoop01(http,avro)

->hadoop02(avro,logger)—— c1

->hadoop03(avro,logger)—— c2

hadoop01配置文件:example0101.conf

#配置agent a1
a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.selector.type=multiplexing
a1.sources.r1.selector.header=gender
a1.sources.r1.selector.mapping.male=c1
a1.sources.r1.selector.mapping.female=c2
a1.sources.r1.selector.default=c1

#配置对应的sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop02
a1.sinks.k1.port=9988

a1.sinks.k2.type=avro
a1.sinks.k2.hostname=hadoop03
a1.sinks.k2.port=9988

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1 c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

hadoop02配置文件:example0102.conf(与example092.conf一样)

hadoop03配置文件:example0103.conf(与example093.conf一样)

启动顺序,从后向前,

先启动hadoop02 agent、hadoop03 agent(与arvo sink 扇出流-复制,一样)

再启动hadoop01 agent(与arvo sink 扇出流-复制,一样)

从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)

[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“gender” : “male”,”name” : “jay”},”body” : “hello~jay”}]’ http://0.0.0.0:8888

angent端接收到http请求信息(hadoop02)


[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“gender” : “female”,”name” : “shirly”},”body” : “hello~shirly”}]’ http://0.0.0.0:8888

angent端接收到http请求信息(hadoop03)


[root@hadoop01 mydata]# curl -X POST -d ‘[{ “headers” :{“gender” : “ladyboy”,”name” : “haha”},”body” : “hello~haha”}]’ http://0.0.0.0:8888

angent端接收到http请求信息(hadoop02)


5、Avro Sink,扇入流

hadoop02(http,avro)->hadoop01(avro,logger)

hadoop03(http,avro)->hadoop01(avro,logger)

hadoop01配置文件:example0111.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9988

#配置对应的sink
a1.sinks.k1.type=logger

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

hadoop02配置文件:example0112.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop01
a1.sinks.k1.port=9988

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

hadoop03配置文件:example0113.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=hadoop01
a1.sinks.k1.port=9988

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动顺序,从后向前,

先启动hadoop01 agent


……


再启动hadoop02 agent、hadoop03 agent

启动hadoop02,hadoop03agent后,hadoop01接收到连接信息

从另外的连接,通过命令发送HTTP请求到指定端口(hadoop02)

[root@hadoop02 ~]# curl -X POST -d ‘[{ “headers” :{“gender” : “male”,”name” : “jay”},”body” : “hello~jay”}]’ http://0.0.0.0:8888

从另外的连接,通过命令发送HTTP请求到指定端口(hadoop03)

[root@hadoop03 ~]# curl -X POST -d ‘[{ “headers” :{“gender” : “female”,”name” : “shirly”},”body” : “hello~shirly”}]’ http://0.0.0.0:8888

hadoop01angent端接收到(hadoop02、hadoop03)http请求信息


6、hdfs Sink

hadoop01配置文件:example012.conf

#配置agent a1
a1.sources=r1
a1.channels=c1
a1.sinks=k1

#配置对应的source
a1.sources.r1.type=http
a1.sources.r1.port=8888

#配置对应的sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=hdfs://0.0.0.0:9000/flume

#配置对应的channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

#配置绑定关系(一个sink对应一个channel)
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

启动hadoop01 agent

……

从另外的连接,通过命令发送HTTP请求到指定端口(hadoop01)

[root@hadoop01 ~]# curl -X POST -d ‘[{ “headers” :{“gender” : “male”,”name” : “jay”},”body” : “hello~jay”}]’ http://0.0.0.0:8888

hadoop01agent接收到http请求信息


查看hdfs上保存的文件信息


转载请注明:SuperIT » flume-ng命令

喜欢 (0)or分享 (0)