跳到主要内容
版本:Next

RocketMQ

RocketMQ 源连接器

支持的 Apache RocketMQ 版本

  • 4.9.0(或更新版本,供参考)

支持这些引擎

Spark
Flink
SeaTunnel Zeta

关键特性

描述

Apache RocketMQ 的源连接器。

源选项

参数名类型必须默认值描述
topicsString-RocketMQ 主题名称。如果有多个主题,使用 , 分隔,例如:"tpc1,tpc2"
name.srv.addrString-RocketMQ 名称服务器集群地址。
tagsString-RocketMQ 标签名称。如果有多个标签,使用 , 分隔,例如:"tag1,tag2"
acl.enabledBooleanfalse如果为 true,启用访问控制,需要配置访问密钥和秘密密钥。
access.keyString访问密钥
secret.keyString当 ACL_ENABLED 为 true 时,秘密密钥不能为空。
batch.sizeint100RocketMQ 消费者拉取批大小
consumer.groupStringSeaTunnel-Consumer-GroupRocketMQ 消费者组 ID,用于区分不同的消费者组。
commit.on.checkpointBooleantrue如果为 true,消费者的偏移量将在后台定期提交。
schema-数据的结构,包括字段名称和字段类型。
formatStringjson数据格式。默认格式是 json。可选 text 格式。默认字段分隔符是 ","。如果自定义分隔符,添加 "field.delimiter" 选项。
field.delimiterString,自定义数据格式的字段分隔符
start.modeStringCONSUME_FROM_GROUP_OFFSETS消费者的初始消费模式,有几种类型:[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]
start.mode.offsets消费模式为 "CONSUME_FROM_SPECIFIC_OFFSETS" 所需的偏移量
start.mode.timestampLong消费模式为 "CONSUME_FROM_TIMESTAMP" 所需的时间。
partition.discovery.interval.millislong-1动态发现主题和分区的间隔。
ignore_parse_errorsBooleanfalse可选标志,跳过解析错误而不是失败。
common-optionsconfig-源插件通用参数,请参考 源通用选项 详见。

start.mode.offsets

消费模式为 "CONSUME_FROM_SPECIFIC_OFFSETS" 所需的偏移量。

例如:

start.mode.offsets = {
topic1-0 = 70
topic1-1 = 10
topic1-2 = 10
}

任务示例

简单

消费者读取 Rocketmq 数据并将其打印到控制台

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topics = "test_topic_json"
plugin_output = "rocketmq_table"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
# 如果您想了解有关如何配置 seatunnel 的更多信息并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/category/transform
}

sink {
Console {
}
}

指定格式消费简单

当我以 json 格式消费主题数据并解析,每次拉取的条数是 400,消费从原始位置开始

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test_topic"
plugin_output = "rocketmq_table"
start.mode = "CONSUME_FROM_FIRST_OFFSET"
batch.size = "400"
consumer.group = "test_topic_group"
format = json
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
# 如果您想了解有关如何配置 seatunnel 的更多信息并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/category/transform
}

sink {
Console {
}
}

变更日志

Change Log
ChangeCommitVersion
[Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671)https://github.com/apache/seatunnel/commit/9212a771402.3.12
[improve] rocketmq options (#9251)https://github.com/apache/seatunnel/commit/4cbe3b91722.3.12
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118)https://github.com/apache/seatunnel/commit/4f5adeb1c72.3.11
[Improve][Connector-V2] RocketMQ Source add message tag config (#8825)https://github.com/apache/seatunnel/commit/5913e8c35f2.3.10
[Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737)https://github.com/apache/seatunnel/commit/701f17b5d42.3.10
[Improve][Connector-V2] RocketMQ Sink add message tag config (#7996)https://github.com/apache/seatunnel/commit/97a1b00e482.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Fix][Connector-V2] Fix some throwable error not be caught (#7657)https://github.com/apache/seatunnel/commit/e19d73282e2.3.8
[Feature][Kafka] Support multi-table source read (#5992)https://github.com/apache/seatunnel/commit/60104602d12.3.6
[Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668)https://github.com/apache/seatunnel/commit/b7480e1a892.3.6
[fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625)https://github.com/apache/seatunnel/commit/6e0c81d4922.3.5
[Test][E2E] Add thread leak check for connector (#5773)https://github.com/apache/seatunnel/commit/1f2f3fc5f02.3.4
[Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287)https://github.com/apache/seatunnel/commit/cd44b5894e2.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755)https://github.com/apache/seatunnel/commit/8de74081002.3.4
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260)https://github.com/apache/seatunnel/commit/51c0d709ba2.3.4
[Improve][pom] Formatting pom (#4761)https://github.com/apache/seatunnel/commit/1d6d3815ec2.3.2
[Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583)https://github.com/apache/seatunnel/commit/e711f6ef4c2.3.2
[Feature][Connector-V2] Add rocketmq source and sink (#4007)https://github.com/apache/seatunnel/commit/e3338975522.3.2