Skip to content

Commit 92f6067

Browse files
Accomodate fixed watermark emission behaviour (#328)
1 parent 606bcd0 commit 92f6067

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

src/main/scala/services/streaming/processors/transformers/StagingProcessor.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,24 @@ class StagingProcessor(
9090
)
9191
)
9292

93-
applyTasks <- ZIO.foreach(groupedBySchema.keys)(schema =>
93+
stagedBatches <- ZIO.foreach(groupedBySchema.keys)(schema =>
9494
writeDataRows(groupedBySchema(schema), schema, onBatchStaged, maybeWatermark)
9595
)
96-
yield Chunk(applyTasks.map(batches => batches))
96+
yield
97+
if groupedBySchema.keys.nonEmpty then Chunk(stagedBatches.map(batches => batches))
98+
else
99+
Chunk(
100+
Seq(
101+
onBatchStaged(
102+
None,
103+
icebergCatalogSettings.namespace,
104+
icebergCatalogSettings.warehouse,
105+
ArcaneSchema.empty(),
106+
targetTableFullName,
107+
maybeWatermark
108+
)
109+
)
110+
)
97111

98112
override def process(
99113
onStagingTablesComplete: OnStagingTablesComplete,

0 commit comments

Comments
 (0)