Kafka

简介

service_kafka input插件可以采集Kafka的消息。

版本

Stable

配置参数

参数类型是否必选说明

Type

String

插件类型,指定为service_kafka

Version

String

Kafka集群版本号。

Brokers

Array

Kafka服务器地址列表。

ConsumerGroup

String

Kafka消费组名称。

Topics

Array

待消费的Kafka订阅主题列表。

ClientID

String

消费Kafka的用户ID。

Offset

String

Kafka初始消费位移类型,可选值包括:oldest和newest。如果未添加该参数,则默认使用oldest,表示从最早可用的位移处开始消费。

MaxMessageLen

Integer

Kafka消息的最大允许长度,单位为字节,取值范围为:1~524288。如果未添加该参数,则默认使用524288,即512KB。

SASLUsername

String

SASL用户名。

SASLPassword

String

SASL密码。

样例

采集服务器地址为172.xx.xx.48和172.xx.xx.34、主题为topicA和topicB的Kafka消息,并将采集结果输出至标准输出,其中Kafka集群的版本为2.1.1,消费组的名称为test-group,其余取默认值。

  • 输入

{"payload":"foo"}
  • 采集配置

enable: true
inputs:
  - Type: service_kafka
    Version: 2.1.1
    Brokers: 
        - 172.xx.xx.48
        - 172.xx.xx.34
    ConsumerGroup: test-group
    Topics:
        - topicA
        - topicB
    ClientID: sls
flushers:
  - Type: flusher_stdout
    OnlyStdout: true  
  • 输出

{"payload":"foo"}

Last updated