
(Highly recommended) log + filebeat + kafka + logstash + es configuration, and why filebeat cannot connect to kafka!

        

 

# Typical architecture
# filebeat => kafka => logstash => elasticsearch => kibana
1. filebeat configuration:
# filebeat/config/filebeat.yml:
#=========================== Filebeat inputs =============================

filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /data2/log/nginx/logs/access7.log
    # - /var/log/*.log
    #- c:\programdata\elasticsearch\logs\*
  # If collecting JSON logs, enable the three lines below:
  # json.keys_under_root: true
  # json.overwrite_keys: true
  # json.add_error_key: true
  tags: ['nginx']
  fields:
    log_source: nginx
    level: debug
    review: 1
  scan_frequency: 10s

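For reference, a sketch of what the three json.* options above do (the sample line below is made up, not from the author's logs): filebeat parses each line as JSON and places the decoded keys at the top level of the event.

  # Hypothetical input line:
  #   {"remote_addr": "1.2.3.4", "status": 200, "msg": "ok"}
  # json.keys_under_root: true -> remote_addr / status / msg become top-level event fields
  #                               (otherwise they stay nested under a "json" field)
  # json.overwrite_keys: true  -> on conflict, decoded keys may overwrite fields filebeat itself adds
  # json.add_error_key: true   -> lines that fail to parse get an error field describing the problem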
 

If outputting to Logstash (and on to ES):

#----------------------------- Logstash output OK!!!--------------------------------
output.logstash:
    hosts: ["10.13.113.237:5044"]
    # The Logstash hosts

If outputting to Kafka:

#----------------------------- Kafka output OK!!!!!--------------------------------

output.kafka:
  enabled: true
  hosts: ["yz-re011.kafka.com.cn:9110",
        "yz-re012.kafka.com.cn:9110",
        "yz-re005.kafka..com.cn:9110",
        "dbl-re002.kafka.com.cn:9110",
        "yz-re003.kafka.com.cn:9110",
        "dbl-re001.kafka.com.cn:9110"]
  topic: 'topic_biz_data'
  username: 'name'
  password: 'pwd'
  version: '0.10.2.1'
  compression: snappy
# version is very important!! With username/password set, filebeat defaults to
# security_protocol='SASL_PLAINTEXT', sasl_mechanism='PLAIN'.
# If an event matches none of the rules under topics, the default topic above is used
# (see the illustrative rules below).
# filebeat log input options: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-log.html
# filebeat kafka output options (official docs):
# https://www.elastic.co/guide/en/logstash/7.x/output-plugins.html
# https://www.elastic.co/guide/en/beats/filebeat/7.x/kafka-output.html
# Newer Kafka versions support both GSSAPI and PLAIN SASL authentication,
# but the filebeat kafka output only supports SASL/PLAIN.
# http://rk700.github.io/2016/12/16/filebeat-kafka-logstash-authentication-authorization/
# https://serverfault.com/questions/1001134/filebeat-kafka-input-with-sasl
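For illustration only (the extra topic name and the condition below are made up, not part of the original setup), per-event topic rules can sit alongside the default topic; events that match no rule fall back to the topic set above:

  topics:
    - topic: 'topic_biz_error'
      when.contains:
        message: 'ERR'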
2. Kafka configuration:
There is too much to cover and it is not the focus here; the cluster authenticates clients with
security_protocol='SASL_PLAINTEXT'
sasl_mechanism='PLAIN'
A rough broker-side sketch is shown below.
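The sketch is illustrative only: ports, file paths, account names and passwords are placeholders, not the author's real settings. A SASL/PLAIN listener usually involves a few server.properties entries plus a JAAS file.

# server.properties (assumed SASL_PLAINTEXT listener)
listeners=SASL_PLAINTEXT://0.0.0.0:9110
advertised.listeners=SASL_PLAINTEXT://yz-re011.kafka.com.cn:9110
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

# kafka_server_jaas.conf, passed to the broker JVM via
# -Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf
# (user_name below defines the 'name'/'pwd' account that filebeat and logstash log in with)
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-pwd"
    user_admin="admin-pwd"
    user_name="pwd";
};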
3. Logstash configuration:
input{
    #es
    #beats{
    #    host => "0.0.0.0"      # listen on all interfaces for data sent to port 5044
    #    port => 5044
    #}
    #kafka
    # https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
    # https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
    # https://github.com/logstash-plugins/logstash-input-kafka/blob/master/lib/logstash/inputs/kafka.rb
    kafka{                                     # consume data from kafka
        bootstrap_servers => "172.168.50.41:9092,172.168.50.41:9097,172.168.50.41:9098"  # comma-separated kafka broker host:port pairs
        topics => ["topic_biz_data"]
        #topics => "%{[@metadata][topic]}"     # use the topic carried in the event metadata
        # topics_pattern => "topic_.*"         # match topics by regex instead of listing them
        codec => "json"                        # must match what filebeat ships (JSON); important!
        sasl_mechanism => "PLAIN"
        security_protocol => "SASL_PLAINTEXT"
        #consumer_threads => 2                  # number of consumer threads
        #decorate_events => true                # add Kafka metadata (topic, message size, etc.) to the event under a field named kafka
        #auto_offset_reset => "latest"          # start from the latest offset when no committed offset exists
        #auto_commit_interval_ms => "1000"
        # the JAAS login module must match sasl_mechanism: for PLAIN it is PlainLoginModule
        # (a ScramLoginModule here with mechanism PLAIN is a common reason the consumer cannot authenticate)
        sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='xxx' password='xxx';"
        group_id => "group_biz_data"         # consumer group ID; logstash instances sharing a group_id form one consumer group
    }
}
filter{
    if [fields][log_source] == "nginx" {
        mutate {
            # replace double quotes in the raw message with single quotes
            gsub => ["message",'"',"'"]
        }
        grok{
            match => {
                "message" => "%{TIMESTAMP_ISO8601:@timestamp} %{IPV4:remote_addr} %{IPORHOST:http_host} %{QS:referer} %{WORD:scheme} %{QS:request} %{NOTSPACE:request_method} %{NUMBER:request_time:float} %{NOTSPACE:server_protocol} %{NOTSPACE:uri} %{HOSTNAME:host} %{HOSTNAME:domain} %{HOSTNAME:hostname} %{NUMBER:status} %{NUMBER:bytes} %{QS:agent} %{QS:x_forwarded} %{QS:upstr_addr} %{NUMBER:upstr_status} %{NOTSPACE:upstr_host} %{NUMBER:upstr_resp_time}"
            }
        }
        mutate{
            # concatenate referer and request so a site_id/id can be extracted from either
            add_field => {"str" => "%{referer}%{request}"}
        }
        grok{
            match => {
                "str" => "(\?|\&)(site_id|id)=(?<siteid>[\d]+)"
            }
        }
        mutate {
            # replace "-" with "0" so the float conversion below succeeds
            gsub => ["upstr_resp_time","-","0"]
        }
        mutate{
            remove_field => "message"
            remove_field => "str"
            remove_field => "@version"
            remove_field => "host"
            remove_field => "path"
            remove_field => "tags"
            split => ["x_forwarded", ","]
            add_field => {"real_remote_ip" => "%{[x_forwarded][0]}"}
            remove_field => "x_forwarded"
        }
        mutate {
            convert => {
                "upstr_resp_time" => "float"
            }
            gsub => ["referer","'",""]
            gsub => ["real_remote_ip","'",""]
            gsub => ["request","'",""]
            gsub => ["upstr_addr","'",""]
        }
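        # derive the daily index suffix from the event timestamp converted to local time,
        # avoiding the 8-hour offset of the UTC-based %{+YYYY.MM.dd} index name (see the output section)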
        ruby { 
            code => "event.set('index_day', event.timestamp.time.localtime.strftime('%Y.%m.%d'))" 
        } 
        # json {
        #     source => "message"
        # }
    }
}
output{
    if [fields][log_source] == "nginx" {
        
        elasticsearch {
            hosts => ["10.13.13.109:9200"]
            # index => "nginx-access-logs-%{+YYYY.MM.dd}" # uses UTC, 8 hours off from local time
            index => "nginx-access-logs-%{index_day}"
            user => "elastic"
            password => "pwd23"
            # codec => json # only record events that were successfully parsed as JSON
        }
    }
    # for debugging only:
    # stdout {
    #     codec => rubydebug
    # }
}
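A quick way to verify connectivity end to end (the file paths and the pipeline/consumer-config file names below are assumptions, not from the original post; adjust to your layout):

# validate the filebeat config, then run in the foreground with debug logging to watch the kafka connection
filebeat test config -c /etc/filebeat/filebeat.yml
filebeat -e -c /etc/filebeat/filebeat.yml -d "*"

# check the logstash pipeline syntax without actually starting it
bin/logstash -f /etc/logstash/conf.d/nginx_kafka_es.conf --config.test_and_exit

# consume a few messages from the topic to confirm data is arriving
# (SASL client settings go in the separate properties file)
kafka-console-consumer.sh --bootstrap-server yz-re011.kafka.com.cn:9110 \
    --topic topic_biz_data --consumer.config client-sasl.properties --max-messages 5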

 
