4040import org .apache .paimon .table .sink .BatchWriteBuilder ;
4141import org .apache .paimon .table .sink .CommitMessage ;
4242import org .apache .paimon .table .sink .CommitMessageImpl ;
43+ import org .apache .paimon .table .sink .StreamWriteBuilder ;
4344import org .apache .paimon .table .source .DataSplit ;
4445import org .apache .paimon .table .source .ReadBuilder ;
4546import org .apache .paimon .table .source .Split ;
4647import org .apache .paimon .types .DataTypes ;
48+ import org .apache .paimon .utils .SnapshotManager ;
4749import org .apache .paimon .utils .StringUtils ;
4850
4951import org .apache .paimon .shade .guava30 .com .google .common .collect .Lists ;
5658import java .util .HashMap ;
5759import java .util .List ;
5860import java .util .Map ;
61+ import java .util .Objects ;
5962import java .util .concurrent .ThreadLocalRandom ;
6063import java .util .stream .Collectors ;
6164
@@ -686,6 +689,7 @@ public void testClusterWithDeletionVector() throws Exception {
686689 @ Test
687690 public void testClusterWithBucket () throws Exception {
688691 Map <String , String > dynamicOptions = commonOptions ();
692+ dynamicOptions .put (CoreOptions .WRITE_ONLY .key (), "true" );
689693 dynamicOptions .put (CoreOptions .BUCKET .key (), "2" );
690694 dynamicOptions .put (CoreOptions .BUCKET_KEY .key (), "pt" );
691695 dynamicOptions .put (CoreOptions .BUCKET_APPEND_ORDERED .key (), "false" );
@@ -848,6 +852,35 @@ public void testClusterWithBucket() throws Exception {
848852 assertThat (result5 ).containsExactlyElementsOf (expected5 );
849853 }
850854
855+ @ Test
856+ public void testStreamingClusterWithBucket () throws Exception {
857+ Map <String , String > dynamicOptions = commonOptions ();
858+ dynamicOptions .put (CoreOptions .WRITE_ONLY .key (), "true" );
859+ dynamicOptions .put (CoreOptions .BUCKET .key (), "1" );
860+ dynamicOptions .put (CoreOptions .BUCKET_KEY .key (), "pt" );
861+ dynamicOptions .put (CoreOptions .BUCKET_APPEND_ORDERED .key (), "false" );
862+ dynamicOptions .put (CoreOptions .CONTINUOUS_DISCOVERY_INTERVAL .key (), "1s" );
863+ FileStoreTable table = createTable (null , dynamicOptions );
864+ StreamWriteBuilder streamWriteBuilder =
865+ table .newStreamWriteBuilder ().withCommitUser (commitUser );
866+ write = streamWriteBuilder .newWrite ();
867+ commit = streamWriteBuilder .newCommit ();
868+
869+ // base records
870+ writeData (GenericRow .of (2 , 2 , BinaryString .fromString ("test" ), 0 ));
871+ writeData (GenericRow .of (2 , 1 , BinaryString .fromString ("test" ), 0 ));
872+ writeData (GenericRow .of (2 , 0 , BinaryString .fromString ("test" ), 0 ));
873+
874+ checkSnapshot (table , Snapshot .CommitKind .APPEND );
875+ runAction (true , Collections .emptyList ());
876+ checkSnapshot (table , 4 , Snapshot .CommitKind .COMPACT , 60_000 );
877+
878+ // incremental records
879+ writeData (GenericRow .of (1 , 2 , BinaryString .fromString ("test" ), 0 ));
880+ writeData (GenericRow .of (1 , 1 , BinaryString .fromString ("test" ), 0 ));
881+ checkSnapshot (table , 7 , Snapshot .CommitKind .COMPACT , 60_000 );
882+ }
883+
851884 protected FileStoreTable createTable (String partitionKeys ) throws Exception {
852885 return createTable (partitionKeys , commonOptions ());
853886 }
@@ -923,8 +956,28 @@ private static String randomString(int length) {
923956 }
924957
925958 private void checkSnapshot (FileStoreTable table ) {
926- assertThat (table .latestSnapshot ().get ().commitKind ())
927- .isEqualTo (Snapshot .CommitKind .COMPACT );
959+ checkSnapshot (table , Snapshot .CommitKind .COMPACT );
960+ }
961+
962+ private void checkSnapshot (FileStoreTable table , Snapshot .CommitKind commitKind ) {
963+ assertThat (table .latestSnapshot ().get ().commitKind ()).isEqualTo (commitKind );
964+ }
965+
966+ protected void checkSnapshot (
967+ FileStoreTable table , long snapshotId , Snapshot .CommitKind commitKind , long timeout )
968+ throws Exception {
969+ SnapshotManager snapshotManager = table .snapshotManager ();
970+ long start = System .currentTimeMillis ();
971+ while (!Objects .equals (snapshotManager .latestSnapshotId (), snapshotId )) {
972+ Thread .sleep (500 );
973+ if (System .currentTimeMillis () - start > timeout ) {
974+ throw new RuntimeException ("can't wait for a compaction." );
975+ }
976+ }
977+
978+ Snapshot snapshot = snapshotManager .snapshot (snapshotManager .latestSnapshotId ());
979+ assertThat (snapshot .id ()).isEqualTo (snapshotId );
980+ assertThat (snapshot .commitKind ()).isEqualTo (commitKind );
928981 }
929982
930983 private List <CommitMessage > produceDvIndexMessages (
@@ -954,7 +1007,16 @@ private List<CommitMessage> produceDvIndexMessages(
9541007 }
9551008
9561009 private void runAction (List <String > extra ) throws Exception {
957- StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder ().batchMode ().build ();
1010+ runAction (false , extra );
1011+ }
1012+
1013+ private void runAction (boolean isStreaming , List <String > extra ) throws Exception {
1014+ StreamExecutionEnvironment env ;
1015+ if (isStreaming ) {
1016+ env = streamExecutionEnvironmentBuilder ().streamingMode ().build ();
1017+ } else {
1018+ env = streamExecutionEnvironmentBuilder ().batchMode ().build ();
1019+ }
9581020 ArrayList <String > baseArgs =
9591021 Lists .newArrayList ("compact" , "--database" , database , "--table" , tableName );
9601022 ThreadLocalRandom random = ThreadLocalRandom .current ();
@@ -966,7 +1028,12 @@ private void runAction(List<String> extra) throws Exception {
9661028 baseArgs .addAll (extra );
9671029
9681030 CompactAction action = createAction (CompactAction .class , baseArgs .toArray (new String [0 ]));
969- action .withStreamExecutionEnvironment (env );
970- action .run ();
1031+ action .withStreamExecutionEnvironment (env ).build ();
1032+ if (isStreaming ) {
1033+ env .executeAsync ();
1034+ } else {
1035+ env .execute ();
1036+ }
1037+ // action.run();
9711038 }
9721039}
0 commit comments