kafkaV2

简介

flusher_kafka_v2 flusher插件可以实现将采集到的数据,经过处理后,发送到Kafka。

版本

Beta

配置参数

  • Version需要填写的是kafka protocol version版本号,flusher_kafka_v2当前支持的kafka版本范围:0.8.2.x~3.3.1。 请根据自己的kafka版本号参照下面的kafka protocol version规则进行配置。建议根据自己的kafka 版本指定对应protocol version, kafka protocol version支持版本号如下:

0.8.2.0,0.8.2.1,0.8.2.2
0.9.0.0,0.9.0.1
0.10.0.0,0.10.0.1,0.10.1.0,0.10.1.1,0.10.2.0,0.10.2.1,0.10.2.2
0.11.0.0,0.11.0.1,0.11.0.2
1.0.0,1.0.1,1.0.2,1.1.0,1.1.1,
2.0.0,2.0.1,2.1.0,2.1.1,2.2.0,2.2.1,2.2.2,2.3.0,2.3.1,2.4.0,2.4.1,2.5.0,2.5.1,2.6.0,2.6.1,2.6.2,2.7.0,2.7.1,2.8.0,2.8.1,2.8.2
3.0.0,3.0.1,3.0.2,3.1.0,3.1.1,3.1.2,3.2.0,3.2.1,3.2.2,3.2.3,3.3.0,3.3.1
  • Brokers是个数组,多个Broker地址不能使用;或者,来隔开放在一行里,yaml配置文件中正确的多个Broker地址配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Kafka。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail kafka flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtailprocessor_regex插件,将上面的日志提取处理后几个关键字段:

  • time

  • loglevel

  • appname

  • thread

  • class

  • message

最后推送到kafka的数据样例如下:

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name": "java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不同的topic, 则topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。 topic动态表达式规则:

  • %{content.fieldname}content代表从contents中取指定字段值

  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}

  • ${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。

  • 其它方式暂不支持

动态topic中使用系统变量

动态topic绑定系统变量的两种场景:

  • 将系统变量采集添加到日志的tag中,然后使用%{tag.fieldname}规则完成绑定。

  • 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的topic中,直接采用${env_name} 规则完成绑定,此方式需要1.5.0才支持。

由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。

(1)将系统变量采集到日志中完成动态topic绑定

将系统变量采集添加到日志中有两种方式,一种是在ilogtail容器env添加,另一种是通过processor_add_fields 插件添加, 两种方式不同的配置参考下面的介绍

  • daemonset或者sidecar方式部署的ilogtail容器env配置部分添加自定义的系统变量,配置参考案例如下:

env:
  - name: ALIYUN_LOG_ENV_TAGS       # add log tags from env
    value: _node_name_|_node_ip_|_app_name_
  - name: _app_name_  # 添加自定义_app_name_变量,
    value: kafka

自定义的变量_app_name_被添加到ALIYUN_LOG_ENV_TAGS中,日志的tags中会看到自定义的变量, 此时动态 topic 采用%{tag.fieldname}规则配置即可。

  • 使用processor_add_fields 插件系统变量添加到日志中,配置参考如下:

processors:
  - Type: processor_add_fields
    Fields:
      service: ${env_name}
    IgnoreIfExist: false

这里${env_name}生效依赖于ilogtailenable_env_ref_in_config配置,从ilogtail 1.5.0开始支持。

(2)直接采用$符将系统变量绑定动态topic

daemonset或者sidecar方式部署的ilogtail容器env配置部分添加自定义的系统变量,配置参考案例如下:

env:
  - name: ALIYUN_LOG_ENV_TAGS       # add log tags from env
    value: _node_name_|_node_ip_
  - name: app_name  # 添加自定义app_name变量,
    value: kafka

app_name添加到系统变量中后,直接采用动态topic的:${env_name}规则即可绑定。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: ilogtail_${app_name}
  • ${app_name}就是我们上面添加的系统变量。

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Convert:
      TagFieldsRename:
        host.name: hostname
    Topic: KafkaTestTopic

ProtocolFieldsRename

ilogtail协议字段重命名,在ilogtail的数据转换协议中, 最外层三个字段contents,tagstime属于协议字段。ProtocolFieldsRename只能对 contents,tagstime这个三个字段进行重命名。 例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Convert:
      TagFieldsRename:
        host.name: hostname
      ProtocolFieldsRename:
        time: '@timestamp'
    Topic: KafkaTestTopic

指定分区分发

ilogtail一共支持三种分区分发方式:

  • random随机分发, 默认。

  • roundrobin轮询分发。

  • hash分发。

randomroundrobin分发只需要配置PartitionerType指定对应的分区分发方式即可。 hash分发相对比较特殊,可以指定HashKeysHashKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application

配置Headers

iLogtailKafka的消息头是以键值对数组的形式配置的。headervalue仅支持字符串类型。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic
    Headers:
      - key: "key1"
        value: "value1"
      - key: "key2"
        value: "value2"

数据平铺

ilogtail 1.8.0新增数据平铺协议custom_single_flattencontentstagstime三个convert层的协议字段中数据做一级打平。 当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Convert:
      Protocol: custom_single_flatten
      Encoding: json
    Topic: KafkaTestTopic

非平铺前写入kafka的消息格式

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name": "java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

使用平铺协议后custom_single_flattenjson全部被一级平铺。

{
  "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
  "application": "springboot-docker",
  "level": "ERROR",
  "message": "Completed initialization in 9 ms",
  "thread": "http-nio-8080-exec-10",
  "@time": "2022-07-20 16:55:05.415",
  "k8s.namespace.name": "java_app",
  "host.ip": "192.168.6.128",
  "host.name": "master",
  "log.file.path": "/data/test.log",
  "time": 1664435098
}

安全连接配置

flusher_kafka_v2支持多种安全认证连接kafka服务端。

  • PlainText认证,ilogtail v1.3.0开始支持;

  • SASL认证,ilogtail v1.3.0开始支持;

  • TLS认证,ilogtail v1.4.0开始支持;

  • Kerberos认证(待测试验证),ilogtail v1.4.0开始支持;

前面两种配置比较简单,下面主要介绍下TLSKerberos两种认证的配置。

TLS配置参考

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Authentication:
      TLS:
        Enabled: true
        CAFile: /data/cert/ca.crt
        CertFile: /data/cert/client.crt
        KeyFile: /data/cert/client.key
        MinVersion: "1.1"
        MaxVersion: "1.2"
    Topic: KafkaTestTopic

注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Kerberos配置参考

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Authentication:
      Kerberos:
        ServiceName: kafka
        Realm: test
        UseKeyTab: true
        ConfigPath: "/etc/krb5.conf"
        KeyTabPath: "/etc/security/kafka.keytab"
    Topic: KafkaTestTopic

Last updated