Skip to content

Commit 9867cf6

Browse files
[REST Catalog API] Rework catalog instance rotation (#338)
1 parent e12ada6 commit 9867cf6

22 files changed

+338
-297
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ lazy val root = (project in file("."))
4343
// Framework dependencies
4444
libraryDependencies += "dev.zio" %% "zio" % "2.1.24",
4545
libraryDependencies += "dev.zio" %% "zio-streams" % "2.1.24",
46+
libraryDependencies += "dev.zio" %% "zio-cache" % "0.2.8",
4647
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "12.8.2.jre11",
4748
libraryDependencies += "software.amazon.awssdk" % "s3" % "2.33.13",
4849
libraryDependencies += "com.lihaoyi" %% "upickle" % "4.4.3",

src/main/scala/models/settings/iceberg/DefaultIcebergStagingSettings.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package com.sneaksanddata.arcane.framework
22
package models.settings.iceberg
33

4+
import models.serialization.ZIODurationRW.*
45
import services.iceberg.IcebergCatalogCredential
56
import services.iceberg.base.S3CatalogFileIO
67

78
import upickle.ReadWriter
8-
import upickle.implicits.key
99

1010
case class DefaultIcebergStagingSettings(
1111
catalogProperties: Map[String, String],
1212
override val namespace: String,
1313
override val catalogUri: String,
14-
override val warehouse: String
14+
override val warehouse: String,
15+
override val maxCatalogInstanceLifetime: zio.Duration
1516
) extends IcebergCatalogSettings derives ReadWriter:
1617
/** Important to note that currently we do not provide separation between Sink and Staging catalog auth and FileIO
1718
* implementations. This should be fixed in the future.

src/main/scala/models/settings/iceberg/IcebergCatalogSettings.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,7 @@ trait IcebergCatalogSettings:
1919
/** The catalog additional properties.
2020
*/
2121
val additionalProperties: Map[String, String]
22+
23+
/** Maximum lifetime of a RESTSessionCatalog instance
24+
*/
25+
val maxCatalogInstanceLifetime: zio.Duration

src/main/scala/models/settings/sink/DefaultIcebergSinkSettings.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package com.sneaksanddata.arcane.framework
22
package models.settings.sink
33

4+
import models.serialization.ZIODurationRW.*
45
import models.settings.iceberg.IcebergCatalogSettings
56
import services.iceberg.IcebergCatalogCredential
67
import services.iceberg.base.S3CatalogFileIO
78

89
import upickle.ReadWriter
9-
import upickle.implicits.key
1010

1111
case class DefaultIcebergSinkSettings(
1212
catalogProperties: Map[String, String],
1313
override val namespace: String,
1414
override val catalogUri: String,
15-
override val warehouse: String
15+
override val warehouse: String,
16+
override val maxCatalogInstanceLifetime: zio.Duration
1617
) extends IcebergCatalogSettings derives ReadWriter:
1718
/** Important to note that currently we do not provide separation between Sink and Staging catalog auth and FileIO
1819
* implementations. This should be fixed in the future.

src/main/scala/services/iceberg/IcebergCatalogFactory.scala

Lines changed: 36 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,26 +10,18 @@ import org.apache.iceberg.aws.s3.S3FileIO
1010
import org.apache.iceberg.catalog.SessionCatalog.SessionContext
1111
import org.apache.iceberg.rest.auth.OAuth2Properties
1212
import org.apache.iceberg.rest.{HTTPClient, RESTSessionCatalog}
13-
import zio.{Task, ZIO}
13+
import zio.cache.Cache
14+
import zio.{Cached, Schedule, Scope, Task, ZIO, ZLayer}
1415

1516
import java.time.Instant
1617
import scala.collection.concurrent.TrieMap
1718
import scala.jdk.CollectionConverters.*
1819
import scala.language.implicitConversions
1920

20-
final class IcebergCatalogFactory(icebergCatalogSettings: IcebergCatalogSettings) extends CatalogFactory:
21-
22-
private val catalogProperties: Map[String, String] =
23-
Map(
24-
CatalogProperties.WAREHOUSE_LOCATION -> icebergCatalogSettings.warehouse,
25-
CatalogProperties.URI -> icebergCatalogSettings.catalogUri,
26-
"rest-metrics-reporting-enabled" -> "false",
27-
"view-endpoints-supported" -> "false"
28-
) ++ icebergCatalogSettings.additionalProperties
29-
30-
private val maxCatalogLifetime =
31-
zio.Duration.fromSeconds(10 * 60).toSeconds // limit catalog instance lifetime to 10 minutes
32-
private val catalogs: TrieMap[String, (RESTSessionCatalog, Long)] = TrieMap()
21+
final class IcebergCatalogFactory(
22+
icebergCatalogSettings: IcebergCatalogSettings,
23+
cachedRef: Cached[Throwable, RESTSessionCatalog]
24+
) extends CatalogFactory:
3325

3426
def getSessionContext: SessionContext =
3527
SessionContext(
@@ -41,38 +33,46 @@ final class IcebergCatalogFactory(icebergCatalogSettings: IcebergCatalogSettings
4133
Map().asJava
4234
);
4335

44-
def newCatalog: Task[RESTSessionCatalog] = ZIO.scoped {
45-
for
36+
def getCatalog: Task[RESTSessionCatalog] = cachedRef.get
37+
38+
object IcebergCatalogFactory:
39+
private def getCatalogProperties(icebergCatalogSettings: IcebergCatalogSettings): Map[String, String] =
40+
Map(
41+
CatalogProperties.WAREHOUSE_LOCATION -> icebergCatalogSettings.warehouse,
42+
CatalogProperties.URI -> icebergCatalogSettings.catalogUri,
43+
"rest-metrics-reporting-enabled" -> "false",
44+
"view-endpoints-supported" -> "false"
45+
) ++ icebergCatalogSettings.additionalProperties
46+
47+
private def newCatalog(icebergCatalogSettings: IcebergCatalogSettings): ZIO[Scope, Throwable, RESTSessionCatalog] =
48+
ZIO.acquireRelease(for
49+
properties <- ZIO.succeed(getCatalogProperties(icebergCatalogSettings))
4650
result <- ZIO.succeed(
4751
new RESTSessionCatalog(
4852
config => HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(),
4953
(_, _) =>
5054
val baseIO = new S3FileIO()
51-
baseIO.initialize(catalogProperties.asJava)
55+
baseIO.initialize(properties.asJava)
5256
baseIO
5357
)
5458
)
5559
name <- ZIO.succeed(java.util.UUID.randomUUID().toString)
5660
_ <- zlog("Creating new Iceberg RESTSessionCatalog instance with id %s", name)
5761
_ <- ZIO.attemptBlocking(
58-
result.initialize(name, (catalogProperties ++ Map("header.X-Arcane-Runner-Identifier" -> name)).asJava)
62+
result.initialize(name, (properties ++ Map("header.X-Arcane-Runner-Identifier" -> name)).asJava)
5963
)
60-
_ <- ZIO.attempt(catalogs.addOne((name, (result, Instant.now.getEpochSecond))))
61-
yield result
62-
}
64+
yield result)(catalog =>
65+
zlog("Closing RESTSessionCatalog %s due to cache expiry or application shutdown", catalog.name()) *> ZIO
66+
.attemptBlocking(catalog.close())
67+
.orDieWith(e => new Throwable("Unable to close RESTSessionCatalog instance correctly", e))
68+
)
6369

64-
def getCatalog: Task[RESTSessionCatalog] = for
65-
catalogInfo <- ZIO.attempt(catalogs.find { case (_, (_, duration)) =>
66-
Instant.now.getEpochSecond - duration < maxCatalogLifetime
67-
})
68-
selected <- catalogInfo match {
69-
case Some((_, (catalog, _))) => ZIO.succeed(catalog)
70-
case None =>
71-
ZIO
72-
.attempt(catalogs.foreach { case (_, (catalog, _)) =>
73-
catalog.close()
74-
})
75-
.map(_ => catalogs.clear())
76-
.flatMap(_ => newCatalog)
77-
}
78-
yield selected
70+
/** Provide an instance of IcebergCatalogFactory which rotates catalog instance automatically
71+
* @return
72+
*/
73+
def live(icebergCatalogSettings: IcebergCatalogSettings): ZIO[Scope, Throwable, IcebergCatalogFactory] =
74+
for cachedRef <- Cached.auto(
75+
acquire = newCatalog(icebergCatalogSettings),
76+
policy = Schedule.spaced(icebergCatalogSettings.maxCatalogInstanceLifetime)
77+
)
78+
yield new IcebergCatalogFactory(icebergCatalogSettings, cachedRef)

src/main/scala/services/iceberg/IcebergEntityManager.scala

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@ import services.iceberg.SchemaConversions.toIcebergType
1010
import services.iceberg.base.{CatalogEntityManager, SinkEntityManager, StagingEntityManager}
1111

1212
import org.apache.iceberg.catalog.{Namespace, TableIdentifier}
13-
import org.apache.iceberg.{PartitionSpec, SortOrder}
13+
import org.apache.iceberg.{PartitionSpec, SortOrder, Table}
1414
import zio.{Task, ZIO, ZLayer}
1515

1616
import scala.jdk.CollectionConverters.*
1717

18-
trait IcebergEntityManager(catalogSettings: IcebergCatalogSettings) extends CatalogEntityManager:
19-
override val catalogFactory = new IcebergCatalogFactory(catalogSettings)
20-
18+
trait IcebergEntityManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
19+
extends CatalogEntityManager:
2120
private def delete(tableId: TableIdentifier): Task[Boolean] = for
2221
_ <- zlog("Deleting table %s", tableId.name())
2322
catalog <- catalogFactory.getCatalog
@@ -36,6 +35,18 @@ trait IcebergEntityManager(catalogSettings: IcebergCatalogSettings) extends Cata
3635
result <- delete(tableId)
3736
yield result
3837

38+
override def getTableRef(tableName: String): Task[Table] = for
39+
tableId <- ZIO.succeed(TableIdentifier.of(catalogSettings.namespace, tableName))
40+
catalog <- catalogFactory.getCatalog
41+
tableRef <- ZIO.attemptBlocking(catalog.loadTable(catalogFactory.getSessionContext, tableId))
42+
yield tableRef
43+
44+
override def tableExists(tableName: String): Task[Boolean] = for
45+
tableId <- ZIO.succeed(TableIdentifier.of(catalogSettings.namespace, tableName))
46+
catalog <- catalogFactory.getCatalog
47+
result <- ZIO.attemptBlocking(catalog.tableExists(catalogFactory.getSessionContext, tableId))
48+
yield result
49+
3950
/** Creates a new table in the Iceberg catalog, using the provided schema
4051
* @return
4152
*/
@@ -92,21 +103,25 @@ trait IcebergEntityManager(catalogSettings: IcebergCatalogSettings) extends Cata
92103
}
93104
yield ()
94105

95-
class IcebergSinkEntityManager(catalogSettings: IcebergCatalogSettings)
96-
extends IcebergEntityManager(catalogSettings)
106+
class IcebergSinkEntityManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
107+
extends IcebergEntityManager(catalogSettings, catalogFactory)
97108
with SinkEntityManager
98109

99-
class IcebergStagingEntityManager(catalogSettings: IcebergCatalogSettings)
100-
extends IcebergEntityManager(catalogSettings)
110+
class IcebergStagingEntityManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
111+
extends IcebergEntityManager(catalogSettings, catalogFactory)
101112
with StagingEntityManager
102113

103114
object IcebergEntityManager:
104-
val sinkLayer = ZLayer {
105-
for context <- ZIO.service[PluginStreamContext]
106-
yield IcebergSinkEntityManager(context.sink.icebergCatalog)
115+
val sinkLayer: ZLayer[PluginStreamContext, Throwable, IcebergSinkEntityManager] = ZLayer.scoped {
116+
for
117+
context <- ZIO.service[PluginStreamContext]
118+
factory <- IcebergCatalogFactory.live(context.sink.icebergCatalog)
119+
yield IcebergSinkEntityManager(context.sink.icebergCatalog, factory)
107120
}
108121

109-
val stagingLayer = ZLayer {
110-
for context <- ZIO.service[PluginStreamContext]
111-
yield IcebergStagingEntityManager(context.staging.icebergCatalog)
122+
val stagingLayer: ZLayer[PluginStreamContext, Throwable, IcebergStagingEntityManager] = ZLayer.scoped {
123+
for
124+
context <- ZIO.service[PluginStreamContext]
125+
factory <- IcebergCatalogFactory.live(context.staging.icebergCatalog)
126+
yield IcebergStagingEntityManager(context.staging.icebergCatalog, factory)
112127
}

src/main/scala/services/iceberg/IcebergS3CatalogWriter.scala

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ given Conversion[ArcaneSchema, Schema] with
2828
def apply(schema: ArcaneSchema): Schema = SchemaConversions.toIcebergSchema(schema)
2929

3030
// https://www.tabular.io/blog/java-api-part-3/
31-
class IcebergS3CatalogWriter(entityManager: CatalogEntityManager, stagingSettings: StagingSettings)
31+
class IcebergS3CatalogWriter(entityManager: StagingEntityManager, stagingSettings: StagingSettings)
3232
extends CatalogWriter[RESTCatalog, Table, Schema]:
3333

3434
private val maxRowsPerFile = stagingSettings.table.maxRowsPerFile.getOrElse(10000)
@@ -101,25 +101,13 @@ class IcebergS3CatalogWriter(entityManager: CatalogEntityManager, stagingSetting
101101
logAnnotations: Seq[(LogAnnotation[String], String)]
102102
): Task[Table] =
103103
for
104-
_ <- entityManager.createTable(CreateTableRequest(name, schema, false))
105-
_ <- zlog("Created a staging table %s, waiting for commit", logAnnotations, name)
106-
catalog <- entityManager.catalogFactory.getCatalog
104+
_ <- entityManager.createTable(CreateTableRequest(name, schema, false))
105+
_ <- zlog("Created a staging table %s, waiting for commit", logAnnotations, name)
107106
_ <- ZIO
108107
.sleep(zio.Duration.fromSeconds(1))
109-
.repeatUntil(_ =>
110-
catalog
111-
.tableExists(
112-
entityManager.catalogFactory.getSessionContext,
113-
TableIdentifier.of(stagingSettings.icebergCatalog.namespace, name)
114-
)
115-
)
116-
_ <- zlog("Staging table %s created, appending data", logAnnotations, name)
117-
table <- ZIO.attemptBlocking(
118-
catalog.loadTable(
119-
entityManager.catalogFactory.getSessionContext,
120-
TableIdentifier.of(stagingSettings.icebergCatalog.namespace, name)
121-
)
122-
)
108+
.repeatUntilZIO(_ => entityManager.tableExists(name).orDie)
109+
_ <- zlog("Staging table %s created, appending data", logAnnotations, name)
110+
table <- entityManager.getTableRef(name)
123111
updatedTable <- appendData(data, schema, false, table, logAnnotations)
124112
_ <- zlog("Staging table %s ready for merge", logAnnotations, name)
125113
yield updatedTable
@@ -130,9 +118,7 @@ class IcebergS3CatalogWriter(entityManager: CatalogEntityManager, stagingSetting
130118
schema: Schema,
131119
logAnnotations: Seq[(LogAnnotation[String], String)]
132120
): Task[Table] = for
133-
tableId <- ZIO.succeed(TableIdentifier.of(stagingSettings.icebergCatalog.namespace, name))
134-
catalog <- entityManager.catalogFactory.getCatalog
135-
table <- ZIO.attemptBlocking(catalog.loadTable(entityManager.catalogFactory.getSessionContext, tableId))
121+
table <- entityManager.getTableRef(name)
136122
updatedTable <- appendData(data, schema, false, table, logAnnotations)
137123
yield updatedTable
138124

@@ -142,10 +128,10 @@ object IcebergS3CatalogWriter:
142128
* @return
143129
* The initialized IcebergS3CatalogWriter instance
144130
*/
145-
def apply(entityManager: CatalogEntityManager, stagingSettings: StagingSettings): IcebergS3CatalogWriter =
131+
def apply(entityManager: StagingEntityManager, stagingSettings: StagingSettings): IcebergS3CatalogWriter =
146132
new IcebergS3CatalogWriter(entityManager, stagingSettings)
147133

148-
def layer = ZLayer {
134+
def layer: ZLayer[StagingEntityManager & PluginStreamContext, Nothing, IcebergS3CatalogWriter] = ZLayer.scoped {
149135
for
150136
stagingEntityManager <- ZIO.service[StagingEntityManager]
151137
context <- ZIO.service[PluginStreamContext]

src/main/scala/services/iceberg/IcebergTablePropertyManager.scala

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import zio.{Task, ZIO, ZLayer}
1212
import scala.jdk.CollectionConverters.*
1313
import scala.language.implicitConversions
1414

15-
trait IcebergTablePropertyManager(catalogSettings: IcebergCatalogSettings) extends TablePropertyManager:
16-
override val catalogFactory = new IcebergCatalogFactory(catalogSettings)
15+
trait IcebergTablePropertyManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
16+
extends TablePropertyManager:
1717

1818
private def loadTable(tableName: String): Task[Table] = for
1919
tableId <- ZIO.succeed(
@@ -72,24 +72,28 @@ trait IcebergTablePropertyManager(catalogSettings: IcebergCatalogSettings) exten
7272
override def getTableSchema(tableName: String): Task[Schema] = for table <- loadTable(tableName)
7373
yield table.schema()
7474

75-
class IcebergSinkTablePropertyManager(catalogSettings: IcebergCatalogSettings)
76-
extends IcebergTablePropertyManager(catalogSettings)
75+
class IcebergSinkTablePropertyManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
76+
extends IcebergTablePropertyManager(catalogSettings, catalogFactory)
7777
with SinkPropertyManager
7878

79-
class IcebergStagingTablePropertyManager(catalogSettings: IcebergCatalogSettings)
80-
extends IcebergTablePropertyManager(catalogSettings)
79+
class IcebergStagingTablePropertyManager(catalogSettings: IcebergCatalogSettings, catalogFactory: IcebergCatalogFactory)
80+
extends IcebergTablePropertyManager(catalogSettings, catalogFactory)
8181
with StagingPropertyManager
8282

8383
object IcebergTablePropertyManager:
8484

85-
val sinkLayer =
86-
ZLayer {
87-
for context <- ZIO.service[PluginStreamContext]
88-
yield IcebergSinkTablePropertyManager(context.sink.icebergCatalog)
85+
val sinkLayer: ZLayer[PluginStreamContext, Throwable, IcebergSinkTablePropertyManager] =
86+
ZLayer.scoped {
87+
for
88+
context <- ZIO.service[PluginStreamContext]
89+
factory <- IcebergCatalogFactory.live(context.sink.icebergCatalog)
90+
yield IcebergSinkTablePropertyManager(context.sink.icebergCatalog, factory)
8991
}
9092

91-
val stagingLayer =
92-
ZLayer {
93-
for context <- ZIO.service[PluginStreamContext]
94-
yield IcebergStagingTablePropertyManager(context.staging.icebergCatalog)
93+
val stagingLayer: ZLayer[PluginStreamContext, Throwable, IcebergStagingTablePropertyManager] =
94+
ZLayer.scoped {
95+
for
96+
context <- ZIO.service[PluginStreamContext]
97+
factory <- IcebergCatalogFactory.live(context.staging.icebergCatalog)
98+
yield IcebergStagingTablePropertyManager(context.staging.icebergCatalog, factory)
9599
}

src/main/scala/services/iceberg/base/CatalogEntityManager.scala

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ import org.apache.iceberg.Table
88
import zio.Task
99

1010
trait CatalogEntityManager:
11-
/** Catalog connection factory
12-
*/
13-
val catalogFactory: CatalogFactory
14-
1511
/** Deletes the specified table from the catalog
1612
*
1713
* @param tableName
@@ -38,6 +34,16 @@ trait CatalogEntityManager:
3834
*/
3935
def migrateSchema(oldSchema: ArcaneSchema, newSchema: ArcaneSchema, tableName: String): Task[Unit]
4036

37+
/** Retrieve catalog factory used by this entity manager
38+
* @return
39+
*/
40+
def getTableRef(tableName: String): Task[Table]
41+
42+
/** Check if a specified table exists in the catalog
43+
* @return
44+
*/
45+
def tableExists(tableName: String): Task[Boolean]
46+
4147
/** Entity manager for sink catalog
4248
*/
4349
trait SinkEntityManager extends CatalogEntityManager

src/main/scala/services/iceberg/base/CatalogFactory.scala

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package services.iceberg.base
33

44
import org.apache.iceberg.catalog.SessionCatalog.SessionContext
55
import org.apache.iceberg.rest.RESTSessionCatalog
6-
import zio.Task
6+
import zio.{Scope, Task, ZIO}
77

88
/** Object responsible for creating catalog clients. Takes care of recycling expired instances and provided the current
99
* active one to the caller
@@ -14,11 +14,6 @@ trait CatalogFactory:
1414
*/
1515
def getSessionContext: SessionContext
1616

17-
/** Create a new RESTSessionCatalog isntance
18-
* @return
19-
*/
20-
def newCatalog: Task[RESTSessionCatalog]
21-
2217
/** Retrieve current active RESTSessionCatalog instance
2318
* @return
2419
*/

0 commit comments

Comments
 (0)