@@ -233,13 +233,14 @@ public void close() {
233233 /**
234234 * Merge two log data records if needed.
235235 *
236- * @param record
237- * @param metadata
238- * @param existingRecordMetadataPair
239- * @return
240- * @throws IOException
236+ * @param newRecord The new incoming record
237+ * @param metadata The metadata
238+ * @param existingRecordMetadataPair The existing record metadata pair
239+ *
240+ * @return The pair of the record that needs to be updated with and its metadata,
241+ * returns empty to skip the update.
241242 */
242- protected Option <Pair <Option <T >, Map <String , Object >>> doProcessNextDataRecord (T record ,
243+ protected Option <Pair <Option <T >, Map <String , Object >>> doProcessNextDataRecord (T newRecord ,
243244 Map <String , Object > metadata ,
244245 Pair <Option <T >, Map <String , Object >> existingRecordMetadataPair )
245246 throws IOException {
@@ -249,14 +250,12 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
249250 // TODO(HUDI-7843): decouple the merging logic from the merger
250251 // and use the record merge mode to control how to merge partial updates
251252 // Merge and store the combined record
252- // Note that the incoming `record` is from an older commit, so it should be put as
253- // the `older` in the merge API
254253 Option <Pair <HoodieRecord , Schema >> combinedRecordAndSchemaOpt = recordMerger .get ().partialMerge (
255- readerContext .constructHoodieRecord (Option .of (record ), metadata ),
256- readerContext .getSchemaFromMetadata (metadata ),
257254 readerContext .constructHoodieRecord (
258255 existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight ()),
259256 readerContext .getSchemaFromMetadata (existingRecordMetadataPair .getRight ()),
257+ readerContext .constructHoodieRecord (Option .of (newRecord ), metadata ),
258+ readerContext .getSchemaFromMetadata (metadata ),
260259 readerSchema ,
261260 props );
262261 if (!combinedRecordAndSchemaOpt .isPresent ()) {
@@ -266,7 +265,7 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
266265 HoodieRecord <T > combinedRecord = combinedRecordAndSchema .getLeft ();
267266
268267 // If pre-combine returns existing record, no need to update it
269- if (combinedRecord .getData () != existingRecordMetadataPair .getLeft ().get ( )) {
268+ if (combinedRecord .getData () != existingRecordMetadataPair .getLeft ().orElse ( null )) {
270269 return Option .of (Pair .of (
271270 Option .ofNullable (combinedRecord .getData ()),
272271 readerContext .updateSchemaAndResetOrderingValInMetadata (metadata , combinedRecordAndSchema .getRight ())));
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
275274 } else {
276275 switch (recordMergeMode ) {
277276 case COMMIT_TIME_ORDERING :
278- return Option .empty ( );
277+ return Option .of ( Pair . of ( Option . ofNullable ( newRecord ), metadata ) );
279278 case EVENT_TIME_ORDERING :
280- Comparable existingOrderingValue = readerContext .getOrderingValue (
281- existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight (),
282- readerSchema , orderingFieldName );
283- if (isDeleteRecordWithNaturalOrder (existingRecordMetadataPair .getLeft (), existingOrderingValue )) {
284- return Option .empty ();
285- }
286- Comparable incomingOrderingValue = readerContext .getOrderingValue (
287- Option .of (record ), metadata , readerSchema , orderingFieldName );
288- if (incomingOrderingValue .compareTo (existingOrderingValue ) > 0 ) {
289- return Option .of (Pair .of (Option .of (record ), metadata ));
279+ if (shouldKeepNewerRecord (existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight (), Option .ofNullable (newRecord ), metadata )) {
280+ return Option .of (Pair .of (Option .of (newRecord ), metadata ));
290281 }
291282 return Option .empty ();
292283 case CUSTOM :
293284 default :
294285 // Merge and store the combined record
295- // Note that the incoming `record` is from an older commit, so it should be put as
296- // the `older` in the merge API
297286 if (payloadClass .isPresent ()) {
287+ if (existingRecordMetadataPair .getLeft ().isEmpty ()
288+ && shouldKeepNewerRecord (existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight (), Option .ofNullable (newRecord ), metadata )) {
289+ // IMPORTANT:
290+ // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
291+ // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
292+ return Option .of (Pair .of (Option .of (newRecord ), metadata ));
293+ }
298294 Option <Pair <HoodieRecord , Schema >> combinedRecordAndSchemaOpt =
299- getMergedRecord (Option . of ( record ), metadata , existingRecordMetadataPair .getLeft (), existingRecordMetadataPair . getRight () );
295+ getMergedRecord (existingRecordMetadataPair . getLeft ( ), existingRecordMetadataPair .getRight (), Option . of ( newRecord ), metadata );
300296 if (combinedRecordAndSchemaOpt .isPresent ()) {
301297 T combinedRecordData = readerContext .convertAvroRecord ((IndexedRecord ) combinedRecordAndSchemaOpt .get ().getLeft ().getData ());
302298 // If pre-combine does not return existing record, update it
303- if (combinedRecordData != existingRecordMetadataPair .getLeft ().get ( )) {
299+ if (combinedRecordData != existingRecordMetadataPair .getLeft ().orElse ( null )) {
304300 return Option .of (Pair .of (Option .ofNullable (combinedRecordData ), metadata ));
305301 }
306302 }
307303 return Option .empty ();
308304 } else {
305+ if (existingRecordMetadataPair .getLeft ().isEmpty ()
306+ && shouldKeepNewerRecord (existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight (), Option .ofNullable (newRecord ), metadata )) {
307+ // IMPORTANT:
308+ // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
309+ // return Option.empty when the old payload data is empty(a delete) and ignores its ordering value directly.
310+ return Option .of (Pair .of (Option .of (newRecord ), metadata ));
311+ }
309312 Option <Pair <HoodieRecord , Schema >> combinedRecordAndSchemaOpt = recordMerger .get ().merge (
310- readerContext .constructHoodieRecord (Option .of (record ), metadata ),
311- readerContext .getSchemaFromMetadata (metadata ),
312313 readerContext .constructHoodieRecord (
313314 existingRecordMetadataPair .getLeft (), existingRecordMetadataPair .getRight ()),
314315 readerContext .getSchemaFromMetadata (existingRecordMetadataPair .getRight ()),
316+ readerContext .constructHoodieRecord (Option .of (newRecord ), metadata ),
317+ readerContext .getSchemaFromMetadata (metadata ),
315318 props );
316319
317320 if (!combinedRecordAndSchemaOpt .isPresent ()) {
@@ -322,7 +325,7 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
322325 HoodieRecord <T > combinedRecord = combinedRecordAndSchema .getLeft ();
323326
324327 // If pre-combine returns existing record, no need to update it
325- if (combinedRecord .getData () != existingRecordMetadataPair .getLeft ().get ( )) {
328+ if (combinedRecord .getData () != existingRecordMetadataPair .getLeft ().orElse ( null )) {
326329 return Option .of (Pair .of (Option .ofNullable (combinedRecord .getData ()), metadata ));
327330 }
328331 return Option .empty ();
@@ -334,24 +337,25 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
334337 // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
335338 // payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
336339 // it since these records will be put into records(Map).
337- return Option .of (Pair .of (Option .ofNullable (record ), metadata ));
340+ return Option .of (Pair .of (Option .ofNullable (newRecord ), metadata ));
338341 }
339342 }
340343
341344 /**
342345 * Merge a delete record with another record (data, or delete).
343346 *
344- * @param deleteRecord
345- * @param existingRecordMetadataPair
346- * @return
347+ * @param deleteRecord The delete record
348+ * @param existingRecordMetadataPair The existing record metadata pair
349+ *
350+ * @return The option of new delete record that needs to be updated with.
347351 */
348352 protected Option <DeleteRecord > doProcessNextDeletedRecord (DeleteRecord deleteRecord ,
349353 Pair <Option <T >, Map <String , Object >> existingRecordMetadataPair ) {
350354 totalLogRecords ++;
351355 if (existingRecordMetadataPair != null ) {
352356 switch (recordMergeMode ) {
353357 case COMMIT_TIME_ORDERING :
354- return Option .empty ( );
358+ return Option .of ( deleteRecord );
355359 case EVENT_TIME_ORDERING :
356360 case CUSTOM :
357361 default :
@@ -473,6 +477,17 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
473477 case CUSTOM :
474478 default :
475479 if (payloadClass .isPresent ()) {
480+ if (older .isEmpty () || newer .isEmpty ()) {
481+ if (shouldKeepNewerRecord (older , olderInfoMap , newer , newerInfoMap )) {
482+ // IMPORTANT:
483+ // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
484+ // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
485+ return newer ;
486+ } else {
487+ return older ;
488+ }
489+ }
490+
476491 Option <Pair <HoodieRecord , Schema >> mergedRecord =
477492 getMergedRecord (older , olderInfoMap , newer , newerInfoMap );
478493 if (mergedRecord .isPresent ()
@@ -487,6 +502,16 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
487502 }
488503 return Option .empty ();
489504 } else {
505+ if (older .isEmpty () || newer .isEmpty ()) {
506+ if (shouldKeepNewerRecord (older , olderInfoMap , newer , newerInfoMap )) {
507+ // IMPORTANT:
508+ // this is needed when the fallback HoodieAvroRecordMerger got used, the merger would
509+ // return Option.empty when the new payload data is empty(a delete) and ignores its ordering value directly.
510+ return newer ;
511+ } else {
512+ return older ;
513+ }
514+ }
490515 Option <Pair <HoodieRecord , Schema >> mergedRecord = recordMerger .get ().merge (
491516 readerContext .constructHoodieRecord (older , olderInfoMap ), readerContext .getSchemaFromMetadata (olderInfoMap ),
492517 readerContext .constructHoodieRecord (newer , newerInfoMap ), readerContext .getSchemaFromMetadata (newerInfoMap ), props );
@@ -504,6 +529,19 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
504529 }
505530 }
506531
532+ /**
533+ * Decides whether to keep the incoming record with ordering value comparison.
534+ */
535+ private boolean shouldKeepNewerRecord (Option <T > oldVal , Map <String , Object > oldMetadata , Option <T > newVal , Map <String , Object > newMetadata ) {
536+ Comparable newOrderingVal = readerContext .getOrderingValue (newVal , newMetadata , readerSchema , orderingFieldName );
537+ if (isDeleteRecordWithNaturalOrder (newVal , newOrderingVal )) {
538+ // handle records coming from DELETE statements(the orderingVal is constant 0)
539+ return true ;
540+ }
541+ Comparable oldOrderingVal = readerContext .getOrderingValue (oldVal , oldMetadata , readerSchema , orderingFieldName );
542+ return newOrderingVal .compareTo (oldOrderingVal ) >= 0 ;
543+ }
544+
507545 private Option <Pair <HoodieRecord , Schema >> getMergedRecord (Option <T > older , Map <String , Object > olderInfoMap , Option <T > newer , Map <String , Object > newerInfoMap ) throws IOException {
508546 ValidationUtils .checkArgument (!Objects .equals (payloadClass , OverwriteWithLatestAvroPayload .class .getCanonicalName ())
509547 && !Objects .equals (payloadClass , DefaultHoodieRecordPayload .class .getCanonicalName ()));
0 commit comments