Version: Next

Apache Iceberg

Apache Iceberg source connector

Supported Iceberg version

  • 1.6.1

Supported engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Source connector for Apache Iceberg. It supports both batch and streaming modes.

Supported data source info

| Datasource | Dependency | Maven |
| --- | --- | --- |
| Iceberg | hive-exec | Download |
| Iceberg | libfb303 | Download |

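Database dependency

The project pom file declares hive-exec with provided scope so that it stays compatible with different Hadoop and Hive versions. If you use the Flink engine, you may therefore need to add the following jars to the <FLINK_HOME>/lib directory first. If you use the Spark engine with Hadoop integration, you do not need to add them. If you use a Hadoop S3 catalog, you also need the hadoop-aws and aws-java-sdk jars that match your Flink or Spark engine version. Add them to <FLINK_HOME>/lib or <SPARK_HOME>/jars respectively.

hive-exec-xxx.jar
libfb303-xxx.jar

Some versions of the hive-exec package do not bundle libfb303-xxx.jar, so you also need to add that jar manually.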

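Data type mapping

| Iceberg data type | SeaTunnel data type |
| --- | --- |
| BOOLEAN | BOOLEAN |
| INTEGER | INT |
| LONG | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| STRING | STRING |
| FIXED, BINARY | BYTES |
| DECIMAL | DECIMAL |
| STRUCT | ROW |
| LIST | ARRAY |
| MAP | MAP |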

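Source options

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| catalog_name | string |  | - | User-specified catalog name. |
| namespace | string |  | - | The Iceberg database name in the backend catalog. |
| table | string |  | - | The Iceberg table name in the backend catalog. |
| table_list | list |  | - | A list of Iceberg tables in the backend catalog. Each entry sets table and may set its own query, as in the multi-table example. |
| iceberg.catalog.config | map |  | - | Properties used to initialize the Iceberg catalog. The available properties are defined in https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/CatalogProperties.java |
| hadoop.config | map |  | - | Properties passed through to the Hadoop configuration. |
| iceberg.hadoop-conf-path | string |  | - | The path from which to load the 'core-site.xml', 'hdfs-site.xml' and 'hive-site.xml' files. |
| schema | config |  | - | Uses projection to select the data columns and their order. |
| case_sensitive | boolean |  | false | If data columns are selected via schema [config], controls whether they are matched against the table schema case-sensitively. |
| start_snapshot_timestamp | long |  | - | Instructs this scan to look for changes starting from the most recent snapshot of the table as of the given timestamp. The timestamp is in milliseconds since the Unix epoch. |
| start_snapshot_id | long |  | - | Instructs this scan to look for changes starting from a particular snapshot (exclusive). |
| end_snapshot_id | long |  | - | Instructs this scan to look for changes up to a particular snapshot (inclusive). |
| use_snapshot_id | long |  | - | Instructs this scan to use the given snapshot ID. |
| use_snapshot_timestamp | long |  | - | Instructs this scan to use the most recent snapshot as of the given time. The timestamp is in milliseconds since the Unix epoch. |
| stream_scan_strategy | enum |  | FROM_LATEST_SNAPSHOT | Starting strategy for stream-mode execution. If no value is set, FROM_LATEST_SNAPSHOT is used. The possible values are listed below the table. |
| increment.scan-interval | long |  | 2000 | Interval between incremental scans, in milliseconds. |
| common-options |  |  | - | Common parameters of source plugins. See Source Common Options for details. |
| query | string |  | - | The SELECT statement used to read data from Iceberg. It must not reference the actual table name (the FROM clause in the examples uses table or t instead), and aliases are not supported. Examples: select * from table where f1 > 100, select fn from table where f1 > 100. LIKE support is currently limited: the pattern must not start with %. For example, select f1 from t where f2 like 'tom%' is supported. |

stream_scan_strategy values:

  • TABLE_SCAN_THEN_INCREMENTAL: perform a regular table scan, then switch to incremental mode.
  • FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot (inclusive).
  • FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot (inclusive).
  • FROM_SNAPSHOT_ID: start incremental mode from the snapshot with a specific ID (inclusive).
  • FROM_SNAPSHOT_TIMESTAMP: start incremental mode from the snapshot with a specific timestamp (inclusive).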
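The following sketch shows how a streaming job might begin reading from a point in time. It uses only options from the table above. It assumes, as the option names suggest, that FROM_SNAPSHOT_TIMESTAMP takes its starting point from start_snapshot_timestamp. The timestamp value, warehouse path, namespace and table are illustrative placeholders.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    # Assumption: FROM_SNAPSHOT_TIMESTAMP starts from the snapshot selected by start_snapshot_timestamp
    stream_scan_strategy = "FROM_SNAPSHOT_TIMESTAMP"
    # Illustrative value in milliseconds since the Unix epoch (2024-01-01T00:00:00Z)
    start_snapshot_timestamp = 1704067200000
    # Check for new snapshots every 5 seconds instead of the 2000 ms default
    increment.scan-interval = 5000
    plugin_output = "iceberg"
  }
}

sink {
  Console {
    plugin_input = "iceberg"
  }
}
```

To start from a known snapshot instead, you would presumably pair FROM_SNAPSHOT_ID with start_snapshot_id in the same way.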

Task examples

Simple

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    query = "select fn from table where f1 > 100"
    plugin_output = "iceberg"
  }
}

transform {
}

sink {
  Console {
    plugin_input = "iceberg"
  }
}
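A batch read can be pinned to one point in the table's history by adding use_snapshot_id to the source block of the simple job above. In this minimal sketch, the snapshot ID is a hypothetical value. Replace it with a real ID from your table's snapshot history.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table = "source"
    # Hypothetical snapshot ID: read the table exactly as it was at this snapshot
    use_snapshot_id = 1234567890123456789
    plugin_output = "iceberg"
  }
}
```

use_snapshot_timestamp is the time-based alternative. It takes milliseconds since the Unix epoch and selects the most recent snapshot as of that time.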

Multi-table read

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
    }
    namespace = "database1"
    table_list = [
      {
        table = "table_1"
      },
      {
        table = "table_2"
        query = "select fn from table where f1 > 100"
      }
    ]

    plugin_output = "iceberg"
  }
}

Hadoop S3 catalog

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      "type" = "hadoop"
      "warehouse" = "s3a://your_bucket/spark/warehouse/"
    }
    hadoop.config = {
      "fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
      "fs.s3a.endpoint" = "s3.cn-north-1.amazonaws.com.cn"
      "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxx"
      "fs.s3a.secret.key" = "xxxxxxxxxxxxxxxxx"
      "fs.defaultFS" = "s3a://your_bucket"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    plugin_output = "iceberg_test"
  }
}

Hive catalog

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    catalog_type = "hive"

    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
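The Hive catalog example can also load the cluster's own configuration files through iceberg.hadoop-conf-path instead of relying on the defaults. In this sketch, /etc/hadoop/conf is an assumed directory that holds core-site.xml, hdfs-site.xml and hive-site.xml. Substitute the location used on your cluster.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hive"
      uri = "thrift://localhost:9083"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    # Assumed directory containing core-site.xml, hdfs-site.xml and hive-site.xml
    iceberg.hadoop-conf-path = "/etc/hadoop/conf"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```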

Column projection

source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"

    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
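The case_sensitive option decides how projected field names are matched when their letter case differs from the table schema. The sketch below turns strict matching on. UserId is a hypothetical column name. It shows that with case_sensitive = true, each field name must match the Iceberg column name exactly.

```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    iceberg.catalog.config = {
      type = "hadoop"
      warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    }
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    # Match projected field names against the table schema case-sensitively
    case_sensitive = true

    schema {
      fields {
        # Hypothetical mixed-case column; must match the Iceberg column name exactly
        UserId = "bigint"
        f1 = "bigint"
      }
    }
  }
}
```

With the default case_sensitive = false, matching is case-insensitive, so a field written as userid would resolve to the UserId column.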

Changelog

| Change | Commit | Version |
| --- | --- | --- |
| [Chore] fix typos filed -> field (#9757) | https://github.com/apache/seatunnel/commit/e3e1c67d29 | 2.3.12 |
| [Improve][Core] Unify the aws-sdk-v2 version to 2.31.30 (#9698) | https://github.com/apache/seatunnel/commit/41c251cc8a | 2.3.12 |
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [Bug][Connector-V2] Fix the issue of writing the ORC format Iceberg report "Illegal provider-class name" (#6754) (#9588) | https://github.com/apache/seatunnel/commit/74b193dd5a | 2.3.12 |
| [Bug][Connector-V2] Updates Iceberg version to 1.6.1 (#9387) (#9451) | https://github.com/apache/seatunnel/commit/7b92a6c5c1 | 2.3.12 |
| [Fix][Connector-Iceberg] Fix Time Zone Issue for Iceberg Timestamp Type (#9460) | https://github.com/apache/seatunnel/commit/60cd497610 | 2.3.12 |
| [Feature][Connector-V2] Iceberg add glue catalog support (#9247) | https://github.com/apache/seatunnel/commit/ecff2e8618 | 2.3.11 |
| [Improve] Remove useless iceberg sink config iceberg.table.config (#9307) | https://github.com/apache/seatunnel/commit/fbdf39ebf2 | 2.3.11 |
| [Improve][connector-iceberg] fix schema change event (#9217) | https://github.com/apache/seatunnel/commit/56669095b7 | 2.3.11 |
| [Feature][Transform] Support define sink column type (#9114) | https://github.com/apache/seatunnel/commit/ab7119e507 | 2.3.11 |
| [Feat][Connector-v2][Iceberg]support filter conditions in iceberg source (#9095) | https://github.com/apache/seatunnel/commit/0eb72780ee | 2.3.11 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [Fix][API] Fixed not invoke the SinkAggregatedCommitter's init method (#9070) | https://github.com/apache/seatunnel/commit/df0d11d632 | 2.3.11 |
| [Improve] iceberg options (#8967) | https://github.com/apache/seatunnel/commit/82a374ec87 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Feature][Iceberg] Support read multi-table (#8524) | https://github.com/apache/seatunnel/commit/2bfb97e502 | 2.3.10 |
| [Improve][Iceberg] Filter catalog table primaryKey is empty (#8413) | https://github.com/apache/seatunnel/commit/857aab5e83 | 2.3.9 |
| [Improve][Connector-V2] Reduce the create times of iceberg sink writer (#8155) | https://github.com/apache/seatunnel/commit/45a7a715a2 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][Iceberg] Support custom delete sql for sink savemode (#8094) | https://github.com/apache/seatunnel/commit/29ca928c36 | 2.3.9 |
| [Improve][Connector-V2] Reduce the request times of iceberg load table (#8149) | https://github.com/apache/seatunnel/commit/555f5eb404 | 2.3.9 |
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 |
| [Improve][Iceberg] Support table comment for catalog (#7936) | https://github.com/apache/seatunnel/commit/72ab38f317 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Fix][Connector-V2] Fix iceberg throw java: package sun.security.krb5 does not exist when use jdk 11 (#7734) | https://github.com/apache/seatunnel/commit/116af4febc | 2.3.8 |
| [Hotfix][Connector-V2] Release resources when task is closed for iceberg sinkwriter (#7729) | https://github.com/apache/seatunnel/commit/ff281183bd | 2.3.8 |
| [Fix][Connector-V2] Fixed iceberg sink can not handle uppercase fields (#7660) | https://github.com/apache/seatunnel/commit/b7be0cb4a1 | 2.3.8 |
| [Hotfix][CDC] Fix ddl duplicate execution error when config multi_table_sink_replica (#7634) | https://github.com/apache/seatunnel/commit/23ab3edbbb | 2.3.8 |
| [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641) | https://github.com/apache/seatunnel/commit/6b36f90f4d | 2.3.8 |
| [Hotfix] Fix iceberg missing column comment when savemode create table (#7608) | https://github.com/apache/seatunnel/commit/b35bd94bfb | 2.3.8 |
| [Improve][Connector-V2] Remove hard code iceberg table format version (#7500) | https://github.com/apache/seatunnel/commit/f49b263e65 | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a1 | 2.3.8 |
| [Feature][Connector-V2][Iceberg] Support Iceberg Kerberos (#7246) | https://github.com/apache/seatunnel/commit/e3001207c8 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [Bug][Connector-Iceberg]fix create iceberg v2 table with pks (#6895) | https://github.com/apache/seatunnel/commit/40d2c1b213 | 2.3.6 |
| [Feature][Connector-V2] Iceberg-sink supports writing data to branches (#6697) | https://github.com/apache/seatunnel/commit/e3103535cc | 2.3.6 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce224 | 2.3.5 |
| [Improve][Zeta] Add classloader cache mode to fix metaspace leak (#6355) | https://github.com/apache/seatunnel/commit/9c3c2f183d | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 |
| [Feature] Supports iceberg sink #6198 (#6265) | https://github.com/apache/seatunnel/commit/18d3e86194 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [BUG][Connector-V2] Iceberg source lost data with parallelism option (#5732) | https://github.com/apache/seatunnel/commit/7f3b4be075 | 2.3.4 |
| [Dependency]Bump org.apache.avro:avro in /seatunnel-connectors-v2/connector-iceberg (#5582) | https://github.com/apache/seatunnel/commit/13753a927b | 2.3.4 |
| [Improve][Pom] Add junit4 to the root pom (#5611) | https://github.com/apache/seatunnel/commit/7b4f7db2a2 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Doc][Iceberg] Improved iceberg documentation (#5335) | https://github.com/apache/seatunnel/commit/659a68a0be | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 |
| [Hotfix][Connector][Iceberg] Fix iceberg source stream mode init error (#4638) | https://github.com/apache/seatunnel/commit/64760eed4d | 2.3.2 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [Improve][SourceConnector] Unifie Iceberg source fields to schema (#3959) | https://github.com/apache/seatunnel/commit/20e1255fab | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Improve][Connector-V2][Iceberg] Unified exception for iceberg source connector (#3677) | https://github.com/apache/seatunnel/commit/e24843515f | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046) | https://github.com/apache/seatunnel/commit/b38c50789f | 2.3.0 |
| [Feature][Connector V2] expose configurable options in Iceberg (#3394) | https://github.com/apache/seatunnel/commit/bd9a313ded | 2.3.0 |
| [Improve][Connector][Iceberg] Improve code. (#3065) | https://github.com/apache/seatunnel/commit/9f38e3da74 | 2.3.0-beta |
| [Code-Improve][Iceberg] Use automatic resource management to replace 'try - finally' code block. (#2909) | https://github.com/apache/seatunnel/commit/b7f640724b | 2.3.0-beta |
| [Feature][Connector-V2] Add iceberg source connector (#2615) | https://github.com/apache/seatunnel/commit/ffc6088a79 | 2.2.0-beta |