RocketMQ
RocketMQ source connector
Supported Apache RocketMQ version
- 4.9.0 (or a later version, for reference)
Supported engines
Spark
Flink
SeaTunnel Zeta
Key features
Description
Source connector for Apache RocketMQ.
Source options
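| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| topics | String | Yes | - | RocketMQ topic name. To read multiple topics, separate them with commas, for example: "tpc1,tpc2". |
| name.srv.addr | String | Yes | - | Address of the RocketMQ name server cluster. |
| tags | String | No | - | RocketMQ tag name. To filter on multiple tags, separate them with commas, for example: "tag1,tag2". |
| acl.enabled | Boolean | No | false | If true, access control is enabled, and the access key and secret key must be configured. |
| access.key | String | No | - | Access key. Must be set when acl.enabled is true. |
| secret.key | String | No | - | Secret key. Must not be empty when acl.enabled is true. |
| batch.size | int | No | 100 | Batch size of each RocketMQ consumer pull. |
| consumer.group | String | No | SeaTunnel-Consumer-Group | RocketMQ consumer group ID, used to distinguish different consumer groups. |
| commit.on.checkpoint | Boolean | No | true | If true, the consumer's offsets are committed periodically in the background. |
| schema | Config | No | - | Structure of the data, including field names and field types. |
| format | String | No | json | Data format. The default is json; text is also supported. The default field delimiter for text is ",". To use a different delimiter, set the field.delimiter option. |
| field.delimiter | String | No | , | Custom field delimiter for the data format. |
| start.mode | String | No | CONSUME_FROM_GROUP_OFFSETS | Initial consumption mode of the consumer. One of: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_GROUP_OFFSETS, CONSUME_FROM_TIMESTAMP, CONSUME_FROM_SPECIFIC_OFFSETS. |
| start.mode.offsets | Config | No | - | Offsets required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS". |
| start.mode.timestamp | Long | No | - | Start time required when start.mode is "CONSUME_FROM_TIMESTAMP". |
| partition.discovery.interval.millis | long | No | -1 | Interval for dynamically discovering topics and partitions. |
| ignore_parse_errors | Boolean | No | false | Optional flag to skip parse errors instead of failing. |
| common-options | config | No | - | Common source plugin parameters. See Source Common Options for details. |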
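The options above can be combined in a single source block. The following is a minimal sketch, not a tested configuration: it assumes a broker with ACL enabled and messages in delimited text form. The address, topic, key values, delimiter, and the two schema fields are hypothetical placeholders.

```hocon
source {
  Rocketmq {
    # Placeholder name server address and topic
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    # Only read messages carrying one of these tags
    tags = "tag1,tag2"

    # With acl.enabled = true, both keys must be set (placeholder values)
    acl.enabled = true
    access.key = "your-access-key"
    secret.key = "your-secret-key"

    # Text format with a custom delimiter instead of the default ","
    format = text
    field.delimiter = "|"
    schema = {
      fields {
        id = bigint
        name = string
      }
    }
  }
}
```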
start.mode.offsets
Offsets required when the consumption mode is "CONSUME_FROM_SPECIFIC_OFFSETS".
For example:
start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}
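For context, here is a sketch of how this option might sit inside a complete source block. The name server address is a placeholder, and reading each key as a topic name followed by a queue index is an assumption drawn from the example above, not something this page states.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"   # placeholder address
    topics = "topic1"
    # start.mode.offsets is only needed with this start mode
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    start.mode.offsets = {
      # assumed key pattern: <topic>-<queue index> = <start offset>
      topic1-0 = 70
      topic1-1 = 10
      topic1-2 = 10
    }
  }
}
```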
Task example
Simple
The consumer reads RocketMQ data and prints it to the console.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    plugin_output = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like more information on how to configure SeaTunnel and see the full list of transform plugins,
  # please visit https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
  }
}
Consuming with a specified format
Consume and parse topic data in json format, pulling 400 records per batch and starting from the earliest offset.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    batch.size = "400"
    consumer.group = "test_topic_group"
    format = json
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}
transform {
  # If you would like more information on how to configure SeaTunnel and see the full list of transform plugins,
  # please visit https://seatunnel.apache.org/docs/category/transform
}
sink {
  Console {
  }
}
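A further variation, sketched under stated assumptions rather than taken from a tested job: a streaming read that starts from a point in time, periodically discovers new partitions, and skips records that fail to parse. The timestamp value, its unit (assumed here to be epoch milliseconds), the 5000 ms interval, and the two schema fields are illustrative assumptions.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
}
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"   # placeholder address
    topics = "test_topic"
    consumer.group = "test_topic_group"

    # Start from a point in time; value assumed to be epoch milliseconds
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = 1700000000000

    # Look for new topics and partitions every 5 seconds (default -1)
    partition.discovery.interval.millis = 5000

    # Skip records that cannot be parsed instead of failing the job
    ignore_parse_errors = true

    format = json
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
sink {
  Console {
  }
}
```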
Change Log
| Change | Commit | Version |
|---|---|---|
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [improve] rocketmq options (#9251) | https://github.com/apache/seatunnel/commit/4cbe3b9172 | 2.3.12 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Improve][Connector-V2] RocketMQ Source add message tag config (#8825) | https://github.com/apache/seatunnel/commit/5913e8c35f | 2.3.10 |
| [Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737) | https://github.com/apache/seatunnel/commit/701f17b5d4 | 2.3.10 |
| [Improve][Connector-V2] RocketMQ Sink add message tag config (#7996) | https://github.com/apache/seatunnel/commit/97a1b00e48 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Fix][Connector-V2] Fix some throwable error not be caught (#7657) | https://github.com/apache/seatunnel/commit/e19d73282e | 2.3.8 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) | https://github.com/apache/seatunnel/commit/b7480e1a89 | 2.3.6 |
| [fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625) | https://github.com/apache/seatunnel/commit/6e0c81d492 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
| [Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287) | https://github.com/apache/seatunnel/commit/cd44b5894e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Improve][pom] Formatting pom (#4761) | https://github.com/apache/seatunnel/commit/1d6d3815ec | 2.3.2 |
| [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583) | https://github.com/apache/seatunnel/commit/e711f6ef4c | 2.3.2 |
| [Feature][Connector-V2] Add rocketmq source and sink (#4007) | https://github.com/apache/seatunnel/commit/e333897552 | 2.3.2 |