Apache Iceberg
Apache Iceberg source connector
Supported Iceberg version
- 1.6.1
Supported engines
Spark
Flink
SeaTunnel Zeta
Key features
- batch
- stream
- exactly-once
- column projection
- parallelism
- support user-defined split
- data format
  - parquet
  - orc
  - avro
- iceberg catalog
  - hadoop (2.7.1, 2.7.5, 3.1.3)
  - hive (2.3.9, 3.1.2)
Description
Source connector for Apache Iceberg. It supports both batch and streaming modes.
Supported data source info
| Datasource | Dependency | Maven |
|---|---|---|
| Iceberg | hive-exec | Download |
| Iceberg | libfb303 | Download |
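Database dependency
To stay compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is provided. If you use the Flink engine, you may therefore first need to add the following Jar packages to the <FLINK_HOME>/lib directory. If you use the Spark engine and it is integrated with Hadoop, you do not need to add them. If you use the hadoop s3 catalog, you need to add the hadoop-aws and aws-java-sdk jars that match your Flink and Spark engine versions. (Additional locations: <FLINK_HOME>/lib, <SPARK_HOME>/jars)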
hive-exec-xxx.jar
libfb303-xxx.jar
Some versions of the hive-exec package do not include libfb303-xxx.jar, so you also need to import that Jar package manually.
Data type mapping
| Iceberg data type | SeaTunnel data type |
|---|---|
| BOOLEAN | BOOLEAN |
| INTEGER | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| STRING | STRING |
| FIXED / BINARY | BYTES |
| DECIMAL | DECIMAL |
| STRUCT | ROW |
| LIST | ARRAY |
| MAP | MAP |
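Source options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| catalog_name | string | yes | - | User-specified catalog name. |
| namespace | string | yes | - | The iceberg database name in the backend catalog. |
| table | string | no | - | The iceberg table name in the backend catalog. |
| table_list | string | no | - | The iceberg table list in the backend catalog. |
| iceberg.catalog.config | map | yes | - | Properties used to initialize the Iceberg catalog. The available keys are listed in this file: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java |
| hadoop.config | map | no | - | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string | no | - | The path from which the 'core-site.xml', 'hdfs-site.xml', and 'hive-site.xml' files are loaded. |
| schema | config | no | - | Use projection to select data columns and column order. |
| case_sensitive | boolean | no | false | If data columns were selected via schema [config], controls whether the match to the schema is case sensitive. |
| start_snapshot_timestamp | long | no | - | Instructs this scan to look for changes starting from the most recent snapshot of the table as of the given timestamp. timestamp: milliseconds since the Unix epoch. |
| start_snapshot_id | long | no | - | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
| end_snapshot_id | long | no | - | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id | long | no | - | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp | long | no | - | Instructs this scan to use the most recent snapshot as of the given time. timestamp: milliseconds since the Unix epoch. |
| stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution; FROM_LATEST_SNAPSHOT is used if no value is specified. Possible values: TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode. FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot (inclusive). FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot (inclusive). FROM_SNAPSHOT_ID: start incremental mode from the snapshot with a specific ID (inclusive). FROM_SNAPSHOT_TIMESTAMP: start incremental mode from the snapshot with a specific timestamp (inclusive). |
| increment.scan-interval | long | no | 2000 | The interval between incremental scans (milliseconds). |
| common-options | | no | - | Source plugin common parameters; see Source Common Options for details. |
| query | String | no | - | The select DML used to select iceberg data. It must not contain the table name and does not support aliases. For example: select * from table where f1 > 100, select fn from table where f1 > 100. Support for the LIKE syntax is currently limited: the LIKE pattern must not start with %. A supported form is: select f1 from t where f2 like 'tom%' |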
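The streaming options in the table above can be combined as in the following sketch. It is an illustration only, not a tested job: the catalog, warehouse path, namespace, and table name are placeholders reused from the examples in this document, and the 5000 ms interval is an arbitrary value chosen to show how the default of 2000 is overridden.

```hocon
env {
  parallelism = 2
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    # Scan the whole table once, then switch to incremental mode.
    stream_scan_strategy = "TABLE_SCAN_THEN_INCREMENTAL"
    # Assumed value for illustration; the documented default is 2000 ms.
    increment.scan-interval = 5000
    plugin_output = "iceberg"
  }
}

sink {
  Console {
    plugin_input = "iceberg"
  }
}
```

If stream_scan_strategy is left out, the table states that FROM_LATEST_SNAPSHOT is used, so only changes from the latest snapshot onward would be read.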
Task examples
Simple
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    query = "select fn from table where f1 > 100"
    plugin_output = "iceberg"
  }
}

transform {
}

sink {
  Console {
    plugin_input = "iceberg"
  }
}
Multi-table read
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table_list = [
      {
        table = "table_1"
      },
      {
        table = "table_2"
        query = "select fn from table where f1 > 100"
      }
    ]
    plugin_output = "iceberg"
  }
}
Hadoop S3 catalog
source {
  iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config = {
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
      "fs.defaultFS" = "s3a://your_bucket"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    plugin_output = "iceberg_test"
  }
}
Hive catalog
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    catalog_type = "hive"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
Column projection
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
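A batch read can also be pinned to a single snapshot with the use_snapshot_id option from the source options table. The sketch below assumes a hypothetical snapshot ID, which must be replaced with a real one from your table's metadata, and reuses the placeholder catalog settings from the examples above. It has not been run against a real table.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    # Hypothetical snapshot ID; replace it with one that exists in your table.
    use_snapshot_id = 1234567890123456789
  }
}
```

To select a snapshot by time instead, the table lists use_snapshot_timestamp, which takes milliseconds since the Unix epoch.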
Change Log
| Change | Commit | Version |
|---|---|---|
| [Chore] fix typos filed -> field (#9757) | https://github.com/apache/seatunnel/commit/e3e1c67d29 | 2.3.12 |
| [Improve][Core] Unify the aws-sdk-v2 version to 2.31.30 (#9698) | https://github.com/apache/seatunnel/commit/41c251cc8a | 2.3.12 |
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [Bug][Connector-V2] Fix the issue of writing the ORC format Iceberg report "Illegal provider-class name" (#6754) (#9588) | https://github.com/apache/seatunnel/commit/74b193dd5a | 2.3.12 |
| [Bug][Connector-V2] Updates Iceberg version to 1.6.1 (#9387) (#9451) | https://github.com/apache/seatunnel/commit/7b92a6c5c1 | 2.3.12 |
| [Fix][Connector-Iceberg] Fix Time Zone Issue for Iceberg Timestamp Type (#9460) | https://github.com/apache/seatunnel/commit/60cd497610 | 2.3.12 |
| [Feature][Connector-V2] Iceberg add glue catalog support (#9247) | https://github.com/apache/seatunnel/commit/ecff2e8618 | 2.3.11 |
| [Improve] Remove useless iceberg sink config iceberg.table.config (#9307) | https://github.com/apache/seatunnel/commit/fbdf39ebf2 | 2.3.11 |
| [Improve][connector-iceberg] fix schema change event (#9217) | https://github.com/apache/seatunnel/commit/56669095b7 | 2.3.11 |
| [Feature][Transform] Support define sink column type (#9114) | https://github.com/apache/seatunnel/commit/ab7119e507 | 2.3.11 |
| [Feat][Connector-v2][Iceberg]support filter conditions in iceberg source (#9095) | https://github.com/apache/seatunnel/commit/0eb72780ee | 2.3.11 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method (#9070) | https://github.com/apache/seatunnel/commit/df0d11d632 | 2.3.11 |
| [Improve] iceberg options (#8967) | https://github.com/apache/seatunnel/commit/82a374ec87 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Feature][Iceberg] Support read multi-table (#8524) | https://github.com/apache/seatunnel/commit/2bfb97e502 | 2.3.10 |
| [Improve][Iceberg] Filter catalog table primaryKey is empty (#8413) | https://github.com/apache/seatunnel/commit/857aab5e83 | 2.3.9 |
| [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155) | https://github.com/apache/seatunnel/commit/45a7a715a2 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Iceberg] Support custom delete sql for sink savemode (#8094) | https://github.com/apache/seatunnel/commit/29ca928c36 | 2.3.9 |
| [Improve][Connector-V2] Reduce the request times of iceberg load table (#8149) | https://github.com/apache/seatunnel/commit/555f5eb404 | 2.3.9 |
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 |
| [Improve][Iceberg] Support table comment for catalog (#7936) | https://github.com/apache/seatunnel/commit/72ab38f317 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Fix][Connector-V2] Fix iceberg throw java: package sun.security.krb5 does not exist when use jdk 11 (#7734) | https://github.com/apache/seatunnel/commit/116af4febc | 2.3.8 |
| [Hotfix][Connector-V2] Release resources when task is closed for iceberg sinkwriter (#7729) | https://github.com/apache/seatunnel/commit/ff281183bd | 2.3.8 |
| [Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660) | https://github.com/apache/seatunnel/commit/b7be0cb4a1 | 2.3.8 |
| [Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634) | https://github.com/apache/seatunnel/commit/23ab3edbbb | 2.3.8 |
| [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641) | https://github.com/apache/seatunnel/commit/6b36f90f4d | 2.3.8 |
| [Hotfix] Fix iceberg missing column comment when savemode create table (#7608) | https://github.com/apache/seatunnel/commit/b35bd94bfb | 2.3.8 |
| [Improve][Connector-V2] Remove hard code iceberg table format version (#7500) | https://github.com/apache/seatunnel/commit/f49b263e65 | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a1 | 2.3.8 |
| [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) | https://github.com/apache/seatunnel/commit/e3001207c8 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895) | https://github.com/apache/seatunnel/commit/40d2c1b213 | 2.3.6 |
| [Feature][Connector-V2] Iceberg-sink supports writing data to branches (#6697) | https://github.com/apache/seatunnel/commit/e3103535cc | 2.3.6 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce224 | 2.3.5 |
| [Improve][Zeta] Add classloader cache mode to fix metaspace leak (#6355) | https://github.com/apache/seatunnel/commit/9c3c2f183d | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 |
| [Feature] Supports iceberg sink #6198 (#6265) | https://github.com/apache/seatunnel/commit/18d3e86194 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732) | https://github.com/apache/seatunnel/commit/7f3b4be075 | 2.3.4 |
| [Dependency]Bump org.apache.avro:avro in /seatunnel-connectors-v2/connector-iceberg (#5582) | https://github.com/apache/seatunnel/commit/13753a927b | 2.3.4 |
| [Improve][Pom] Add junit4 to the root pom (#5611) | https://github.com/apache/seatunnel/commit/7b4f7db2a2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Doc][Iceberg] Improved iceberg documentation (#5335) | https://github.com/apache/seatunnel/commit/659a68a0be | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638) | https://github.com/apache/seatunnel/commit/64760eed4d | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][SourceConnector] Unifie Iceberg source fields to schema (#3959) | https://github.com/apache/seatunnel/commit/20e1255fab | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677) | https://github.com/apache/seatunnel/commit/e24843515f | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046) | https://github.com/apache/seatunnel/commit/b38c50789f | 2.3.0 |
| [Feature][Connector V2] expose configurable options in Iceberg (#3394) | https://github.com/apache/seatunnel/commit/bd9a313ded | 2.3.0 |
| [Improve][Connector][Iceberg] Improve code. (#3065) | https://github.com/apache/seatunnel/commit/9f38e3da74 | 2.3.0-beta |
| [Code-Improve][Iceberg] Use automatic resource management to replace 'try - finally' code block. (#2909) | https://github.com/apache/seatunnel/commit/b7f640724b | 2.3.0-beta |
| [Feature][Connector-V2] Add iceberg source connector (#2615) | https://github.com/apache/seatunnel/commit/ffc6088a79 | 2.2.0-beta |