📙
iLogtail用户手册
1.8.4
1.8.4
  • 关于
    • 什么是iLogtail
    • 发展历史
    • 产品优势
    • 开源协议
    • 社区版和企业版的对比说明
  • 安装
    • 快速开始
    • Docker使用
    • Kubernetes使用
    • 守护进程
    • 发布记录
    • 支持的操作系统
    • 源代码
      • 下载
      • 编译
      • Docker镜像
      • 编译依赖
    • 镜像站
  • 概念
    • 关键概念
    • 数据流水线
  • 配置
    • 采集配置
    • 系统参数
    • 日志
  • 数据流水线
    • 概览
    • 插件版本管理
    • 输入
      • 文本日志
      • 脚本执行数据
      • 容器标准输出
      • 文本日志(debug)
      • MetricInput示例插件
      • 主机Meta数据
      • Mock数据-Metric
      • eBPF网络调用数据
      • 主机监控数据
      • MySQL Binlog
      • GO Profile
      • GPU数据
      • HTTP数据
      • ServiceInput示例插件
      • Journal数据
      • Kafka
      • Mock数据-Service
      • SqlServer 查询数据
      • OTLP数据
      • PostgreSQL 查询数据
      • Syslog数据
    • 处理
      • 添加字段
      • 添加云资产信息
      • 原始数据
      • 数据脱敏
      • 丢弃字段
      • 字段加密
      • 条件字段处理
      • 日志过滤
      • Go时间格式解析
      • Grok
      • Json
      • 日志转SLS Metric
      • 正则
      • 重命名字段
      • 分隔符
      • 键值对
      • 多行切分
      • 字符串替换
    • 聚合
      • 基础
      • 上下文
      • 按Key分组
      • 按GroupMetadata分组
    • 输出
      • Kafka(Deprecated)
      • kafkaV2
      • ClickHouse
      • ElasticSearch
      • SLS
      • 标准输出/文件
      • OTLP日志
      • Pulsar
      • HTTP
      • Loki
    • 加速
      • 分隔符加速
      • Json加速
      • 正则加速
  • 工作原理
    • 文件发现
    • 插件系统
  • 可观测性
    • 日志
  • 开发者指南
    • 开发环境
    • 日志协议
      • 协议转换
      • 增加新的日志协议
      • 协议
        • sls协议
        • 单条协议
    • 代码风格
    • 数据模型
    • 插件开发
      • 开源插件开发引导
      • Checkpoint接口
      • Logger接口
      • 如何开发Input插件
      • 如何开发Processor插件
      • 如何开发Aggregator插件
      • 如何开发Flusher插件
      • 如何生成插件文档
      • 插件文档规范
      • 纯插件模式启动
    • 测试
      • 单元测试
      • E2E测试
    • 代码检查
      • 检查代码规范
      • 检查文件许可证
      • 检查依赖包许可证
  • 贡献指南
    • 贡献指南
    • 开发者
    • 成就
  • 性能测试
    • 容器场景iLogtail与Filebeat性能对比测试
  • 管控工具
    • 使用介绍
    • 通信协议
    • 开发指南
  • Awesome iLogtail
    • 走近iLogtail社区版
    • iLogtail社区版使用入门
    • iLogtail社区版开发者指南
    • iLogtail社区版使用案例
Powered by GitBook
On this page
  • 简介
  • 版本
  • 配置参数
  • 样例
  • 进阶配置
  • 动态topic
  • TagFieldsRename
  • ProtocolFieldsRename
  • 指定分区分发
  • 数据平铺
  • 安全连接配置
  • OAuth2认证配置参考(待验证)
  • TLS配置参考(待验证)
  • Athenz认证配置参考(待验证)
  1. 数据流水线
  2. 输出

Pulsar

PreviousOTLP日志NextHTTP

Last updated 1 year ago

简介

flusher_pulsar flusher插件可以实现将采集到的数据,经过处理后,发送到Pulsar。

版本

配置参数

参数
类型
是否必选
说明

Type

String

是

插件类型

Url

String

是

Pulsar url,多地址用逗号分隔,可以参考本文中的用例配置

Topic

String

是

Pulsar Topic,支持动态topic, 例如: test_%{contents.appname}

Name

String

否

producer名称,默认ilogtail

Convert

Struct

否

ilogtail数据转换协议配置

Convert.Protocol

String

否

ilogtail数据转换协议,kafka flusher 可选值:custom_single,custom_single_flatten,otlp_log_v1。默认值:custom_single

Convert.Encoding

String

否

ilogtail flusher数据转换编码,可选值:json、none、protobuf,默认值:json

Convert.TagFieldsRename

Map

否

对日志中tags中的json字段重命名

Convert.ProtocolFieldsRename

Map

否

ilogtail日志协议字段重命名,可当前可重命名的字段:contents,tags和time

EnableTLS

Boolean

否

是否启用TLS安全连接,对应采用TLS和Athenz两种认证模式都需要设置为true,默认值:false

TLSTrustCertsFilePath

String

否

TLS CA根证书文件路径,对应采用TLS和Athenz认证时需要指定

Authentication

Struct

否

Pulsar连接访问认证配置

Authentication.TLS.CertFile

String

否

TLS连接Pulsar证书文件路径

Authentication.TLS.KeyFile

String

否

TLS连接Pulsar私钥文件路径

Authentication.Token.Token

String

否

采用JWT 认证方式的token

Authentication.Athenz.ProviderDomain

String

否

Provider domain name

Authentication.Athenz.TenantDomain

String

否

租户域

Authentication.Athenz.TenantService

String

否

租户服务

Authentication.Athenz.PrivateKey

String

否

Tenant private key path

Authentication.Athenz.KeyID

String

否

Key id for the tenant private key

Authentication.Athenz.PrincipalHeader

String

否

Authentication.Athenz.ZtsURL

String

否

ZTS server的地址

Authentication.OAuth2.Enabled

Boolean

否

是否启用OAuth2认证

Authentication.OAuth2.IssuerURL

String

是

认证提供商的URL,OAuth2.Enabled开启时必填

Authentication.OAuth2.PrivateKey

String

是

JSON 凭据文件的 URL,OAuth2.Enabled开启时必填

Authentication.OAuth2.Audience

String

否

Pulsar 集群的 OAuth 2.0 “资源服务” 的标识符

Authentication.OAuth2.Scope

String

否

访问范围

CompressionType

String

否

压缩算法,NONE,LZ4,ZLIB,ZSTD,默认值NONE

BlockIfQueueFull

Boolean

否

队列满的时候是否阻塞,默认值:false

SendTimeout

Int

否

发送超时时间,默认30s

OperationTimeout

Int

否

pulsar producer创建、订阅、取消订阅的超时时间,默认30s

ConnectionTimeout

Int

否

tcp连接建立超时时间,默认5s

MaxConnectionsPerBroker

Int

否

单个broker连接池保持的连接数,默认1

MaxReconnectToBroker

Int

否

重连broker的最大重试次数,默认为无限

HashingScheme

Int

否

消息push分区的分发方式:JavaStringHash,Murmur3_32Hash,默认值:JavaStringHash

BatchingMaxPublishDelay

int

否

提交时延,默认值:1ms

BatchingMaxMessages

int

否

批量提交最大消息数,默认值:1000

MaxCacheProducers

int

否

动态topic情况下最大Producer数量 ,默认最大数量:8,使用动态topic的使用可以根据自己的情况调整。

PartitionKeys

String数组

否

指定消息分区分发的key。

ClientID

String

否

写入Pulsar的Client ID,默认取值:iLogtail。

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Pulsar。

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail pulsar flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtail的processor_regex插件,将上面的日志提取处理后几个关键字段:

  • time

  • loglevel

  • appname

  • thread

  • class

  • message

最后推送到kafka的数据样例如下:

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不通的topic, 则topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。

topic动态表达式规则:

  • %{content.fieldname}。content代表从contents中取指定字段值

  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}

  • ${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。可以参考flusher-kafka_v2中的使用。

  • 其它方式暂不支持

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
    Topic: PulsarTestTopic

ProtocolFieldsRename

对ilogtail协议字段重命名,在ilogtail的数据转换协议中, 最外层三个字段contents,tags和time属于协议字段。ProtocolFieldsRename只能对 contents,tags和time这个三个字段进行重命名。 例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
      ProtocolFieldsRename:
        time: '@timestamp'
    Topic: PulsarTestTopic

指定分区分发

ilogtail flusher pulsar使用的官方SDK只支持hash方式分区投递,通过HashingScheme来选择不同的hash算法。 分发是可以指定PartitionKeys,PartitionKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    PartitionKeys:
      - content.application  
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application

数据平铺

ilogtail 1.8.0新增数据平铺协议custom_single_flatten,contents、tags和time三个convert层的协议字段中数据做一级打平。 当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。

配置用例:

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    Convert:
      Protocol: custom_single_flatten
      Encoding: json
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

非平铺前写入pulsar的消息格式

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

使用平铺协议后custom_single_flatten,json全部被一级平铺。

{
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415",
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log",
    "time": 1664435098
}

安全连接配置

flusher_pulsar支持多种安全认证连接pulsar服务端。

  • TLS认证;

  • TokenJWT Token认证;

  • Athenz pulsar租户域认证;

  • OAuth2认证;

JWT Token认证配置比较简单,参照前面的配置表配置即可,下面主要介绍下OAuth2,TLS和Athenz两种认证的配置。

OAuth2认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Authentication:
      OAuth2:
        Enabled: true
        IssuerURL: https://accounts.google.com
        PrivateKey: file:/path/to/file/credentials_file.json
        Audience: https://broker.example.com
        Scope: api://pulsar-cluster-1/.default
    Topic: PulsarTestTopic

credentials_file.json配置内容样例

{
  "type": "client_credentials",
  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
  "client_secret": "on1uJ...k6F6R",
  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
  "issuer_url": "https://accounts.google.com"
}

TLS配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      TLS:
        CertFile: /data/cert/client.crt
        KeyFile: /data/cert/client.key
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用TLS必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://

  • TLSTrustCertsFilePath根证书需要设置。 注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Athenz认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: file_log
    LogPath: /home/test_log
    FilePattern: "*.log"
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      Athenz:
        ProviderDomain: pulsar
        TenantDomain: shopping
        TenantService: some_app
        PrivateKey: file:///path/to/client-key.pem
        KeyId: v1
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用Athenz认证必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://

  • TLSTrustCertsFilePath根证书需要设置。

Alpha