2020import org .apache .fluss .client .admin .ClientToServerITCaseBase ;
2121import org .apache .fluss .client .table .Table ;
2222import org .apache .fluss .client .table .writer .UpsertWriter ;
23+ import org .apache .fluss .metadata .PartitionInfo ;
2324import org .apache .fluss .metadata .PartitionSpec ;
2425import org .apache .fluss .metadata .Schema ;
2526import org .apache .fluss .metadata .TableBucket ;
3334import org .junit .jupiter .api .BeforeEach ;
3435import org .junit .jupiter .api .Test ;
3536
36- import java .time .Duration ;
3737import java .util .ArrayList ;
3838import java .util .Comparator ;
3939import java .util .List ;
@@ -56,7 +56,6 @@ protected void teardown() throws Exception {
5656
5757 @ Test
5858 void testBasicScan () throws Exception {
59- System .out .println ("eddww" );
6059 TablePath tablePath = TablePath .of ("test_db" , "test_basic_scan" );
6160 Schema schema =
6261 Schema .newBuilder ()
@@ -68,7 +67,6 @@ void testBasicScan() throws Exception {
6867 TableDescriptor .builder ().schema (schema ).distributedBy (1 , "id" ).build ();
6968
7069 createTable (tablePath , descriptor , true );
71- System .out .println ("eddww" );
7270
7371 Table table = conn .getTable (tablePath );
7472
@@ -78,11 +76,10 @@ void testBasicScan() throws Exception {
7876 writer .upsert (row (2 , "b" ));
7977 writer .upsert (row (3 , "c" ));
8078 writer .flush ();
81- System .out .println ("never reaches this" );
8279
83- // 2. test the kvScan works as expected
80+ // 2. test the snapshotQuery works as expected
8481 TableBucket bucket = new TableBucket (table .getTableInfo ().getTableId (), 0 );
85- List <InternalRow > result = scanAll (table , bucket );
82+ List <InternalRow > result = snapshotQueryAll (table , bucket );
8683
8784 assertThat (result ).hasSize (3 );
8885 result .sort (Comparator .comparingInt (r -> r .getInt (0 )));
@@ -119,7 +116,7 @@ void testMultiBucketScan() throws Exception {
119116 List <InternalRow > allResult = new ArrayList <>();
120117 for (int i = 0 ; i < 3 ; i ++) {
121118 TableBucket bucket = new TableBucket (table .getTableInfo ().getTableId (), i );
122- allResult .addAll (scanAll (table , bucket ));
119+ allResult .addAll (snapshotQueryAll (table , bucket ));
123120 }
124121
125122 assertThat (allResult ).hasSize (rowCount );
@@ -163,30 +160,27 @@ void testPartitionedTableScan() throws Exception {
163160 Table table = conn .getTable (tablePath );
164161 long p1Id = -1 ;
165162 long p2Id = -1 ;
166- for ( org . apache . fluss . metadata . PartitionInfo p :
167- admin . listPartitionInfos ( tablePath ). get () ) {
168- if (p .getPartitionName ().equals ("p= p1" )) {
163+ List < PartitionInfo > partitionInfos = admin . listPartitionInfos ( tablePath ). get ();
164+ for ( PartitionInfo p : partitionInfos ) {
165+ if (p .getPartitionName ().equals ("p1" )) {
169166 p1Id = p .getPartitionId ();
170- } else if (p .getPartitionName ().equals ("p= p2" )) {
167+ } else if (p .getPartitionName ().equals ("p2" )) {
171168 p2Id = p .getPartitionId ();
172169 }
173170 }
174171
175- // 1. write data to different partitions
176172 UpsertWriter writer = table .newUpsert ().createWriter ();
177173 writer .upsert (row (1 , "p1" , "a1" ));
178174 writer .upsert (row (2 , "p1" , "b1" ));
179175 writer .upsert (row (1 , "p2" , "a2" ));
180176 writer .flush ();
181177
182- // 2. scan partition p1
183178 TableBucket p1Bucket = new TableBucket (table .getTableInfo ().getTableId (), p1Id , 0 );
184- List <InternalRow > p1Result = scanAll (table , p1Bucket );
179+ List <InternalRow > p1Result = snapshotQueryAll (table , p1Bucket );
185180 assertThat (p1Result ).hasSize (2 );
186181
187- // 3. scan partition p2
188182 TableBucket p2Bucket = new TableBucket (table .getTableInfo ().getTableId (), p2Id , 0 );
189- List <InternalRow > p2Result = scanAll (table , p2Bucket );
183+ List <InternalRow > p2Result = snapshotQueryAll (table , p2Bucket );
190184 assertThat (p2Result ).hasSize (1 );
191185 assertThatRow (p2Result .get (0 ))
192186 .withSchema (schema .getRowType ())
@@ -218,7 +212,7 @@ void testLargeDataScan() throws Exception {
218212
219213 // 2. scan and verify
220214 TableBucket bucket = new TableBucket (table .getTableInfo ().getTableId (), 0 );
221- List <InternalRow > result = scanAll (table , bucket );
215+ List <InternalRow > result = snapshotQueryAll (table , bucket );
222216
223217 assertThat (result ).hasSize (rowCount );
224218 result .sort (Comparator .comparingInt (r -> r .getInt (0 )));
@@ -253,12 +247,7 @@ void testSnapshotQuery() throws Exception {
253247
254248 // 2. test the snapshotQuery works as expected
255249 TableBucket bucket = new TableBucket (table .getTableInfo ().getTableId (), 0 );
256- List <InternalRow > result = new ArrayList <>();
257- try (CloseableIterator <InternalRow > iterator = table .newSnapshotQuery ().execute (bucket )) {
258- while (iterator .hasNext ()) {
259- result .add (iterator .next ());
260- }
261- }
250+ List <InternalRow > result = snapshotQueryAll (table , bucket );
262251
263252 assertThat (result ).hasSize (3 );
264253 result .sort (Comparator .comparingInt (r -> r .getInt (0 )));
@@ -267,15 +256,11 @@ void testSnapshotQuery() throws Exception {
267256 assertThatRow (result .get (2 )).withSchema (schema .getRowType ()).isEqualTo (row (3 , "c" ));
268257 }
269258
270- private List <InternalRow > scanAll (Table table , TableBucket bucket ) throws Exception {
259+ private List <InternalRow > snapshotQueryAll (Table table , TableBucket bucket ) throws Exception {
271260 List <InternalRow > allRows = new ArrayList <>();
272- try (BatchScanner scanner = table .newScan ().createBatchScanner (bucket )) {
273- CloseableIterator <InternalRow > iterator ;
274- while ((iterator = scanner .pollBatch (Duration .ofSeconds (5 ))) != null ) {
275- while (iterator .hasNext ()) {
276- allRows .add (iterator .next ());
277- }
278- iterator .close ();
261+ try (CloseableIterator <InternalRow > iterator = table .newSnapshotQuery ().execute (bucket )) {
262+ while (iterator .hasNext ()) {
263+ allRows .add (iterator .next ());
279264 }
280265 }
281266 return allRows ;
0 commit comments