2121import org .apache .paimon .CoreOptions ;
2222import org .apache .paimon .CoreOptions .MergeEngine ;
2323import org .apache .paimon .data .BinaryRow ;
24+ import org .apache .paimon .flink .FlinkConnectorOptions .PartitionMarkDoneActionMode ;
2425import org .apache .paimon .manifest .ManifestCommittable ;
2526import org .apache .paimon .options .Options ;
2627import org .apache .paimon .partition .actions .PartitionMarkDoneAction ;
3233import org .apache .paimon .utils .PartitionPathUtils ;
3334
3435import org .apache .flink .api .common .state .OperatorStateStore ;
36+ import org .apache .flink .api .java .tuple .Tuple2 ;
37+ import org .slf4j .Logger ;
38+ import org .slf4j .LoggerFactory ;
3539
3640import java .io .IOException ;
3741import java .time .Duration ;
42+ import java .util .HashMap ;
3843import java .util .HashSet ;
3944import java .util .List ;
45+ import java .util .Map ;
4046import java .util .Optional ;
4147import java .util .Set ;
4248
4349import static org .apache .paimon .CoreOptions .PARTITION_MARK_DONE_WHEN_END_INPUT ;
4450import static org .apache .paimon .flink .FlinkConnectorOptions .PARTITION_IDLE_TIME_TO_DONE ;
51+ import static org .apache .paimon .flink .FlinkConnectorOptions .PARTITION_MARK_DONE_MODE ;
4552
4653/** Mark partition done. */
4754public class PartitionMarkDone implements PartitionListener {
4855
56+ private static final Logger LOG = LoggerFactory .getLogger (PartitionMarkDone .class );
57+
4958 private final InternalRowPartitionComputer partitionComputer ;
5059 private final PartitionMarkDoneTrigger trigger ;
5160 private final List <PartitionMarkDoneAction > actions ;
5261 private final boolean waitCompaction ;
62+ private final PartitionMarkDoneActionMode partitionMarkDoneActionMode ;
5363
5464 public static Optional <PartitionMarkDone > create (
5565 ClassLoader cl ,
@@ -86,7 +96,12 @@ public static Optional<PartitionMarkDone> create(
8696 || coreOptions .mergeEngine () == MergeEngine .FIRST_ROW );
8797
8898 return Optional .of (
89- new PartitionMarkDone (partitionComputer , trigger , actions , waitCompaction ));
99+ new PartitionMarkDone (
100+ partitionComputer ,
101+ trigger ,
102+ actions ,
103+ waitCompaction ,
104+ options .get (PARTITION_MARK_DONE_MODE )));
90105 }
91106
92107 private static boolean disablePartitionMarkDone (
@@ -108,15 +123,25 @@ public PartitionMarkDone(
108123 InternalRowPartitionComputer partitionComputer ,
109124 PartitionMarkDoneTrigger trigger ,
110125 List <PartitionMarkDoneAction > actions ,
111- boolean waitCompaction ) {
126+ boolean waitCompaction ,
127+ PartitionMarkDoneActionMode partitionMarkDoneActionMode ) {
112128 this .partitionComputer = partitionComputer ;
113129 this .trigger = trigger ;
114130 this .actions = actions ;
115131 this .waitCompaction = waitCompaction ;
132+ this .partitionMarkDoneActionMode = partitionMarkDoneActionMode ;
116133 }
117134
118135 @ Override
119136 public void notifyCommittable (List <ManifestCommittable > committables ) {
137+ if (partitionMarkDoneActionMode == PartitionMarkDoneActionMode .WATERMARK ) {
138+ markDoneByWatermark (committables );
139+ } else {
140+ markDoneByProcessTime (committables );
141+ }
142+ }
143+
144+ private void markDoneByProcessTime (List <ManifestCommittable > committables ) {
120145 Set <BinaryRow > partitions = new HashSet <>();
121146 boolean endInput = false ;
122147 for (ManifestCommittable committable : committables ) {
@@ -141,6 +166,58 @@ public void notifyCommittable(List<ManifestCommittable> committables) {
141166 markDone (trigger .donePartitions (endInput ), actions );
142167 }
143168
169+ private void markDoneByWatermark (List <ManifestCommittable > committables ) {
170+ // extract watermarks from committables and update partition watermarks
171+ Tuple2 <Map <BinaryRow , Long >, Boolean > extractedWatermarks =
172+ extractPartitionWatermarks (committables );
173+ Map <BinaryRow , Long > partitionWatermarks = extractedWatermarks .f0 ;
174+ boolean endInput = extractedWatermarks .f1 ;
175+ Optional <Long > latestWatermark = partitionWatermarks .values ().stream ().max (Long ::compareTo );
176+
177+ if (!latestWatermark .isPresent ()) {
178+ LOG .warn ("No watermark found in this batch of committables, skip partition mark done." );
179+ return ;
180+ }
181+
182+ partitionWatermarks .forEach (
183+ (row , value ) -> {
184+ String partition =
185+ PartitionPathUtils .generatePartitionPath (
186+ partitionComputer .generatePartValues (row ));
187+ trigger .notifyPartition (partition , value );
188+ });
189+
190+ markDone (trigger .donePartitions (endInput , latestWatermark .get (), true ), actions );
191+ }
192+
193+ private Tuple2 <Map <BinaryRow , Long >, Boolean > extractPartitionWatermarks (
194+ List <ManifestCommittable > committables ) {
195+ boolean endInput = false ;
196+ Map <BinaryRow , Long > partitionWatermarks = new HashMap <>();
197+ for (ManifestCommittable committable : committables ) {
198+ Long watermark = committable .watermark ();
199+ if (watermark != null ) {
200+ for (CommitMessage commitMessage : committable .fileCommittables ()) {
201+ CommitMessageImpl message = (CommitMessageImpl ) commitMessage ;
202+ if (waitCompaction
203+ || !message .indexIncrement ().isEmpty ()
204+ || !message .newFilesIncrement ().isEmpty ()) {
205+ partitionWatermarks .compute (
206+ message .partition (),
207+ (partition , old ) ->
208+ old == null ? watermark : Math .max (old , watermark ));
209+ }
210+ }
211+ }
212+
213+ if (committable .identifier () == Long .MAX_VALUE ) {
214+ endInput = true ;
215+ }
216+ }
217+
218+ return Tuple2 .of (partitionWatermarks , endInput );
219+ }
220+
144221 public static void markDone (List <String > partitions , List <PartitionMarkDoneAction > actions ) {
145222 for (String partition : partitions ) {
146223 try {
0 commit comments