For the complete documentation index, see llms.txt. This page is also available as Markdown.

Kafka(C++)

简介

flusher_kafka_native 将事件序列化为 JSON 文本后异步投递到 Kafka,底层基于 librdkafka。

源代码:FlusherKafka.cpp

版本

Alpha

版本说明

  • 推荐版本:LoongCollector v3.2.6 及以上

配置参数

参数
类型
是否必选
默认值
说明

Type

string

/

固定为 flusher_kafka_native

Brokers

[]string

/

Kafka 集群地址列表,如 ["host1:9092", "host2:9092"]

Topic

string

/

发送的目标 Topic 名称。支持动态 Topic 同 kafka_flusher_v2 扩展插件(仅字符串替换)。

Version

string

"1.0.0"

Kafka 协议版本,格式 x.y.z[.n],用于推导底层 librdkafka 兼容参数。

BulkFlushFrequency

uint

0

批量发送等待时长(毫秒),映射 linger.ms

BulkMaxSize

uint

2048

单批最大消息数,映射 batch.num.messages

MaxMessageBytes

uint

1000000

单条消息最大字节数,映射 message.max.bytes

QueueBufferingMaxKbytes

uint

1048576

本地队列总容量(KB),映射 queue.buffering.max.kbytes

QueueBufferingMaxMessages

uint

100000

本地队列最大消息数,映射 queue.buffering.max.messages

RequiredAcks

int

1

确认级别:0/1/-1(all),映射 acks

Timeout

uint

30000

请求超时(毫秒),映射 request.timeout.ms

MessageTimeoutMs

uint

300000

消息发送(含重试)超时(毫秒),映射 message.timeout.ms

MaxRetries

uint

3

失败重试次数,映射 message.send.max.retries

RetryBackoffMs

uint

100

重试退避(毫秒),映射 retry.backoff.ms

Kafka

map[string]string

/

透传自定义 librdkafka 配置,如 { "compression.type": "lz4" }

Headers

header数组

/

Kafka 消息头,静态键值对数组,value 仅支持字符串

PartitionerType

String

分区策略:randomhash。默认 random。当为 hash 时,会基于指定的 HashKeys 生成消息键(Key),并使用 murmur2_random 作为底层分区器。

HashKeys

String数组

参与分区键生成的字段(仅对 LOG 事件生效)。每项必须以 content. 前缀开头,如:["content.service", "content.user"]。当 PartitionerType = hash 时必填。

Compression

string

none

压缩算法:none/gzip/snappy/lz4,映射 compression.codec

CompressionLevel

int

-1

压缩级别,映射 compression.level

Authentication.TLS.Enabled

bool

false

启用 SSL 连接,对应 security.protocol=ssl

Authentication.TLS.CAFile

string

/

CA 证书路径,映射 ssl.ca.location

Authentication.TLS.CertFile

string

/

客户端证书路径,映射 ssl.certificate.location(与 KeyFile 必须成对配置,否则将视为配置错误)

Authentication.TLS.KeyFile

string

/

客户端私钥路径,映射 ssl.key.location(与 CertFile 必须成对配置,否则将视为配置错误)

Authentication.TLS.KeyPassword

string

/

私钥口令,映射 ssl.key.password(可选)

Authentication.SASL.Mechanism

string

/

SASL 机制:PLAINSCRAM-SHA-256SCRAM-SHA-512 等,对应 sasl.mechanisms

Authentication.SASL.Username

string

/

SASL 用户名(当设置 Mechanism 时必填),对应 sasl.username

Authentication.SASL.Password

string

/

SASL 密码(当设置 Mechanism 时必填),对应 sasl.password

Authentication.Kerberos.Enabled

bool

false

启用 Kerberos(GSSAPI) 认证,启用后 SASL(PLAIN/SCRAM) 不可同时设置。

Authentication.Kerberos.Mechanisms

string

GSSAPI

Kerberos 机制(通常为 GSSAPI),对应 sasl.mechanisms

Authentication.Kerberos.ServiceName

string

kafka

Kafka 服务名,对应 sasl.kerberos.service.name

Authentication.Kerberos.Principal

string

/

Kerberos 主体(启用 Kerberos 时必填)。

Authentication.Kerberos.Keytab

string

/

Kerberos keytab(启用 Kerberos 时必填)。

Authentication.Kerberos.KinitCmd

string

/

自定义 kinit 命令(通常无需配置)。

样例

动态 Topic

Topic 支持动态格式化,按事件内容或分组标签动态路由到不同的 Kafka Topic。支持的占位符:

  • %{content.key}: 取日志内容中的字段值(仅对 LOG 类型事件生效)。

  • %{tag.key}: 取分组标签(GroupTags)中的键值。

  • ${ENV_NAME}: 取分组标签中名为 ENV_NAME 的值(通常由上游处理器/输入端注入)。

示例:根据日志中的 service 字段动态路由到不同 Topic:

示例:根据标签 env 和日志字段 service 组合路由:

当动态格式化失败(字段缺失等)时,将回退到原始 Topic 模板字符串对应的静态值,并记录错误日志。

分区策略

当需要将相同业务键的日志落到同一分区时,可以开启 hash 分区:

  • PartitionerType: "hash":启用哈希分区,内部映射为 librdkafka partitioner=murmur2_random,与 Java 客户端默认分区器兼容(NULL Key 随机分配)。

  • HashKeys:从日志内容中取值拼接成消息 Key,示例:

TLS配置

支持通过 TLS/SSL 安全连接到 Kafka 集群。

SASL 示例

Kerberos 示例

Headers

  • 形态:数组,每个元素包含 keyvalue 两个字段,均为字符串;

  • 特性:静态配置,发送时原样附加到每条 Kafka 消息的 Header 中;

示例:

压缩支持

支持多种压缩模式

压缩算法
级别范围
默认级别
实现方式
特点
Kafka Broker 要求
适用场景

GZIP

0-9

Z_DEFAULT_COMPRESSION (通常为6)

zlib 的 deflateInit2()

高压缩率,CPU消耗大

无特殊要求

需要高压缩率的场景

LZ4

0-12

0 (无系统默认值)

LZ4F_preferences_t.compressionLevel

高速压缩/解压

无特殊要求

高吞吐量场景

Snappy

仅0

不适用

无级别控制

极速但压缩率低

无特殊要求

低延迟场景

None

不适用

不适用

无压缩

零开销

无要求

调试或非压缩场景

注:压缩算法ZSTD暂不支持

Last updated