Skip to content

Commit 36492a8

Browse files
bobbai00 and MA77HEW820
authored and committed
Add support for switching to Iceberg RESTCatalog (#3225)
This PR adds support for switching between the Hadoop catalog and the REST catalog for Iceberg.

### How to switch to the REST catalog
1. In `storage-config.yaml`, change `storage.iceberg.catalog.type` to `rest`.
2. Adjust `storage.iceberg.catalog.uri` so that it points to the REST catalog.

### Tested Iceberg REST catalog
https://hub.docker.com/r/tabulario/iceberg-rest. This REST catalog is intended for testing purposes only; more production-level catalog implementations still need to be tested.
1 parent c4b7325 commit 36492a8

File tree

5 files changed

+69
-19
lines changed

5 files changed

+69
-19
lines changed

core/workflow-core/src/main/resources/storage-config.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ storage:
55
database: "texera_storage"
66
commit-batch-size: 1000
77
iceberg:
8+
catalog:
9+
type: hadoop # either hadoop or rest
10+
uri: http://localhost:8181 # the uri of the rest catalog
811
table:
912
namespace: "operator-result"
1013
commit:

core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/IcebergCatalogInstance.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,29 @@ object IcebergCatalogInstance {
1616
/**
1717
* Retrieves the singleton Iceberg catalog instance.
1818
* - If the catalog is not initialized, it is lazily created using the configured properties.
19+
*
1920
* @return the Iceberg catalog instance.
2021
*/
2122
def getInstance(): Catalog = {
2223
instance match {
2324
case Some(catalog) => catalog
2425
case None =>
25-
val hadoopCatalog = IcebergUtil.createHadoopCatalog(
26-
"texera_iceberg",
27-
StorageConfig.fileStorageDirectoryPath
28-
)
29-
instance = Some(hadoopCatalog)
30-
hadoopCatalog
26+
val catalog = StorageConfig.icebergCatalogType match {
27+
case "hadoop" =>
28+
IcebergUtil.createHadoopCatalog(
29+
"texera_iceberg",
30+
StorageConfig.fileStorageDirectoryPath
31+
)
32+
case "rest" =>
33+
IcebergUtil.createRestCatalog(
34+
"texera_iceberg",
35+
StorageConfig.fileStorageDirectoryPath
36+
)
37+
case unsupported =>
38+
throw new IllegalArgumentException(s"Unsupported catalog type: $unsupported")
39+
}
40+
instance = Some(catalog)
41+
catalog
3142
}
3243
}
3344

core/workflow-core/src/main/scala/edu/uci/ics/amber/core/storage/StorageConfig.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ object StorageConfig {
1616
val storageMap = javaConf("storage").asInstanceOf[JMap[String, Any]].asScala.toMap
1717
val mongodbMap = storageMap("mongodb").asInstanceOf[JMap[String, Any]].asScala.toMap
1818
val icebergMap = storageMap("iceberg").asInstanceOf[JMap[String, Any]].asScala.toMap
19+
val icebergCatalogMap = icebergMap("catalog").asInstanceOf[JMap[String, Any]].asScala.toMap
1920
val icebergTableMap = icebergMap("table").asInstanceOf[JMap[String, Any]].asScala.toMap
2021
val icebergCommitMap = icebergTableMap("commit").asInstanceOf[JMap[String, Any]].asScala.toMap
2122
val icebergRetryMap = icebergCommitMap("retry").asInstanceOf[JMap[String, Any]].asScala.toMap
@@ -35,6 +36,7 @@ object StorageConfig {
3536
icebergCommitMap.updated("retry", icebergRetryMap)
3637
)
3738
)
39+
.updated("catalog", icebergCatalogMap)
3840
)
3941
.updated("jdbc", jdbcMap)
4042
)
@@ -97,6 +99,19 @@ object StorageConfig {
9799
.asInstanceOf[Map[String, Any]]("max-wait-ms")
98100
.asInstanceOf[Int]
99101

102+
// Iceberg catalog configurations
103+
val icebergCatalogType: String = conf("storage")
104+
.asInstanceOf[Map[String, Any]]("iceberg")
105+
.asInstanceOf[Map[String, Any]]("catalog")
106+
.asInstanceOf[Map[String, Any]]("type")
107+
.asInstanceOf[String]
108+
109+
val icebergCatalogUri: String = conf("storage")
110+
.asInstanceOf[Map[String, Any]]("iceberg")
111+
.asInstanceOf[Map[String, Any]]("catalog")
112+
.asInstanceOf[Map[String, Any]]("uri")
113+
.asInstanceOf[String]
114+
100115
// JDBC configurations
101116
val jdbcUrl: String = conf("storage")
102117
.asInstanceOf[Map[String, Any]]("jdbc")

core/workflow-core/src/main/scala/edu/uci/ics/amber/util/IcebergUtil.scala

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.apache.iceberg.data.{GenericRecord, Record}
1010
import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO}
1111
import org.apache.iceberg.io.{CloseableIterable, InputFile}
1212
import org.apache.iceberg.parquet.{Parquet, ParquetValueReader}
13+
import org.apache.iceberg.rest.RESTCatalog
1314
import org.apache.iceberg.types.Type.PrimitiveType
1415
import org.apache.iceberg.{
1516
CatalogProperties,
@@ -60,6 +61,38 @@ object IcebergUtil {
6061
catalog
6162
}
6263

64+
/**
65+
* Creates and initializes a RESTCatalog with the given parameters.
66+
* - Configures the catalog to interact with a REST endpoint.
67+
* - The `warehouse` parameter specifies the root directory for storing table data.
68+
* - Sets the file I/O implementation to `HadoopFileIO`.
69+
* - Authentication support is not implemented yet (see TODO).
70+
*
71+
* Note: The only tested REST catalog implementation is `tabulario/iceberg-rest`
72+
* (https://hub.docker.com/r/tabulario/iceberg-rest).
73+
*
74+
* TODO: Add authentication support, such as OAuth2, using `OAuth2Properties`.
75+
*
76+
* @param catalogName the name of the catalog.
77+
* @param warehouse the root path for the warehouse where the tables are stored.
78+
* @return the initialized RESTCatalog instance.
79+
*/
80+
def createRestCatalog(
81+
catalogName: String,
82+
warehouse: Path
83+
): RESTCatalog = {
84+
val catalog = new RESTCatalog()
85+
catalog.initialize(
86+
catalogName,
87+
Map(
88+
"warehouse" -> warehouse.toString,
89+
CatalogProperties.URI -> StorageConfig.icebergCatalogUri,
90+
CatalogProperties.FILE_IO_IMPL -> classOf[HadoopFileIO].getName
91+
).asJava
92+
)
93+
catalog
94+
}
95+
6396
/**
6497
* Creates a new Iceberg table with the specified schema and properties.
6598
* - Drops the existing table if `overrideIfExists` is true and the table already exists.

core/workflow-core/src/test/scala/edu/uci/ics/amber/storage/result/iceberg/IcebergDocumentSpec.scala

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package edu.uci.ics.amber.storage.result.iceberg
22

3-
import edu.uci.ics.amber.core.storage.{
4-
DocumentFactory,
5-
IcebergCatalogInstance,
6-
StorageConfig,
7-
VFSURIFactory
8-
}
3+
import edu.uci.ics.amber.core.storage.{DocumentFactory, VFSURIFactory}
94
import edu.uci.ics.amber.core.storage.model.{VirtualDocument, VirtualDocumentSpec}
105
import edu.uci.ics.amber.core.tuple.{Attribute, AttributeType, Schema, Tuple}
116
import edu.uci.ics.amber.core.virtualidentity.{
@@ -56,13 +51,6 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
5651
// Initialize serialization and deserialization functions
5752
serde = IcebergUtil.toGenericRecord
5853
deserde = (schema, record) => IcebergUtil.fromRecord(record, amberSchema)
59-
60-
// Initialize the the Iceberg catalog
61-
catalog = IcebergUtil.createHadoopCatalog(
62-
"iceberg_document_test",
63-
StorageConfig.fileStorageDirectoryPath
64-
)
65-
IcebergCatalogInstance.replaceInstance(catalog)
6654
}
6755

6856
override def beforeEach(): Unit = {

0 commit comments

Comments
 (0)