@@ -6,6 +6,7 @@ import models.app.PluginStreamContext
66import models .batches .{MergeableBatch , StagedVersionedBatch }
77import models .settings .*
88import models .settings .TableNaming .*
9+ import models .settings .backfill .BackfillBehavior .Overwrite
910import models .settings .sink .SinkSettings
1011import services .base .MergeServiceClient
1112import services .iceberg .base .*
@@ -30,7 +31,7 @@ class MergeBatchProcessor(
3031 targetTableSettings : SinkSettings ,
3132 declaredMetrics : DeclaredMetrics ,
3233 schemaMigrationEnabled : Boolean ,
33- isBackfilling : Boolean
34+ isTargetInStaging : Boolean
3435) extends StagedBatchProcessor :
3536
3637 private def alignSchemas (
@@ -59,9 +60,9 @@ class MergeBatchProcessor(
5960 ZIO .when(! batch.isEmpty && schemaMigrationEnabled) {
6061 for
6162 // for streams, we migrate sink table
62- _ <- ZIO .unless(isBackfilling )(alignSchemas(batch, sinkPropertyManager, sinkEntityManager))
63+ _ <- ZIO .unless(isTargetInStaging )(alignSchemas(batch, sinkPropertyManager, sinkEntityManager))
6364 // for backfills, we migrate staging table
64- _ <- ZIO .when(isBackfilling )(alignSchemas(batch, stagingPropertyManager, stagingEntityManager))
65+ _ <- ZIO .when(isTargetInStaging )(alignSchemas(batch, stagingPropertyManager, stagingEntityManager))
6566 yield ()
6667 }
6768 )
@@ -157,6 +158,6 @@ object MergeBatchProcessor:
157158 context.sink,
158159 declaredMetrics,
159160 ! context.staging.table.isUnifiedSchema,
160- context.isBackfilling
161+ context.isBackfilling && context.streamMode.backfill.backfillBehavior == Overwrite
161162 )
162163 }
0 commit comments