Skip to content

Commit c7f843e

Browse files
committed
[hotfix] Introduce SinkUpsertMaterializer factory method
1 parent c07a902 commit c7f843e

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
3232
import org.apache.flink.table.runtime.generated.RecordEqualiser;
3333
import org.apache.flink.table.runtime.operators.TableStreamOperator;
34+
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
35+
import org.apache.flink.table.types.logical.RowType;
3436
import org.apache.flink.types.RowKind;
3537
import org.apache.flink.util.Preconditions;
3638

@@ -222,4 +224,18 @@ private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
222224
}
223225
return equaliser.equals(newRow, oldRow);
224226
}
227+
228+
public static SinkUpsertMaterializer create(
229+
StateTtlConfig ttlConfig,
230+
RowType physicalRowType,
231+
GeneratedRecordEqualiser rowEqualiser,
232+
GeneratedRecordEqualiser upsertKeyEqualiser,
233+
int[] inputUpsertKey) {
234+
return new SinkUpsertMaterializer(
235+
ttlConfig,
236+
InternalSerializers.create(physicalRowType),
237+
rowEqualiser,
238+
upsertKeyEqualiser,
239+
inputUpsertKey);
240+
}
225241
}

0 commit comments

Comments
 (0)