4444import java .util .List ;
4545import java .util .Map ;
4646import java .util .Random ;
47+ import java .util .concurrent .CountDownLatch ;
4748import java .util .concurrent .ExecutorService ;
4849import java .util .concurrent .Executors ;
4950import java .util .concurrent .TimeUnit ;
51+ import java .util .concurrent .atomic .AtomicInteger ;
5052
5153import static com .alipay .oceanbase .rpc .filter .ObTableFilterFactory .compareVal ;
5254import static com .alipay .oceanbase .rpc .mutation .MutationFactory .colVal ;
@@ -133,7 +135,6 @@ PRIMARY KEY(`K`, `Q`, `T`)
133135 public void testOneLevelKeyPartition () throws Exception {
134136 String table_name = "testKey" ;
135137 BatchOperation batchOperation = client .batchOperation (table_name );
136- // client.setRunningMode(ObTableClient.RunningMode.HBASE);
137138 Object values [][] = { { "K_val1" , "Q_val1" , 1L , "V_val1" },
138139 { "K_val2" , "Q_val2" , 101L , "V_val2" }, { "K_val3" , "Q_val3" , 501L , "V_val3" },
139140 { "K_val4" , "Q_val4" , 1001L , "V_val4" }, { "K_val5" , "Q_val5" , 5001L , "V_val5" },
@@ -187,13 +188,12 @@ public void testOneLevelKeyPartition() throws Exception {
187188 `V` varbinary(1024),
188189 INDEX i1(`K`, `V`) local,
189190 PRIMARY KEY(`K`, `Q`, `T`)
190- ) partition by hash(`K`) partitions 16 ;
191+ ) partition by hash(`K`) partitions 15 ;
191192 * */
192193 @ Test
193194 public void testOneLevelHashPartition () throws Exception {
194195 String table_name = "testHash" ;
195196 BatchOperation batchOperation = client .batchOperation (table_name );
196- // client.setRunningMode(ObTableClient.RunningMode.HBASE);
197197 Object values [][] = { { 1L , "Q_val1" , 1L , "V_val1" }, { 10L , "Q_val2" , 101L , "V_val2" },
198198 { 501L , "Q_val3" , 501L , "V_val3" }, { 1001L , "Q_val4" , 1001L , "V_val4" },
199199 { 5001L , "Q_val5" , 5001L , "V_val5" }, { 10001L , "Q_val6" , 10001L , "V_val6" }, };
@@ -386,7 +386,6 @@ SUBPARTITION p2sp2 VALUES LESS THAN (MAXVALUE, MAXVALUE, MAXVALUE)
386386 public void testTwoLevelRangePartition () throws Exception {
387387 client .setRunningMode (ObTableClient .RunningMode .NORMAL );
388388 String testTable = "testPartitionRangeComplex" ;
389- client .addRowKeyElement (testTable , new String [] { "c1" , "c2" , "c3" , "c4" });
390389 Random rng = new Random ();
391390 try {
392391 cleanTable (testTable );
@@ -423,59 +422,72 @@ public void testConcurrentGetPartition() throws Exception {
423422 String [] table_names = { "testHash" , "testKey" , "testRange" };
424423 ExecutorService executorService = Executors .newFixedThreadPool (10 );
425424 Random random = new Random ();
425+ AtomicInteger cnt = new AtomicInteger (0 );
426+ CountDownLatch latch = new CountDownLatch (20 );
426427
427428 try {
428429 for (int i = 0 ; i < 20 ; ++i ) {
429430 executorService .submit (() -> {
430431 try {
431432 String table_name = table_names [random .nextInt (table_names .length )];
432- List <Partition > partitions = client .getPartition (table_name , false );
433433 if (table_name .equalsIgnoreCase ("testHash" )) {
434- Assert .assertEquals (15 , partitions .size ());
435- for (Partition partition : partitions ) {
436- System .out .println ("testHash: " + partition .toString ());
437- }
438- MutationResult resultSet = client .insertOrUpdate ("testHash" )
439- .setRowKey (row (colVal ("K" , random .nextInt ()), colVal ("Q" , "Q_val1" ), colVal ("T" , System .currentTimeMillis ())))
434+ MutationResult resultSet = client .insert ("testHash" )
435+ .setRowKey (row (colVal ("K" , random .nextLong ()), colVal ("Q" , "Q_val1" ), colVal ("T" , System .currentTimeMillis ())))
440436 .addMutateRow (row (colVal ("V" , "V_val1" ))).execute ();
441437 Assert .assertEquals (1 , resultSet .getAffectedRows ());
442- } else if ( table_name . equalsIgnoreCase ( "testKey" )) {
438+ List < Partition > partitions = client . getPartition ( table_name , false );
443439 Assert .assertEquals (15 , partitions .size ());
444440 for (Partition partition : partitions ) {
445- System .out .println ("testKey : " + partition .toString ());
441+ System .out .println ("testHash : " + partition .toString ());
446442 }
447- byte [] bytes = new byte []{};
443+ cnt .getAndIncrement ();
444+ } else if (table_name .equalsIgnoreCase ("testKey" )) {
445+ byte [] bytes = new byte [10 ];
448446 random .nextBytes (bytes );
449- MutationResult resultSet = client .insertOrUpdate ("testKey" )
447+ MutationResult resultSet = client .insert ("testKey" )
450448 .setRowKey (row (colVal ("K" , bytes ), colVal ("Q" , "Q_val1" ), colVal ("T" , System .currentTimeMillis ())))
451449 .addMutateRow (row (colVal ("V" , "V_val1" ))).execute ();
452450 Assert .assertEquals (1 , resultSet .getAffectedRows ());
453- } else {
454- Assert .assertEquals (3 , partitions .size ());
451+ List < Partition > partitions = client . getPartition ( table_name , false );
452+ Assert .assertEquals (15 , partitions .size ());
455453 for (Partition partition : partitions ) {
456- System .out .println ("testRange : " + partition .toString ());
454+ System .out .println ("testHash : " + partition .toString ());
457455 }
458- MutationResult resultSet = client .insertOrUpdate ("testRange" )
456+ cnt .getAndIncrement ();
457+ } else {
458+ MutationResult resultSet = client .insert ("testRange" )
459459 .setRowKey (row (colVal ("c1" , random .nextInt ()), colVal ("c2" , "c2_val1" )))
460460 .addMutateRow (row (colVal ("c3" , "c3_val1" ), colVal ("c4" , 10L ))).execute ();
461461 Assert .assertEquals (1 , resultSet .getAffectedRows ());
462+ List <Partition > partitions = client .getPartition (table_name , false );
463+ Assert .assertEquals (3 , partitions .size ());
464+ for (Partition partition : partitions ) {
465+ System .out .println ("testHash: " + partition .toString ());
466+ }
467+ cnt .getAndIncrement ();
462468 }
463- } catch (Exception e ) {
464- throw new RuntimeException (e );
469+ } catch (Throwable t ) {
470+ t .printStackTrace ();
471+ throw new RuntimeException (t );
472+ } finally {
473+ latch .countDown ();
465474 }
466475 });
467476 }
468- } catch (Exception e ) {
469- e .printStackTrace ();
477+ latch .await ();
478+ Assert .assertEquals (20 , cnt .get ());
479+ } catch (Throwable t ) {
480+ t .printStackTrace ();
470481 Assert .assertTrue (false );
471482 } finally {
472483 executorService .shutdown ();
473484 try {
474485 // wait for all tasks done
475- if (!executorService .awaitTermination (500L , TimeUnit .MILLISECONDS )) {
486+ if (!executorService .awaitTermination (2000L , TimeUnit .MILLISECONDS )) {
476487 executorService .shutdownNow ();
477- if (!executorService .awaitTermination (500L , TimeUnit .MILLISECONDS )) {
488+ if (!executorService .awaitTermination (2000L , TimeUnit .MILLISECONDS )) {
478489 System .err .println ("the thread pool did not shut down" );
490+ Assert .assertTrue (false );
479491 }
480492 }
481493 cleanTable ("testHash" );
@@ -484,6 +496,7 @@ public void testConcurrentGetPartition() throws Exception {
484496 } catch (InterruptedException ie ) {
485497 executorService .shutdownNow ();
486498 Thread .currentThread ().interrupt ();
499+ Assert .assertTrue (false );
487500 }
488501 }
489502 }
0 commit comments