Apache Pulsar
Apache Pulsar source connector
Description
Source connector for Apache Pulsar.
Key features
Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topic | String | No | - | Topic name |
| topic-pattern | String | No | - | Regular expression pattern for topic names |
| topic-discovery.interval | Long | No | -1 | Interval (ms) for discovering new topic partitions |
| subscription.name | String | Yes | - | Subscription name |
| client.service-url | String | Yes | - | Pulsar service URL |
| admin.service-url | String | Yes | - | HTTP URL of the Pulsar admin endpoint |
| auth.plugin-class | String | No | - | Name of the authentication plugin |
| auth.params | String | No | - | Parameters for the authentication plugin |
| poll.timeout | Integer | No | 100 | Maximum wait time (ms) when fetching records |
| poll.interval | Long | No | 50 | Interval (ms) between fetches |
| poll.batch.size | Integer | No | 500 | Maximum number of records to fetch per poll |
| cursor.startup.mode | Enum | No | LATEST | Startup mode |
| cursor.startup.timestamp | Long | No | - | Startup timestamp (ms) |
| cursor.reset.mode | Enum | No | LATEST | Cursor reset strategy |
| cursor.stop.mode | Enum | No | NEVER | Stop mode |
| cursor.stop.timestamp | Long | No | - | Stop timestamp (ms) |
| schema | Config | No | - | Data structure |
| common-options | | No | - | Source plugin common parameters |
| format | String | No | json | Data format |
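topic [String]
Name of the topic to read data from when the table is used as a source. A semicolon-separated list of topics is also supported, for example 'topic-1;topic-2'.
Note: only one of "topic-pattern" and "topic" can be specified for a source.
topic-pattern [String]
Regular expression for the topic name pattern. When the job starts, the consumer subscribes to every topic whose name matches the expression.
Note: only one of "topic-pattern" and "topic" can be specified for a source.
topic-discovery.interval [Long]
Interval (in ms) at which the Pulsar source discovers new topic partitions. A non-positive value disables topic partition discovery.
Note: this option only takes effect when the 'topic-pattern' option is used.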
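The interaction between 'topic-pattern' and 'topic-discovery.interval' can be sketched as the configuration below. It is a minimal sketch, not a recommended setup: the topic name pattern and the 30-second interval are hypothetical values, and it assumes the pattern is matched against the fully qualified topic name.

```hocon
source {
  Pulsar {
    # Subscribe by pattern instead of a fixed "topic" (the two are mutually exclusive).
    # Hypothetical pattern; assumes matching against the fully qualified topic name.
    topic-pattern = "persistent://public/default/orders-.*"
    # Look for newly created matching topic partitions every 30 s (illustrative value).
    topic-discovery.interval = 30000
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
  }
}
```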
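subscription.name [String]
Subscription name for this consumer. This parameter is required when constructing the consumer.
client.service-url [String]
Service URL provider for the Pulsar service. To connect to Pulsar using a client library, you need to specify a Pulsar protocol URL.
For example, for brokers running on localhost: pulsar://localhost:6650,localhost:6651.
admin.service-url [String]
HTTP URL of the Pulsar service admin endpoint.
For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.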
auth.plugin-class [String]
Name of the authentication plugin.
auth.params [String]
Parameters for the authentication plugin.
For example, key1:val1,key2:val2
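As an illustration of the two authentication options together, the sketch below uses Pulsar's token authentication plugin. It rests on assumptions: that the connector passes both values unchanged to the Pulsar client, that the plugin accepts a `token:` prefixed parameter string, and that `<your-token>` is a placeholder to be replaced.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Token authentication class shipped with the Pulsar Java client.
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # Placeholder value; assumes the parameter string is handed to the plugin as-is.
    auth.params = "token:<your-token>"
  }
}
```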
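poll.timeout [Integer]
Maximum time (in ms) to wait when fetching records. A longer time increases throughput but also increases latency.
poll.interval [Long]
Interval (in ms) between fetches. A shorter interval increases throughput but also increases CPU load.
poll.batch.size [Integer]
Maximum number of records to fetch per poll. A larger batch size increases throughput but also increases latency.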
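The three polling options are usually tuned together. The sketch below leans toward throughput over latency; the numbers are illustrative assumptions, not measured recommendations, and should be validated against your own workload.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Wait up to 200 ms for records (default 100): more throughput, more latency.
    poll.timeout = 200
    # Poll every 20 ms (default 50): more throughput, more CPU load.
    poll.interval = 20
    # Fetch up to 1000 records per poll (default 500).
    poll.batch.size = 1000
  }
}
```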
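cursor.startup.mode [Enum]
Startup mode of the Pulsar consumer. Valid values are 'EARLIEST', 'LATEST', 'SUBSCRIPTION', and 'TIMESTAMP'.
cursor.startup.timestamp [Long]
Start from the specified epoch timestamp (in ms).
Note: this option is required when the "cursor.startup.mode" option is set to 'TIMESTAMP'.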
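A minimal sketch of timestamp-based startup follows. The timestamp is an arbitrary example (1704067200000 is 2024-01-01T00:00:00Z in epoch milliseconds); the other values are reused from the example in this document.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Start from a point in time rather than the earliest or latest message.
    cursor.startup.mode = "TIMESTAMP"
    # Required with TIMESTAMP mode: epoch milliseconds for 2024-01-01T00:00:00Z.
    cursor.startup.timestamp = 1704067200000
  }
}
```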
cursor.reset.mode [Enum]
Cursor reset strategy of the Pulsar consumer. Valid values are 'EARLIEST' and 'LATEST'.
Note: this option only takes effect when the "cursor.startup.mode" option is set to 'SUBSCRIPTION'.
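The pairing of 'SUBSCRIPTION' startup with a reset strategy might look like the sketch below. It assumes the reset mode is applied when the subscription has no usable cursor position yet; the text above only states that the option is honored in 'SUBSCRIPTION' mode.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Resume from the position stored for the "seatunnel" subscription.
    cursor.startup.mode = "SUBSCRIPTION"
    # Assumed fallback when the cursor has to be reset: start from the earliest message.
    cursor.reset.mode = "EARLIEST"
  }
}
```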
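cursor.stop.mode [Enum]
Stop mode of the Pulsar consumer. Valid values are 'NEVER', 'LATEST', and 'TIMESTAMP'.
Note: when 'NEVER' is specified, the job is a real-time job; with the other modes it is an offline job.
cursor.stop.timestamp [Long]
Stop at the specified epoch timestamp (in ms).
Note: this option is required when the "cursor.stop.mode" option is set to 'TIMESTAMP'.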
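An offline read over a fixed time window can be sketched by combining the startup and stop options. The timestamps are arbitrary examples covering one day (2024-01-01T00:00:00Z to 2024-01-02T00:00:00Z); any job-level settings your environment needs for offline jobs are outside this sketch.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    # Window start: 2024-01-01T00:00:00Z in epoch milliseconds.
    cursor.startup.mode = "TIMESTAMP"
    cursor.startup.timestamp = 1704067200000
    # Window end: 2024-01-02T00:00:00Z; any mode other than NEVER makes this an offline job.
    cursor.stop.mode = "TIMESTAMP"
    cursor.stop.timestamp = 1704153600000
  }
}
```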
schema [Config]
Structure of the data, including field names and field types. See Schema-Feature.
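As a rough illustration, a schema for JSON messages might be declared as below. The `fields` layout is assumed from the Schema-Feature convention, and the field names and types are hypothetical; consult Schema-Feature for the authoritative syntax.

```hocon
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    format = "json"
    # Hypothetical fields; each entry maps a field name to its type.
    schema = {
      fields {
        id = "bigint"
        name = "string"
        price = "double"
      }
    }
  }
}
```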
format [String]
Data format. The default format is json; see formats.
Common options
Source plugin common parameters. See Source Common Options for details.
Example
source {
  Pulsar {
    topic = "example"
    subscription.name = "seatunnel"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://my-broker.example.com:8080"
    plugin_output = "test"
  }
}
Changelog
| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [improve] pulsar options (#9180) | https://github.com/apache/seatunnel/commit/26a2160c80 | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][API] Make sure the table name in TablePath not be null (#7252) | https://github.com/apache/seatunnel/commit/764d8b0bc8 | 2.3.7 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [PulsarSource]Improve pulsar throughput performance. (#6234) | https://github.com/apache/seatunnel/commit/37461f4f3e | 2.3.4 |
| [Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) | https://github.com/apache/seatunnel/commit/543d2c5086 | 2.3.4 |
| [Chore] Remove useless DeserializationFormatFactory and its implement (#5880) | https://github.com/apache/seatunnel/commit/f0511544ff | 2.3.4 |
| fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852) | https://github.com/apache/seatunnel/commit/3b6de3743e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Feature][Json-format] support read format for pulsar (#4111) | https://github.com/apache/seatunnel/commit/7d61ae93e7 | 2.3.2 |
| [hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125) | https://github.com/apache/seatunnel/commit/a6705cc5bf | 2.3.2 |
| [Feature] add cdc multiple table support & fix zeta bug | https://github.com/apache/seatunnel/commit/533ff2c2fa | 2.3.1 |
| [hotfix][pulsar] PulsarSource consumer ack exception. (#4237) | https://github.com/apache/seatunnel/commit/9725d675da | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182) | https://github.com/apache/seatunnel/commit/e567203f7d | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989) | https://github.com/apache/seatunnel/commit/aee2c580ea | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590) | https://github.com/apache/seatunnel/commit/4fe9323419 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Hotfix][Connector-V2][Pulsar] fix conditional options (#3504) | https://github.com/apache/seatunnel/commit/0066affacf | 2.3.0 |
| [Feature][Connector][pulsar] expose configurable options in Pulsar (#3341) | https://github.com/apache/seatunnel/commit/200faa7c29 | 2.3.0 |
| [Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0cd | 2.3.0 |
| [chore] fix pulsar consumer comment error (#3356) | https://github.com/apache/seatunnel/commit/91e632c526 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945) | https://github.com/apache/seatunnel/commit/5ad69076b3 | 2.3.0-beta |
| Move Handover to common module (#2877) | https://github.com/apache/seatunnel/commit/d94a874bcb | 2.3.0-beta |
| [hotfix][connector-v2] fix pulsar source exceptions (#2820) | https://github.com/apache/seatunnel/commit/8ff0ba7015 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [SeaTunnel]Simply seatunnel package pipeline. (#2563) | https://github.com/apache/seatunnel/commit/9d88b6221a | 2.2.0-beta |
| [Improve][Connector-V2] Pulsar support user-defined schema (#2436) | https://github.com/apache/seatunnel/commit/16cabe6a35 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef850 | 2.2.0-beta |
| [doc][connector-v2] pulsar source options doc (#2128) | https://github.com/apache/seatunnel/commit/59ce8a2b32 | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b1 | 2.2.0-beta |