Skip to content

Commit 4bf91e3

Browse files
committed
pass all situations to get ODP partition meta
1 parent f78d0d3 commit 4bf91e3

File tree

3 files changed

+146
-8
lines changed

3 files changed

+146
-8
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2983,7 +2983,7 @@ private TableEntry getOrFetchODPPartitionMeta(String tableName, boolean needRene
29832983
}
29842984
}
29852985

2986-
private void checkObFetchPartitionMetaResult(Long lastRefreshMetadataTimestamp, ObFetchPartitionMetaRequest request,
2986+
private void checkObFetchPartitionMetaResult(Long lastOdpRefreshTimeMills, ObFetchPartitionMetaRequest request,
29872987
ObPayload result) {
29882988
if (result == null) {
29892989
RUNTIME.error("client get unexpected NULL result");
@@ -2996,8 +2996,8 @@ private void checkObFetchPartitionMetaResult(Long lastRefreshMetadataTimestamp,
29962996
+ result.getClass().getName());
29972997
}
29982998

2999-
if (lastRefreshMetadataTimestamp != null) {
3000-
if (lastRefreshMetadataTimestamp >= ((ObFetchPartitionMetaResult) result).getCreateTime()) {
2999+
if (lastOdpRefreshTimeMills != null) {
3000+
if (lastOdpRefreshTimeMills >= ((ObFetchPartitionMetaResult) result).getCreateTime()) {
30013001
throw new ObTableException("client get outdated result from ODP");
30023002
}
30033003
}

src/main/java/com/alipay/oceanbase/rpc/property/Property.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public enum Property {
7676

7777
TABLE_ENTRY_REFRESH_LOCK_TIMEOUT("table.entry.refresh.lock.timeout", 4000L, "刷新TABLE地址的锁超时时间"),
7878

79-
ODP_TABLE_ENTRY_REFRESH_LOCK_TIMEOUT("odp.table.entry.refresh.lock.timeout", 10000L,
79+
ODP_TABLE_ENTRY_REFRESH_LOCK_TIMEOUT("odp.table.entry.refresh.lock.timeout", 5000L,
8080
"刷新ODP TABLE地址的锁超时时间"),
8181

8282
TABLE_ENTRY_REFRESH_TRY_TIMES("table.entry.refresh.try.times", 3, "刷新TABLE地址的尝试次数"),

src/test/java/com/alipay/oceanbase/rpc/ODPGetPartitionMetaTest.java

Lines changed: 142 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
88
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
99
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
10+
import com.alipay.oceanbase.rpc.table.ConcurrentTask;
11+
import com.alipay.oceanbase.rpc.table.ConcurrentTaskExecutor;
12+
import com.alipay.oceanbase.rpc.table.api.TableQuery;
13+
import com.alipay.oceanbase.rpc.threadlocal.ThreadLocalMap;
1014
import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil;
1115
import org.junit.Assert;
1216
import org.junit.Before;
@@ -18,11 +22,15 @@
1822
import java.util.List;
1923
import java.util.Map;
2024
import java.util.Random;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.TimeUnit;
2128

2229
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
2330
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
2431
import static com.alipay.oceanbase.rpc.util.ObTableClientTestUtil.cleanTable;
2532
import static com.alipay.oceanbase.rpc.util.ObTableClientTestUtil.generateRandomStringByUUID;
33+
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
2634
import static java.lang.StrictMath.abs;
2735

2836
public class ODPGetPartitionMetaTest {
@@ -224,10 +232,10 @@ public void testOneLevelSubStrKeyPartition() throws Exception {
224232
// batchOperation.addOperation(insertOrUpdate);
225233
// }
226234
// BatchOperationResult batchOperationResult = batchOperation.execute();
227-
MutationResult res = client.insert(table_name)
228-
.setRowKey(row(colVal("K", "K_val1"), colVal("Q", "Q_val1".getBytes()), colVal("T", 1L)))
229-
.addMutateRow(row(colVal("V", "V_val1".getBytes()))).execute();
230-
Assert.assertEquals(1, res.getAffectedRows());
235+
// MutationResult res = client.insert(table_name)
236+
// .setRowKey(row(colVal("K", "K_val1"), colVal("Q", "Q_val1".getBytes()), colVal("T", 1L)))
237+
// .addMutateRow(row(colVal("V", "V_val1".getBytes()))).execute();
238+
// Assert.assertEquals(1, res.getAffectedRows());
231239
// test get all partitions
232240
List<Partition> partitions = client.getPartition(table_name);
233241
Assert.assertEquals(15, partitions.size());
@@ -435,4 +443,134 @@ public void testTwoLevelRangePartition() throws Exception {
435443
}
436444
}
437445

446+
@Test
447+
public void testConcurrentGetPartition() throws Exception {
448+
String[] table_names = { "testHash", "testKey", "testRange" };
449+
ExecutorService executorService = Executors.newFixedThreadPool(10);
450+
Random random = new Random();
451+
452+
try {
453+
for (int i = 0; i < 20; ++i) {
454+
executorService.submit(() -> {
455+
try {
456+
String table_name = table_names[random.nextInt(table_names.length)];
457+
List<Partition> partitions = client.getPartition(table_name);
458+
if (table_name.equalsIgnoreCase("testHash")) {
459+
Assert.assertEquals(15, partitions.size());
460+
for (Partition partition : partitions) {
461+
System.out.println("testHash: " + partition.toString());
462+
}
463+
MutationResult resultSet = client.insertOrUpdate("testHash")
464+
.setRowKey(row(colVal("K", random.nextInt()), colVal("Q", "Q_val1"), colVal("T", System.currentTimeMillis())))
465+
.addMutateRow(row(colVal("V", "V_val1"))).execute();
466+
Assert.assertEquals(1, resultSet.getAffectedRows());
467+
} else if (table_name.equalsIgnoreCase("testKey")) {
468+
Assert.assertEquals(15, partitions.size());
469+
for (Partition partition : partitions) {
470+
System.out.println("testKey: " + partition.toString());
471+
}
472+
byte[] bytes = new byte[]{};
473+
random.nextBytes(bytes);
474+
MutationResult resultSet = client.insertOrUpdate("testKey")
475+
.setRowKey(row(colVal("K", bytes), colVal("Q", "Q_val1"), colVal("T", System.currentTimeMillis())))
476+
.addMutateRow(row(colVal("V", "V_val1"))).execute();
477+
Assert.assertEquals(1, resultSet.getAffectedRows());
478+
} else {
479+
Assert.assertEquals(3, partitions.size());
480+
for (Partition partition : partitions) {
481+
System.out.println("testRange: " + partition.toString());
482+
}
483+
MutationResult resultSet = client.insertOrUpdate("testRange")
484+
.setRowKey(row(colVal("c1", random.nextInt()), colVal("c2", "c2_val1")))
485+
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
486+
Assert.assertEquals(1, resultSet.getAffectedRows());
487+
}
488+
} catch (Exception e) {
489+
throw new RuntimeException(e);
490+
}
491+
});
492+
}
493+
} catch (Exception e) {
494+
e.printStackTrace();
495+
Assert.assertTrue(false);
496+
} finally {
497+
executorService.shutdown();
498+
try {
499+
// wait for all tasks done
500+
if (!executorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
501+
executorService.shutdownNow();
502+
if (!executorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
503+
System.err.println("the thread pool did not shut down");
504+
}
505+
}
506+
cleanTable("testHash");
507+
cleanTable("testKey");
508+
cleanTable("testRange");
509+
} catch (InterruptedException ie) {
510+
executorService.shutdownNow();
511+
Thread.currentThread().interrupt();
512+
}
513+
}
514+
}
515+
516+
@Test
517+
public void testReFetchPartitionMeta() throws Exception {
518+
String table_name = "testRange";
519+
BatchOperation batchOperation = client.batchOperation(table_name);
520+
Object values[][] = { { 1, "c2_val1", "c3_val1", 1L }, { 101, "c2_val1", "c3_val1", 101L },
521+
{ 501, "c2_val1", "c3_val1", 501L }, { 901, "c2_val1", "c3_val1", 901L },
522+
{ 1001, "c2_val1", "c3_val1", 1001L }, { 1501, "c2_val1", "c3_val1", 1501L }, };
523+
int rowCnt = values.length;
524+
try {
525+
MutationResult resultSet = client.insertOrUpdate("testRange")
526+
.setRowKey(row(colVal("c1", 10), colVal("c2", "c2_val1")))
527+
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
528+
Assert.assertEquals(1, resultSet.getAffectedRows());
529+
// need to manually breakpoint here to change table schema in database
530+
resultSet = client.insertOrUpdate("testRange")
531+
.setRowKey(row(colVal("c1", 10), colVal("c2", "c2_val1")))
532+
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
533+
Assert.assertEquals(1, resultSet.getAffectedRows());
534+
535+
// test batch insert in ODP mode
536+
for (int i = 0; i < rowCnt; i++) {
537+
Object[] curRow = values[i];
538+
InsertOrUpdate insertOrUpdate = new InsertOrUpdate();
539+
insertOrUpdate.setRowKey(row(colVal("c1", curRow[0]), colVal("c2", curRow[1])));
540+
insertOrUpdate.addMutateRow(row(colVal("c3", curRow[2]), colVal("c4", curRow[3])));
541+
batchOperation.addOperation(insertOrUpdate);
542+
}
543+
BatchOperationResult batchOperationResult = batchOperation.execute();
544+
Assert.assertEquals(rowCnt, batchOperationResult.size());
545+
for (int j = 0; j < rowCnt; j++) {
546+
Assert.assertEquals(1, batchOperationResult.get(j).getAffectedRows());
547+
}
548+
// need to manually breakpoint here to change table schema in database
549+
batchOperationResult = batchOperation.execute();
550+
Assert.assertEquals(rowCnt, batchOperationResult.size());
551+
for (int j = 0; j < rowCnt; j++) {
552+
Assert.assertEquals(1, batchOperationResult.get(j).getAffectedRows());
553+
}
554+
555+
QueryResultSet result = client.query("testRange")
556+
.addScanRange(new Object[] { 1, "c2_val1" },
557+
new Object[] { 2000, "c2_val1" })
558+
.select("c1", "c2", "c3", "c4").execute();
559+
Assert.assertEquals(rowCnt + 1, result.cacheSize());
560+
// need to manually breakpoint here to change table schema in database
561+
result = client.query("testRange")
562+
.addScanRange(new Object[] { 1, "c2_val1" },
563+
new Object[] { 2000, "c2_val1" })
564+
.select("c1", "c2", "c3", "c4").execute();
565+
Assert.assertEquals(1, result.cacheSize());
566+
567+
568+
} catch (Exception e) {
569+
e.printStackTrace();
570+
Assert.assertTrue(false);
571+
} finally {
572+
cleanTable(table_name);
573+
}
574+
}
575+
438576
}

0 commit comments

Comments
 (0)