简介
flusher_kafka_v2
flusher
插件可以实现将采集到的数据,经过处理后,发送到Kafka。
版本
Beta
配置参数
Version
需要填写的是kafka protocol version
版本号,flusher_kafka_v2
当前支持的kafka
版本范围:0.8.2.x~3.3.1
。 请根据自己的kafka
版本号参照下面的kafka protocol version
规则进行配置。建议根据自己的kafka
版本指定对应protocol version
, kafka protocol version
支持版本号如下:
0.8.2.0,0.8.2.1,0.8.2.2
0.9.0.0,0.9.0.1
0.10.0.0,0.10.0.1,0.10.1.0,0.10.1.1,0.10.2.0,0.10.2.1,0.10.2.2
0.11.0.0,0.11.0.1,0.11.0.2
1.0.0,1.0.1,1.0.2,1.1.0,1.1.1,
2.0.0,2.0.1,2.1.0,2.1.1,2.2.0,2.2.1,2.2.2,2.3.0,2.3.1,2.4.0,2.4.1,2.5.0,2.5.1,2.6.0,2.6.1,2.6.2,2.7.0,2.7.1,2.8.0,2.8.1,2.8.2
3.0.0,3.0.1,3.0.2,3.1.0,3.1.1,3.1.2,3.2.0,3.2.1,3.2.2,3.2.3,3.3.0,3.3.1
Brokers
是个数组,多个Broker
地址不能使用;
或者,
来隔开放在一行里,yaml
配置文件中正确的多个Broker
地址配置参考如下:
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: KafkaTestTopic
样例
采集/home/test-log/
路径下的所有文件名匹配*.log
规则的文件,并将采集结果发送到Kafka。
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: KafkaTestTopic
进阶配置
以下面的一段日志为例,后来将展开介绍ilogtail kafka flusher的一些高阶配置
2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log
以上面这行日志为例 , 我们通ilogtail
的processor_regex
插件,将上面的日志提取处理后几个关键字段:
最后推送到kafka
的数据样例如下:
{
"contents": {
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}
动态topic
针对上面写入的这种日志格式,如果想根据application
名称针对不用的应用推送到不同的topic
, 则topic
可以这样配置。
Topic: test_%{content.application}
最后ilogtail
就自动将日志推送到test_springboot-docker
这个topic
中。 topic
动态表达式规则:
%{content.fieldname}
。content
代表从contents
中取指定字段值
%{tag.fieldname}
,tag
表示从tags
中取指定字段值,例如:%{tag.k8s.namespace.name}
${env_name}
, 读取系统变量绑定到动态topic
上,ilogtail 1.5.0
开始支持。
动态topic中使用系统变量
动态topic
绑定系统变量的两种场景:
将系统变量采集添加到日志的tag
中,然后使用%{tag.fieldname}
规则完成绑定。
对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的topic
中,直接采用${env_name}
规则完成绑定,此方式需要1.5.0
才支持。
由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。
(1)将系统变量采集到日志中完成动态topic
绑定
将系统变量采集添加到日志中有两种方式,一种是在ilogtail
容器env
添加,另一种是通过processor_add_fields
插件添加, 两种方式不同的配置参考下面的介绍
在daemonset
或者sidecar
方式部署的ilogtail
容器env
配置部分添加自定义的系统变量,配置参考案例如下:
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_|_app_name_
- name: _app_name_ # 添加自定义_app_name_变量,
value: kafka
自定义的变量_app_name_
被添加到ALIYUN_LOG_ENV_TAGS
中,日志的tags
中会看到自定义的变量, 此时动态 topic
采用%{tag.fieldname}
规则配置即可。
使用processor_add_fields
插件系统变量添加到日志中,配置参考如下:
processors:
- Type: processor_add_fields
Fields:
service: ${env_name}
IgnoreIfExist: false
这里${env_name}
生效依赖于ilogtail
的enable_env_ref_in_config
配置,从ilogtail 1.5.0
开始支持。
(2)直接采用$
符将系统变量绑定动态topic
中
在daemonset
或者sidecar
方式部署的ilogtail
容器env
配置部分添加自定义的系统变量,配置参考案例如下:
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_
- name: app_name # 添加自定义app_name变量,
value: kafka
app_name
添加到系统变量中后,直接采用动态topic的:${env_name}
规则即可绑定。
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: ilogtail_${app_name}
${app_name}
就是我们上面添加的系统变量。
TagFieldsRename
例如将tags
中的host.name
重命名为hostname
,配置参考如下:
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Convert:
TagFieldsRename:
host.name: hostname
Topic: KafkaTestTopic
ProtocolFieldsRename
对ilogtail
协议字段重命名,在ilogtail
的数据转换协议中, 最外层三个字段contents
,tags
和time
属于协议字段。ProtocolFieldsRename
只能对 contents
,tags
和time
这个三个字段进行重命名。 例如在使用Elasticsearch
你可能想直接将time
重命名为@timestamp
,则配置参考如下:
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Convert:
TagFieldsRename:
host.name: hostname
ProtocolFieldsRename:
time: '@timestamp'
Topic: KafkaTestTopic
指定分区分发
ilogtail
一共支持三种分区分发方式:
random
和roundrobin
分发只需要配置PartitionerType
指定对应的分区分发方式即可。 hash
分发相对比较特殊,可以指定HashKeys
,HashKeys
的中配置的字段名只能是contents
中的字段属性。
配置用例:
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: KafkaTestTopic
content.application
中表示从contents
中取数据application
字段数据,如果对contents
协议字段做了重命名, 例如重名为messege
,则应该配置为messege.application
iLogtail
中Kafka
的消息头是以键值对数组的形式配置的。header
中value
仅支持字符串类型。
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: KafkaTestTopic
Headers:
- key: "key1"
value: "value1"
- key: "key2"
value: "value2"
数据平铺
ilogtail 1.8.0
新增数据平铺协议custom_single_flatten
,contents
、tags
和time
三个convert
层的协议字段中数据做一级打平。 当前convert
协议在单条数据处理仅支持json
编码,因此custom_single_flatten
需要配合json
编码一起使用。
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Convert:
Protocol: custom_single_flatten
Encoding: json
Topic: KafkaTestTopic
非平铺前写入kafka
的消息格式
{
"contents": {
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}
使用平铺协议后custom_single_flatten
,json
全部被一级平铺。
{
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log",
"time": 1664435098
}
安全连接配置
flusher_kafka_v2
支持多种安全认证连接kafka
服务端。
PlainText
认证,ilogtail v1.3.0
开始支持;
SASL
认证,ilogtail v1.3.0
开始支持;
TLS
认证,ilogtail v1.4.0
开始支持;
Kerberos
认证(待测试验证),ilogtail v1.4.0
开始支持;
前面两种配置比较简单,下面主要介绍下TLS
和Kerberos
两种认证的配置。
TLS配置参考
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Authentication:
TLS:
Enabled: true
CAFile: /data/cert/ca.crt
CertFile: /data/cert/client.crt
KeyFile: /data/cert/client.key
MinVersion: "1.1"
MaxVersion: "1.2"
Topic: KafkaTestTopic
注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。
Kerberos配置参考
enable: true
inputs:
- Type: input_file
FilePaths:
- /home/test-log/*.log
flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Authentication:
Kerberos:
ServiceName: kafka
Realm: test
UseKeyTab: true
ConfigPath: "/etc/krb5.conf"
KeyTabPath: "/etc/security/kafka.keytab"
Topic: KafkaTestTopic
Last updated