2424import org .apache .hudi .common .config .HoodieMetadataConfig ;
2525import org .apache .hudi .common .fs .FSUtils ;
2626import org .apache .hudi .common .function .SerializableFunctionUnchecked ;
27+ import org .apache .hudi .common .model .HoodieCommitMetadata ;
2728import org .apache .hudi .common .model .HoodieFailedWritesCleaningPolicy ;
2829import org .apache .hudi .common .model .HoodieRecord ;
2930import org .apache .hudi .common .model .HoodieTableType ;
4243import org .apache .hudi .metadata .HoodieTableMetadata ;
4344import org .apache .hudi .storage .StoragePath ;
4445import org .apache .hudi .storage .StoragePathFilter ;
46+ import org .apache .hudi .table .action .HoodieWriteMetadata ;
4547import org .apache .hudi .testutils .HoodieClientTestBase ;
4648
4749import org .apache .spark .api .java .JavaRDD ;
6264import java .util .stream .Collectors ;
6365import java .util .stream .Stream ;
6466
67+ import static org .apache .hudi .client .functional .TestHoodieFileSystemViews .assertForFSVEquality ;
6568import static org .apache .hudi .common .table .timeline .HoodieTimeline .DELTA_COMMIT_ACTION ;
66- import static org .apache .hudi .functional .TestHoodieFileSystemViews .assertForFSVEquality ;
6769import static org .apache .hudi .testutils .Assertions .assertNoWriteErrors ;
6870import static org .junit .jupiter .api .Assertions .assertEquals ;
6971import static org .junit .jupiter .api .Assertions .assertFalse ;
@@ -249,7 +251,6 @@ public void testRestoreWithFileGroupCreatedWithDeltaCommits(HoodieTableVersion t
249251 .withAutoUpgradeVersion (false )
250252 .withWriteTableVersion (tableVersion .versionCode ())
251253 .withMetadataConfig (HoodieMetadataConfig .newBuilder ()
252- .withStreamingWriteEnabled (tableVersion .greaterThanOrEquals (HoodieTableVersion .EIGHT ))
253254 .build ())
254255 .build ();
255256
@@ -420,7 +421,7 @@ void testCleaningPendingCompaction(HoodieTableVersion tableVersion) throws Excep
420421 * No files will be cleaned up. Only rollback log appends.
421422 */
422423 @ ParameterizedTest
423- @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "NINE " })
424+ @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "EIGHT " })
424425 void testCleaningCompletedRollback (HoodieTableVersion tableVersion ) throws Exception {
425426 HoodieWriteConfig hoodieWriteConfig = getHoodieWriteConfigAndInitializeTable (HoodieCompactionConfig .newBuilder ()
426427 .withMaxNumDeltaCommitsBeforeCompaction (3 ) // the 3rd delta_commit triggers compaction
@@ -456,7 +457,7 @@ void testCleaningCompletedRollback(HoodieTableVersion tableVersion) throws Excep
456457
457458 assertRowNumberEqualsTo (20 );
458459 // write a delta_commit but does not commit
459- updateBatchWithoutCommit (WriteClientTestUtils .createNewInstantTime (),
460+ updateBatchWithoutCommit (client .createNewInstantTime (),
460461 Objects .requireNonNull (baseRecordsToUpdate , "The records to update should not be null" ), tableVersion );
461462 // rollback the delta_commit
462463 assertTrue (writeClient .rollbackFailedWrites (metaClient ), "The last delta_commit should be rolled back" );
@@ -485,7 +486,7 @@ void rollbackWithAsyncServices_compactionCompletesDuringCommit() {
485486 // Run compaction while delta-commit is in-flight
486487 Option <String > compactionInstant = client .scheduleCompaction (Option .empty ());
487488 HoodieWriteMetadata result = client .compact (compactionInstant .get ());
488- client .commitCompaction (compactionInstant .get (), result , Option .empty ());
489+ client .commitCompaction (compactionInstant .get (), ( HoodieCommitMetadata ) result . getCommitMetadata (). get () , Option .empty ());
489490 // commit the inflight delta commit
490491 client .commit (inflightCommit , writeStatus );
491492
@@ -522,7 +523,7 @@ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
522523 // commit the inflight delta commit
523524 client .commit (inflightCommit , writeStatus );
524525 // commit the compaction instant after the delta commit
525- client .commitCompaction (compactionInstant .get (), result , Option .empty ());
526+ client .commitCompaction (compactionInstant .get (), ( HoodieCommitMetadata ) result . getCommitMetadata (). get () , Option .empty ());
526527
527528 client .savepoint (inflightCommit , "user1" , "Savepoint for commit that completed during compaction" );
528529
@@ -543,7 +544,7 @@ void rollbackWithAsyncServices_commitCompletesDuringCompaction() {
543544 }
544545
545546 @ ParameterizedTest
546- @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "NINE " })
547+ @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "EIGHT " })
547548 void rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction (HoodieTableVersion tableVersion ) {
548549 HoodieWriteConfig hoodieWriteConfig = getHoodieWriteConfigWithCompactionAndConcurrencyControl (tableVersion );
549550 try (SparkRDDWriteClient client = getHoodieWriteClient (hoodieWriteConfig )) {
@@ -560,7 +561,7 @@ void rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTab
560561 // commit the inflight delta commit
561562 client .commit (inflightCommit , writeStatus );
562563 // commit the compaction instant after the delta commit
563- client .commitCompaction (compactionInstant .get (), result , Option .empty ());
564+ client .commitCompaction (compactionInstant .get (), ( HoodieCommitMetadata ) result . getCommitMetadata (). get () , Option .empty ());
564565
565566 client .savepoint (inflightCommit , "user1" , "Savepoint for commit that completed during compaction" );
566567
@@ -581,7 +582,7 @@ void rollbackWithAsyncServices_commitStartsAndFinishesDuringCompaction(HoodieTab
581582 }
582583
583584 @ ParameterizedTest
584- @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "NINE " })
585+ @ EnumSource (value = HoodieTableVersion .class , names = {"SIX" , "EIGHT " })
585586 void testMissingFileDoesNotFallRestore (HoodieTableVersion tableVersion ) throws Exception {
586587 HoodieWriteConfig hoodieWriteConfig = getHoodieWriteConfigAndInitializeTable (HoodieCompactionConfig .newBuilder ()
587588 .withMaxNumDeltaCommitsBeforeCompaction (4 )
@@ -657,7 +658,6 @@ private HoodieWriteConfig getHoodieWriteConfigWithCompactionAndConcurrencyContro
657658 .withAutoUpgradeVersion (false )
658659 .withWriteTableVersion (tableVersion .versionCode ())
659660 .withMetadataConfig (HoodieMetadataConfig .newBuilder ()
660- .withStreamingWriteEnabled (tableVersion .greaterThanOrEquals (HoodieTableVersion .EIGHT ))
661661 .build ())
662662 .withProps (Collections .singletonMap (HoodieCompactionConfig .PARQUET_SMALL_FILE_LIMIT .key (), "0" ))
663663 .build ();
@@ -677,9 +677,9 @@ private void validateFilesMetadata(HoodieWriteConfig writeConfig) {
677677 HoodieTableFileSystemView metadataBasedView = (HoodieTableFileSystemView ) FileSystemViewManager
678678 .createViewManager (context , writeConfig .getMetadataConfig (), viewStorageConfig , writeConfig .getCommonConfig (),
679679 (SerializableFunctionUnchecked <HoodieTableMetaClient , HoodieTableMetadata >) v1 ->
680- metaClient . getTableFormat (). getMetadataFactory () .create (context , metaClient .getStorage (), writeConfig .getMetadataConfig (), writeConfig .getBasePath ()))
680+ HoodieTableMetadata .create (context , metaClient .getStorage (), writeConfig .getMetadataConfig (), writeConfig .getBasePath ()))
681681 .getFileSystemView (basePath );
682- assertForFSVEquality (fileListingBasedView , metadataBasedView , true , Option . empty () );
682+ assertForFSVEquality (fileListingBasedView , metadataBasedView , true );
683683 }
684684
685685 private String upsertBatch (SparkRDDWriteClient client , List <HoodieRecord > baseRecordsToUpdate ) throws IOException {
@@ -698,7 +698,6 @@ private void compactWithoutCommit(String compactionInstantTime, HoodieTableVersi
698698 .withAutoUpgradeVersion (false )
699699 .withWriteTableVersion (tableVersion .versionCode ())
700700 .withMetadataConfig (HoodieMetadataConfig .newBuilder ()
701- .withStreamingWriteEnabled (tableVersion .greaterThanOrEquals (HoodieTableVersion .EIGHT ))
702701 .build ())
703702 .build ();
704703
@@ -726,7 +725,6 @@ private HoodieWriteConfig getHoodieWriteConfigAndInitializeTable(HoodieCompactio
726725 .withAutoUpgradeVersion (false )
727726 .withWriteTableVersion (tableVersion .versionCode ())
728727 .withMetadataConfig (HoodieMetadataConfig .newBuilder ()
729- .withStreamingWriteEnabled (tableVersion .greaterThanOrEquals (HoodieTableVersion .EIGHT ))
730728 .build ())
731729 .build ();
732730 initMetaClient (HoodieTableType .MERGE_ON_READ , hoodieWriteConfig .getProps ());
0 commit comments