Version: Next

Apache Pulsar

Apache Pulsar source connector

Description

Source connector for Apache Pulsar.

Key features

Options

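| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topic | String | | - | Topic name |
| topic-pattern | String | | - | Regular expression pattern for topic names |
| topic-discovery.interval | Long | | -1 | Interval for discovering new topic partitions (milliseconds) |
| subscription.name | String | | - | Subscription name |
| client.service-url | String | | - | Pulsar service URL |
| admin.service-url | String | | - | HTTP URL of the Pulsar admin endpoint |
| auth.plugin-class | String | | - | Name of the authentication plugin |
| auth.params | String | | - | Parameters for the authentication plugin |
| poll.timeout | Integer | | 100 | Maximum wait time when fetching records (milliseconds) |
| poll.interval | Long | | 50 | Interval between record fetches (milliseconds) |
| poll.batch.size | Integer | | 500 | Maximum number of records to fetch per poll |
| cursor.startup.mode | Enum | | LATEST | Startup mode |
| cursor.startup.timestamp | Long | | - | Startup timestamp (milliseconds) |
| cursor.reset.mode | Enum | | LATEST | Cursor reset strategy |
| cursor.stop.mode | Enum | | NEVER | Stop mode |
| cursor.stop.timestamp | Long | | - | Stop timestamp (milliseconds) |
| schema | config | | - | Data structure |
| common-options | | | - | Common parameters for source plugins |
| format | String | | json | Data format |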

topic [String]

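The name of the topic to read data from when the table is used as a source. A semicolon-separated list of topics is also supported, for example 'topic-1;topic-2'.

Note that only one of "topic-pattern" and "topic" can be specified for a source.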
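
As a minimal sketch, a source that reads two topics through the semicolon-separated form might look like the following. The topic names, subscription name, and URLs are placeholders, not values from a real deployment.

```hocon
source {
  Pulsar {
    # Two topics in one option, separated by a semicolon
    topic = "topic-1;topic-2"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
  }
}
```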

topic-pattern [String]

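A regular expression for the topic name pattern. When the job starts, the consumer subscribes to every topic whose name matches the specified regular expression.

Note that only one of "topic-pattern" and "topic" can be specified for a source.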

topic-discovery.interval [Long]

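The interval (in milliseconds) at which the Pulsar source discovers new topic partitions. A non-positive value disables topic partition discovery.

Note that this option takes effect only when the 'topic-pattern' option is used.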
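
The following sketch combines 'topic-pattern' with periodic discovery. The pattern and the 30-second interval are illustrative assumptions, and 'topic' is deliberately left out because only one of the two options may be set.

```hocon
source {
  Pulsar {
    # Subscribe to every topic whose name matches this regular expression
    topic-pattern = "topic-.*"
    # Look for new matching topic partitions every 30000 ms;
    # a non-positive value such as the default -1 disables discovery
    topic-discovery.interval = 30000
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
  }
}
```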

subscription.name [String]

Specifies the subscription name for this consumer. This argument is required when constructing the consumer.

client.service-url [String]

The service URL provider for the Pulsar service. To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

For example, for localhost: pulsar://localhost:6650,localhost:6651

admin.service-url [String]

The HTTP URL of the Pulsar service's admin endpoint.

For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.

auth.plugin-class [String]

The name of the authentication plugin.

auth.params [String]

Parameters for the authentication plugin.

For example, key1:val1,key2:val2
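
A sketch of how the two authentication options might be set together is shown below. It assumes the token authentication class shipped with the Pulsar Java client (org.apache.pulsar.client.impl.auth.AuthenticationToken) is available to the connector, and the token value is a placeholder. Other plugins would take different parameters in the same key:val form.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Assumption: token authentication from the Pulsar Java client
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # key:val form; replace the placeholder with a real token
    auth.params = "token:<your-token>"
  }
}
```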

poll.timeout [Integer]

The maximum time (in milliseconds) to wait when fetching records. A longer wait increases throughput but also latency.

poll.interval [Long]

The interval (in milliseconds) between record fetches. A shorter interval increases throughput but also CPU load.

poll.batch.size [Integer]

The maximum number of records to fetch per poll. A larger batch size increases throughput but also latency.
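
The three polling options are usually tuned together. The sketch below trades some latency for throughput by raising the timeout and batch size above their defaults (100 ms and 500 records). The specific numbers are illustrative assumptions, not recommendations.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Wait up to 200 ms for records (default 100)
    poll.timeout = 200
    # Keep the default 50 ms pause between fetches
    poll.interval = 50
    # Fetch up to 1000 records per poll (default 500)
    poll.batch.size = 1000
  }
}
```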

cursor.startup.mode [Enum]

The startup mode of the Pulsar consumer. Valid values are 'EARLIEST', 'LATEST', 'SUBSCRIPTION', and 'TIMESTAMP'.

cursor.startup.timestamp [Long]

Start from the specified epoch timestamp (in milliseconds).

Note that this option is required when the "cursor.startup.mode" option is set to 'TIMESTAMP'.

cursor.reset.mode [Enum]

The cursor reset strategy of the Pulsar consumer. Valid values are 'EARLIEST' and 'LATEST'.

Note that this option takes effect only when the "cursor.startup.mode" option is set to 'SUBSCRIPTION'.
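
A minimal sketch of the 'SUBSCRIPTION' startup mode paired with a reset strategy follows. Writing the enum values as quoted strings is an assumption about the config syntax, and the other values are placeholders.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Start from the position tracked by the subscription
    cursor.startup.mode = "SUBSCRIPTION"
    # Only honored in SUBSCRIPTION mode; EARLIEST or LATEST
    cursor.reset.mode = "EARLIEST"
  }
}
```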

cursor.stop.mode [Enum]

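The stop mode of the Pulsar consumer. Valid values are 'NEVER', 'LATEST', and 'TIMESTAMP'.

Note that specifying 'NEVER' makes this a real-time job, while the other modes make it an offline job.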

cursor.stop.timestamp [Long]

Stop at the specified epoch timestamp (in milliseconds).

Note that this option is required when the "cursor.stop.mode" option is set to 'TIMESTAMP'.
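
To illustrate the timestamp modes, the sketch below bounds a job to a one-hour window by setting both a startup and a stop timestamp. The two epoch values are arbitrary examples exactly 3600000 ms apart, and quoting the enum values as strings is an assumption.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Begin at a fixed point in time (epoch milliseconds)
    cursor.startup.mode = "TIMESTAMP"
    cursor.startup.timestamp = 1700000000000
    # Stop one hour later, which makes this an offline job
    cursor.stop.mode = "TIMESTAMP"
    cursor.stop.timestamp = 1700003600000
  }
}
```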

schema [Config]

The structure of the data, including field names and field types. See Schema-Feature.

format [String]

The data format. The default format is json. See formats.
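
As a sketch, a JSON source with an explicit schema might be configured as follows. The field names and types are hypothetical, and the `fields` block layout is assumed to follow the Schema-Feature page, which remains the authoritative reference.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # json is the default; shown here only for clarity
    format = "json"
    # Hypothetical fields describing each JSON record
    schema = {
      fields {
        id = "bigint"
        name = "string"
      }
    }
  }
}
```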

Common options

Common parameters for source plugins. See Source Common Options for details.

Example

source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    plugin_output = "test"
  }
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [improve] pulsar options (#9180) | https://github.com/apache/seatunnel/commit/26a2160c80 | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][API] Make sure the table name in TablePath not be null (#7252) | https://github.com/apache/seatunnel/commit/764d8b0bc8 | 2.3.7 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [PulsarSource]Improve pulsar throughput performance. (#6234) | https://github.com/apache/seatunnel/commit/37461f4f3e | 2.3.4 |
| [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) | https://github.com/apache/seatunnel/commit/543d2c5086 | 2.3.4 |
| [Chore] Remove useless DeserializationFormatFactory and its implement (#5880) | https://github.com/apache/seatunnel/commit/f0511544ff | 2.3.4 |
| fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852) | https://github.com/apache/seatunnel/commit/3b6de3743e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Feature][Json-format] support read format for pulsar (#4111) | https://github.com/apache/seatunnel/commit/7d61ae93e7 | 2.3.2 |
| [hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125) | https://github.com/apache/seatunnel/commit/a6705cc5bf | 2.3.2 |
| [Feature] add cdc multiple table support & fix zeta bug | https://github.com/apache/seatunnel/commit/533ff2c2fa | 2.3.1 |
| [hotfix][pulsar] PulsarSource consumer ack exception. (#4237) | https://github.com/apache/seatunnel/commit/9725d675da | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182) | https://github.com/apache/seatunnel/commit/e567203f7d | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989) | https://github.com/apache/seatunnel/commit/aee2c580ea | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590) | https://github.com/apache/seatunnel/commit/4fe9323419 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Hotfix][Connector-V2][Pulsar] fix conditional options (#3504) | https://github.com/apache/seatunnel/commit/0066affacf | 2.3.0 |
| [Feature][Connector][pulsar] expose configurable options in Pulsar (#3341) | https://github.com/apache/seatunnel/commit/200faa7c29 | 2.3.0 |
| [Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0cd | 2.3.0 |
| [chore] fix pulsar consumer comment error (#3356) | https://github.com/apache/seatunnel/commit/91e632c526 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945) | https://github.com/apache/seatunnel/commit/5ad69076b3 | 2.3.0-beta |
| Move Handover to common module (#2877) | https://github.com/apache/seatunnel/commit/d94a874bcb | 2.3.0-beta |
| [hotfix][connector-v2] fix pulsar source exceptions (#2820) | https://github.com/apache/seatunnel/commit/8ff0ba7015 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [SeaTunnel]Simply seatunnel package pipeline. (#2563) | https://github.com/apache/seatunnel/commit/9d88b6221a | 2.2.0-beta |
| [Improve][Connector-V2] Pulsar support user-defined schema (#2436) | https://github.com/apache/seatunnel/commit/16cabe6a35 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef850 | 2.2.0-beta |
| [doc][connector-v2] pulsar source options doc (#2128) | https://github.com/apache/seatunnel/commit/59ce8a2b32 | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b1 | 2.2.0-beta |