kafkaV2

简介

flusher_kafka_v2 flusher插件可以实现将采集到的数据,经过处理后,发送到Kafka。

版本

Beta

配置参数

参数类型是否必选说明

Type

String

插件类型

Brokers

String数组

Kafka Brokers

Topic

String

Kafka Topic,支持动态topic, 例如: test_%{content.appname}

Version

String

Kafka协议版本号 ,例如:2.0.0,默认值:1.0.0

Convert

Struct

ilogtail数据转换协议配置

Convert.Protocol

String

ilogtail数据转换协议,kafka flusher 可选值:custom_single,otlp_log_v1。默认值:custom_single

Convert.Encoding

String

ilogtail flusher数据转换编码,可选值:jsonnoneprotobuf,默认值:json

Convert.TagFieldsRename

Map

对日志中tags中的json字段重命名

Convert.ProtocolFieldsRename

Map

ilogtail日志协议字段重命名,可当前可重命名的字段:contents,tagstime

Authentication

Struct

Kafka连接访问认证配置,支持SASL/PLAIN,根据kafka服务端认证方式选择配置

Authentication.PlainText.Username

String

PlainText认证用户名

Authentication.PlainText.Password

String

PlainText认证密码

Authentication.SASL.Username

String

SASL认证用户名

Authentication.SASL.Password

String

SASL认证密码

Authentication.Sasl.SaslMechanism

String

SASL认证,配置可选项:PLAINSCRAM-SHA-256SCRAM-SHA-512

Authentication.TLS.Enabled

Boolean

是否启用TLS安全连接,

Authentication.TLS.CAFile

String

TLS CA根证书文件路径

Authentication.TLS.CertFile

String

TLS连接kafka证书文件路径

Authentication.TLS.KeyFile

String

TLS连接kafka私钥文件路径

Authentication.TLS.MinVersion

String

TLS支持协议最小版本,可选配置:1.0, 1.1, 1.2, 1.3,默认:1.2

Authentication.TLS.MaxVersion

String

TLS支持协议最大版本,可选配置:1.0, 1.1, 1.2, 1.3,默认采用:crypto/tls支持的版本,当前1.3

Authentication.TLS.InsecureSkipVerify

Boolean

是否跳过TLS证书校验

Authentication.Kerberos.ServiceName

String

服务名称,例如:kafka

Authentication.Kerberos.UseKeyTab

Boolean

是否采用keytab,配置此项后需要配置KeyTabPath,默认为:false

Authentication.Kerberos.Username

Boolean

UseKeyTab设置为false的情况下,需要指定用户名

Authentication.Kerberos.Password

String

UseKeyTab设置为false的情况下,需要指定密码

Authentication.Kerberos.Realm

String

kerberos认证管理域,大小写敏感

Authentication.Kerberos.ConfigPath

Boolean

Kerberos krb5.conf

Authentication.Kerberos.KeyTabPath

String

keytab的路径

PartitionerType

String

Partitioner类型。取值:roundrobinhashrandom。默认为:random

QequiredAcks

int

ACK的可靠等级.0=无响应,1=等待本地消息,-1=等待所有副本提交.默认1,

Compression

String

压缩算法,可选值:none, snappylz4gzip,默认值none

CompressionLevel

Int

压缩级别,可选值:1~9,默认值:4,设置为0则禁用Compression

MaxMessageBytes

Int

一个批次提交的大小限制,配置和message.max.bytes对应,默认值:1000000

MaxRetries

Int

提交失败重试次数,最大3次,默认值:3

BulkMaxSize

Int

单次请求提交事件数,默认2048

BulkFlushFrequency

Int

发送批量 Kafka 请求之前等待的时间,0标识没有时延,默认值:0

Timeout

Int

等待Kafka brokers响应的超时时间,默认30s

BrokerTimeout

int

kafka broker等待请求的最大时长,默认10s

Metadata.Retry.Max

int

最大重试次数,默认值:3

Metadata.Retry.Backoff

int

在重试之前等待leader选举发生的时间,默认值:250ms

Metadata.RefreshFrequency

int

Metadata刷新频率,默认值:250ms

Metadata.Full

int

获取原数数据的策略,获取元数据时使用的策略,当此选项为true时,客户端将为所有可用主题维护一整套元数据,如果此选项设置为false,它将仅刷新已配置主题的元数据。默认值:false

HashKeys

String数组

PartitionerType为hash时,需指定HashKeys。

HashOnce

Boolean

ClientID

String

写入Kafka的Client ID,默认取值:LogtailPlugin

  • Version需要填写的是kafka protocol version版本号,flusher_kafka_v2当前支持的kafka版本范围:0.8.2.x~2.7.0。 请根据自己的kafka版本号参照下面的kafka protocol version规则进行配置。建议根据自己的kafka版本指定对应protocol version, kafka protocol version配置规则如下:

// x代表小版本号
0.8.2.x,0.9.0.x,0.10.0.x,0.10.1.x,0.10.2.x,0.11.0.x,1.0.0,1.1.0,1.1.1,2.0.0,2.0.1,2.1.0,2.2.0,2.3.0,2.4.0,2.5.0,2.6.0,2.7.0
  • Brokers是个数组,多个Broker地址不能使用;或者,来隔开放在一行里,yaml配置文件中正确的多个Broker地址配置参考如下:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Kafka。

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka
    Brokers: 
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail kafka flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtailprocessor_regex插件,将上面的日志提取处理后几个关键字段:

  • time

  • loglevel

  • appname

  • thread

  • class

  • message

最后推送到kafka的数据样例如下:

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不通的topic, 则topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。

topic动态表达式规则:

  • %{content.fieldname}content代表从contents中取指定字段值

  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}

  • 其它方式暂不支持

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Convert:
      TagFieldsRename:
        host.name: hostname
    Topic: KafkaTestTopic

ProtocolFieldsRename

ilogtail协议字段重命名,在ilogtail的数据转换协议中, 最外层三个字段contents,tagstime属于协议字段。ProtocolFieldsRename只能对 contents,tagstime这个三个字段进行重命名。 例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Convert:
      TagFieldsRename:
        host.name: hostname
      ProtocolFieldsRename:
        time: '@timestamp'
    Topic: KafkaTestTopic

指定分区分发

ilogtail一共支持三种分区分发方式:

  • random随机分发, 默认。

  • roundrobin轮询分发。

  • hash分发。

randomroundrobin分发只需要配置PartitionerType指定对应的分区分发方式即可。 hash分发相对比较特殊,可以指定HashKeysHashKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application  
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Topic: KafkaTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application

安全连接配置

flusher_kafka_v2支持多种安全认证连接kafka服务端。

  • PlainText认证,ilogtail v1.3.0开始支持;

  • SASL认证,ilogtail v1.3.0开始支持;

  • TLS认证,ilogtail v1.4.0开始支持;

  • Kerberos认证(待测试验证),ilogtail v1.4.0开始支持;

前面两种配置比较简单,下面主要介绍下TLSKerberos两种认证的配置。

TLS配置参考

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application  
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Authentication:
      TLS:
        Enabled: true
        CAFile: /data/cert/ca.crt
        CertFile: /data/cert/client.crt
        KeyFile: /data/cert/client.key
        MinVersion: "1.1"
        MaxVersion: "1.2"
    Topic: KafkaTestTopic

注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Kerberos配置参考(待验证)

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_kafka_v2
    PartitionerType: hash
    HashKeys:
      - content.application  
    Brokers:
      - 192.XX.XX.1:9092
      - 192.XX.XX.2:9092
      - 192.XX.XX.3:9092
    Authentication:
      Kerberos:
        ServiceName: kafka
        Realm: test
        UseKeyTab: true
        ConfigPath: "/etc/krb5.conf"
        KeyTabPath: "/etc/security/kafka.keytab"
    Topic: KafkaTestTopic

注: Kerberos认证由于缺乏环境,目前待测试验证,使用中如有问题请及时向社区反馈修复。

Last updated