Pulsar

简介

flusher_pulsar flusher插件可以实现将采集到的数据,经过处理后,发送到Pulsar

版本

Alpha

配置参数

参数类型是否必选说明

Type

String

插件类型

Url

String

Pulsar url,多地址用逗号分隔,可以参考本文中的用例配置

Topic

String

Pulsar Topic,支持动态topic, 例如: test_%{contents.appname}

Name

String

producer名称,默认ilogtail

Convert

Struct

ilogtail数据转换协议配置

Convert.Protocol

String

ilogtail数据转换协议,kafka flusher 可选值:custom_single,custom_single_flatten,otlp_log_v1。默认值:custom_single

Convert.Encoding

String

ilogtail flusher数据转换编码,可选值:jsonnoneprotobuf,默认值:json

Convert.TagFieldsRename

Map

对日志中tags中的json字段重命名

Convert.ProtocolFieldsRename

Map

ilogtail日志协议字段重命名,可当前可重命名的字段:contents,tagstime

EnableTLS

Boolean

是否启用TLS安全连接,对应采用TLS和Athenz两种认证模式都需要设置为true,默认值:false

TLSTrustCertsFilePath

String

TLS CA根证书文件路径,对应采用TLS和Athenz认证时需要指定

Authentication

Struct

Pulsar连接访问认证配置

Authentication.TLS.CertFile

String

TLS连接Pulsar证书文件路径

Authentication.TLS.KeyFile

String

TLS连接Pulsar私钥文件路径

Authentication.Token.Token

String

采用JWT 认证方式的token

Authentication.Athenz.ProviderDomain

String

Provider domain name

Authentication.Athenz.TenantDomain

String

租户域

Authentication.Athenz.TenantService

String

租户服务

Authentication.Athenz.PrivateKey

String

Tenant private key path

Authentication.Athenz.KeyID

String

Key id for the tenant private key

Authentication.Athenz.PrincipalHeader

String

Authentication.Athenz.ZtsURL

String

ZTS server的地址

Authentication.OAuth2.Enabled

Boolean

是否启用OAuth2认证

Authentication.OAuth2.IssuerURL

String

认证提供商的URL,OAuth2.Enabled开启时必填

Authentication.OAuth2.PrivateKey

String

JSON 凭据文件的 URL,OAuth2.Enabled开启时必填

Authentication.OAuth2.Audience

String

Pulsar 集群的 OAuth 2.0 “资源服务” 的标识符

Authentication.OAuth2.Scope

String

访问范围

CompressionType

String

压缩算法,NONE,LZ4,ZLIB,ZSTD,默认值NONE

BlockIfQueueFull

Boolean

队列满的时候是否阻塞,默认值:false

SendTimeout

Int

发送超时时间,默认30s

OperationTimeout

Int

pulsar producer创建、订阅、取消订阅的超时时间,默认30s

ConnectionTimeout

Int

tcp连接建立超时时间,默认5s

MaxConnectionsPerBroker

Int

单个broker连接池保持的连接数,默认1

MaxReconnectToBroker

Int

重连broker的最大重试次数,默认为无限

HashingScheme

Int

消息push分区的分发方式:JavaStringHash,Murmur3_32Hash,默认值:JavaStringHash

BatchingMaxPublishDelay

int

提交时延,默认值:1ms

BatchingMaxMessages

int

批量提交最大消息数,默认值:1000

MaxCacheProducers

int

动态topic情况下最大Producer数量 ,默认最大数量:8,使用动态topic的使用可以根据自己的情况调整。

PartitionKeys

String数组

指定消息分区分发的key。

ClientID

String

写入Pulsar的Client ID,默认取值:iLogtail

样例

采集/home/test-log/路径下的所有文件名匹配*.log规则的文件,并将采集结果发送到Pulsar。

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

进阶配置

以下面的一段日志为例,后来将展开介绍ilogtail pulsar flusher的一些高阶配置

2022-07-22 10:19:23.684 ERROR [springboot-docker] [http-nio-8080-exec-10] com.benchmark.springboot.controller.LogController : error log

以上面这行日志为例 , 我们通ilogtailprocessor_regex插件,将上面的日志提取处理后几个关键字段:

  • time

  • loglevel

  • appname

  • thread

  • class

  • message

最后推送到kafka的数据样例如下:

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

动态topic

针对上面写入的这种日志格式,如果想根据application名称针对不用的应用推送到不通的topic, 则topic可以这样配置。

Topic: test_%{content.application}

最后ilogtail就自动将日志推送到test_springboot-docker这个topic中。

topic动态表达式规则:

  • %{content.fieldname}content代表从contents中取指定字段值

  • %{tag.fieldname},tag表示从tags中取指定字段值,例如:%{tag.k8s.namespace.name}

  • ${env_name}, 读取系统变量绑定到动态topic上,ilogtail 1.5.0开始支持。可以参考flusher-kafka_v2中的使用。

  • 其它方式暂不支持

TagFieldsRename

例如将tags中的host.name重命名为hostname,配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
    Topic: PulsarTestTopic

ProtocolFieldsRename

ilogtail协议字段重命名,在ilogtail的数据转换协议中, 最外层三个字段contents,tagstime属于协议字段。ProtocolFieldsRename只能对 contents,tagstime这个三个字段进行重命名。 例如在使用Elasticsearch你可能想直接将time重命名为@timestamp,则配置参考如下:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Convert:
      TagFieldsRename:
        host.name: hostname
      ProtocolFieldsRename:
        time: '@timestamp'
    Topic: PulsarTestTopic

指定分区分发

ilogtail flusher pulsar使用的官方SDK只支持hash方式分区投递,通过HashingScheme来选择不同的hash算法。 分发是可以指定PartitionKeysPartitionKeys的中配置的字段名只能是contents中的字段属性。

配置用例:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    PartitionKeys:
      - content.application  
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic
  • content.application中表示从contents中取数据application字段数据,如果对contents协议字段做了重命名, 例如重名为messege,则应该配置为messege.application

数据平铺

ilogtail 1.8.0新增数据平铺协议custom_single_flattencontentstagstime三个convert层的协议字段中数据做一级打平。 当前convert协议在单条数据处理仅支持json编码,因此custom_single_flatten需要配合json编码一起使用。

配置用例:

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    Convert:
      Protocol: custom_single_flatten
      Encoding: json
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Topic: PulsarTestTopic

非平铺前写入pulsar的消息格式

{
  "contents": {
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415"
  },
  "tags": {
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log"
  },
  "time": 1664435098
}

使用平铺协议后custom_single_flattenjson全部被一级平铺。

{
    "class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
    "application": "springboot-docker",
    "level": "ERROR",
    "message": "Completed initialization in 9 ms",
    "thread": "http-nio-8080-exec-10",
    "@time": "2022-07-20 16:55:05.415",
    "k8s.namespace.name":"java_app",
    "host.ip": "192.168.6.128",
    "host.name": "master",
    "log.file.path": "/data/test.log",
    "time": 1664435098
}

安全连接配置

flusher_pulsar支持多种安全认证连接pulsar服务端。

  • TLS认证;

  • TokenJWT Token认证;

  • Athenz pulsar租户域认证;

  • OAuth2认证;

JWT Token认证配置比较简单,参照前面的配置表配置即可,下面主要介绍下OAuth2,TLSAthenz两种认证的配置。

OAuth2认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar://192.168.6.128:6650,192.168.6.129:6650,192.168.6.130:6650"
    Authentication:
      OAuth2:
        Enabled: true
        IssuerURL: https://accounts.google.com
        PrivateKey: file:/path/to/file/credentials_file.json
        Audience: https://broker.example.com
        Scope: api://pulsar-cluster-1/.default
    Topic: PulsarTestTopic

credentials_file.json配置内容样例

{
  "type": "client_credentials",
  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
  "client_secret": "on1uJ...k6F6R",
  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
  "issuer_url": "https://accounts.google.com"
}

TLS配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      TLS:
        CertFile: /data/cert/client.crt
        KeyFile: /data/cert/client.key
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用TLS必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://

  • TLSTrustCertsFilePath根证书需要设置。 注: 配置仅供参考,证书文件请自行生成后根据事情情况配置。

Athenz认证配置参考(待验证)

下面配置仅供参考,请根据服务器实际部署情况配置

enable: true
inputs:
  - Type: input_file
    FilePaths: 
      - /home/test-log/*.log
flushers:
  - Type: flusher_pulsar
    URL: "pulsar+ssl://192.168.6.128:6651,192.168.6.129:6651,192.168.6.130:6651"
    EnableTLS: true
    TLSTrustCertsFilePath: /data/cert/ca.crt
    Authentication:
      Athenz:
        ProviderDomain: pulsar
        TenantDomain: shopping
        TenantService: some_app
        PrivateKey: file:///path/to/client-key.pem
        KeyId: v1
    Topic: PulsarTestTopic
  • EnableTLS 如果要启用Athenz认证必须设置为true。开始TLS的情况下,URL头部为pulsar+ssl://

  • TLSTrustCertsFilePath根证书需要设置。

Last updated