kafkaV2
简介
flusher_kafka_v2
flusher
插件可以实现将采集到的数据,经过处理后,发送到Kafka。
配置参数
参数 | 类型 | 是否必选 | 说明 |
---|---|---|---|
Type | String | 是 | 插件类型 |
Brokers | String数组 | 是 | Kafka Brokers |
Topic | String | 是 | Kafka Topic,支持动态topic, 例如: |
Version | String | 否 | Kafka协议版本号 ,例如: |
Convert | Struct | 否 | ilogtail数据转换协议配置 |
Convert.Protocol | String | 否 | ilogtail数据转换协议,kafka flusher 可选值: |
Convert.Encoding | String | 否 | ilogtail flusher数据转换编码,可选值: |
Convert.TagFieldsRename | Map | 否 | 对日志中tags中的json字段重命名 |
Convert.ProtocolFieldsRename | Map | 否 | ilogtail日志协议字段重命名,可当前可重命名的字段: |
Authentication | Struct | 否 | Kafka连接访问认证配置,支持 |
Authentication.PlainText.Username | String | 否 | PlainText认证用户名 |
Authentication.PlainText.Password | String | 否 | PlainText认证密码 |
Authentication.SASL.Username | String | 否 | SASL认证用户名 |
Authentication.SASL.Password | String | 否 | SASL认证密码 |
Authentication.Sasl.SaslMechanism | String | 否 | SASL认证,配置可选项: |
PartitionerType | String | 否 | Partitioner类型。取值: |
QequiredAcks | int | 否 | ACK的可靠等级.0=无响应,1=等待本地消息,-1=等待所有副本提交.默认1, |
Compression | String | 否 | 压缩算法,可选值: |
CompressionLevel | Int | 否 | 压缩级别,可选值: |
MaxMessageBytes | Int | 否 | 一个批次提交的大小限制,配置和 |
MaxRetries | Int | 否 | 提交失败重试次数,最大 |
BulkMaxSize | Int | 否 | 单次请求提交事件数,默认 |
BulkFlushFrequency | Int | 否 | 发送批量 Kafka 请求之前等待的时间,0标识没有时延,默认值: |
Timeout | Int | 否 | 等待Kafka brokers响应的超时时间,默认 |
BrokerTimeout | int | 否 | kafka broker等待请求的最大时长,默认 |
Metadata.Retry.Max | int | 否 | 最大重试次数,默认值: |
Metadata.Retry.Backoff | int | 否 | 在重试之前等待leader选举发生的时间,默认值: |
Metadata.RefreshFrequency | int | 否 | Metadata刷新频率,默认值: |
Metadata.Full | int | 否 | 获取原数数据的策略,获取元数据时使用的策略,当此选项为 |
HashKeys | String数组 | 否 | PartitionerType为 |
HashOnce | Boolean | 否 | |
ClientID | String | 否 | 写入Kafka的Client ID,默认取值: |
Version
需要填写的是kafka protocol version
版本号,flusher_kafka_v2
当前支持的kafka
版本范围:0.8.2.x~2.7.0
。 请根据自己的kafka
版本号参照下面的kafka protocol version
规则进行配置。kafka protocol version
配置规则如下:
样例
采集/home/test-log/
路径下的所有文件名匹配*.log
规则的文件,并将采集结果发送到Kafka。
进阶配置
以下面的一段日志为例,后来将展开介绍ilogtail kafka flusher的一些高阶配置
以上面这行日志为例 , 我们通ilogtail
的processor_regex
插件,将上面的日志提取处理后几个关键字段:
time
loglevel
appname
thread
class
message
最后推送到kafka
的数据样例如下:
动态topic
针对上面写入的这种日志格式,如果想根据application
名称针对不用的应用推送到不通的topic
, 则topic
可以这样配置。
最后ilogtail
就自动将日志推送到test_springboot-docker
这个topic
中。
topic
动态表达式规则:
%{content.fieldname}
。content
代表从contents
中取指定字段值%{tag.fieldname}
,tag
表示从tags
中取指定字段值,例如:%{tag.k8s.namespace.name}
其它方式暂不支持
TagFieldsRename
例如将tags
中的host.name
重命名为hostname
,配置参考如下:
ProtocolFieldsRename
对ilogtail
协议字段重命名,在ilogtail
的数据转换协议中, 最外层三个字段contents
,tags
和time
属于协议字段。ProtocolFieldsRename
只能对 contents
,tags
和time
这个三个字段进行重命名。 例如在使用Elasticsearch
你可能想直接将time
重命名为@timestamp
,则配置参考如下:
指定分区分发
ilogtail
一共支持三种分区分发方式:
random
随机分发, 默认。roundrobin
轮询分发。hash
分发。
random
和roundrobin
分发只需要配置PartitionerType
指定对应的分区分发方式即可。 hash
分发相对比较特殊,可以指定HashKeys
,HashKeys
的中配置的字段名只能是contents
中的字段属性。
配置用例:
content.application
中表示从contents
中取数据application
字段数据,如果对contents
协议字段做了重命名, 例如重名为messege
,则应该配置为messege.application
Last updated