Skip to content

Commit ff955e0

Browse files
committed
fix getPartition after merge
1 parent 2f250d9 commit ff955e0

File tree

4 files changed

+80
-66
lines changed

4 files changed

+80
-66
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2043,23 +2043,22 @@ private ObPair<Long, ObTableParam> getODPTableInternal(TableEntry odpTableEntry,
20432043
* @return
20442044
* @throws Exception
20452045
*/
2046-
private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry tableEntry,
2046+
private List<ObPair<ObPair<Long, Long>, ReplicaLocation>> getPartitionReplica(TableEntry tableEntry,
20472047
String tableName,
20482048
Row startRow,
20492049
boolean startIncluded,
20502050
Row endRow,
20512051
boolean endIncluded,
20522052
ObServerRoute route) throws Exception {
2053-
List<ObPair<Long, ReplicaLocation>> replicas = new ArrayList<>();
2053+
List<ObPair<ObPair<Long, Long>, ReplicaLocation>> replicas = new ArrayList<>();
20542054

20552055
if (!tableEntry.isPartitionTable() || tableEntry.getPartitionInfo().getLevel() == ObPartitionLevel.LEVEL_ZERO) {
20562056
if (ObGlobal.obVsnMajor() >= 4) {
20572057
long tabletId = getTabletIdByPartId(tableEntry, 0L);
20582058
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
2059-
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
2059+
replicas.add(new ObPair<>(new ObPair<>(0L, tabletId), getPartitionLocation(locationInfo, route)));
20602060
} else {
2061-
replicas.add(new ObPair<Long, ReplicaLocation>(0L, getPartitionLocation(tableEntry, 0L,
2062-
route)));
2061+
replicas.add(new ObPair<>(new ObPair<Long, Long>(0L, 0L), getPartitionLocation(tableEntry, 0L, route)));
20632062
}
20642063
return replicas;
20652064
}
@@ -2071,7 +2070,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
20712070
for (Long partId : partIds) {
20722071
long tabletId = getTabletIdByPartId(tableEntry, partId);
20732072
ObPartitionLocationInfo locationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, tabletId);
2074-
replicas.add(new ObPair<>(tabletId, getPartitionLocation(locationInfo, route)));
2073+
replicas.add(new ObPair<>(new ObPair<>(partId, tabletId), getPartitionLocation(locationInfo, route)));
20752074
}
20762075
} else {
20772076
for (Long partId : partIds) {
@@ -2080,7 +2079,7 @@ private List<ObPair<Long, ReplicaLocation>> getPartitionReplica(TableEntry table
20802079
partitionId = ObPartIdCalculator.getPartIdx(partId, tableEntry
20812080
.getPartitionInfo().getSubPartDesc().getPartNum());
20822081
}
2083-
replicas.add(new ObPair<Long, ReplicaLocation>(partId, getPartitionLocation(
2082+
replicas.add(new ObPair<>(new ObPair<>(partId, partId), getPartitionLocation(
20842083
tableEntry, partitionId, route)));
20852084
}
20862085
}
@@ -2224,13 +2223,15 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
22242223
}
22252224
}
22262225

2227-
List<ObPair<Long, ReplicaLocation>> partIdWithReplicaList = getPartitionReplica(tableEntry, tableName,
2226+
// partIdWithReplicaList -> List<Pair<Pair<logicId, tabletId> obTableParams>>
2227+
List<ObPair<ObPair<Long, Long>, ReplicaLocation>> partIdWithReplicaList = getPartitionReplica(tableEntry, tableName,
22282228
startRow, startInclusive, endRow, endInclusive, route);
22292229

22302230
// obTableParams -> List<Pair<logicId, obTableParams>>
22312231
List<ObPair<Long, ObTableParam>> obTableParams = new ArrayList<ObPair<Long, ObTableParam>>();
2232-
for (ObPair<Long, ReplicaLocation> partIdWithReplica : partIdWithReplicaList) {
2233-
long partId = partIdWithReplica.getLeft();
2232+
for (ObPair<ObPair<Long, Long>, ReplicaLocation> partIdWithReplica : partIdWithReplicaList) {
2233+
long partId = partIdWithReplica.getLeft().getLeft();
2234+
long partitionId = partIdWithReplica.getLeft().getRight();
22342235
ReplicaLocation replica = partIdWithReplica.getRight();
22352236
ObServerAddr addr = replica.getAddr();
22362237
ObTable obTable = tableRoster.get(addr);
@@ -2253,12 +2254,17 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
22532254
}
22542255

22552256
ObTableParam param = new ObTableParam(obTable);
2257+
if (ObGlobal.obVsnMajor() >= 4 && tableEntry != null) {
2258+
ObPartitionLocationInfo partitionLocationInfo = getOrRefreshPartitionInfo(tableEntry, tableName, partitionId);
2259+
param.setLsId(partitionLocationInfo.getTabletLsId());
2260+
}
22562261
param.setTableId(tableEntry.getTableId());
2262+
param.setPartId(partId);
22572263
// real partition(tablet) id
2258-
param.setPartitionId(partId);
2264+
param.setPartitionId(partitionId);
22592265

22602266
addr.recordAccess();
2261-
obTableParams.add(new ObPair<Long, ObTableParam>(partIdWithReplica.getLeft(), param));
2267+
obTableParams.add(new ObPair<Long, ObTableParam>(partIdWithReplica.getLeft().getRight(), param));
22622268
}
22632269

22642270
return obTableParams;

src/test/java/com/alipay/oceanbase/rpc/ODPGetPartitionMetaTest.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil;
3636
import org.junit.Assert;
3737
import org.junit.Before;
38+
import org.junit.Ignore;
3839
import org.junit.Test;
3940

4041
import java.sql.Connection;
@@ -326,30 +327,33 @@ public void testOneLevelHashPartition() throws Exception {
326327

327328
/*
328329
* CREATE TABLE IF NOT EXISTS `testRange` (
329-
`c1` int NOT NULL,
330-
`c2` varchar(20) NOT NULL,
331-
`c3` varbinary(1024) DEFAULT NULL,
332-
`c4` bigint DEFAULT NULL,
333-
PRIMARY KEY(`c1`, `c2`)) partition by range columns (`c1`, `c2`) (
334-
PARTITION p0 VALUES LESS THAN (300, 't'),
335-
PARTITION p1 VALUES LESS THAN (1000, 'T'),
336-
PARTITION p2 VALUES LESS THAN (MAXVALUE, MAXVALUE));
330+
`K` varbinary(1024),
331+
`Q` varbinary(256),
332+
`T` bigint,
333+
`V` varbinary(10240),
334+
INDEX i1(`K`, `V`) local,
335+
PRIMARY KEY(`K`, `Q`, `T`)
336+
) partition by range columns (`K`) (
337+
PARTITION p0 VALUES LESS THAN ('a'),
338+
PARTITION p1 VALUES LESS THAN ('w'),
339+
PARTITION p2 VALUES LESS THAN MAXVALUE
340+
);
337341
* */
338342
@Test
339343
public void testOneLevelRangePartition() throws Exception {
340344
BatchOperation batchOperation = client.batchOperation(TABLE_NAME3);
341-
Object values[][] = { { 1, "c2_val1", "c3_val1", 1L }, { 101, "c2_val1", "c3_val1", 101L },
342-
{ 501, "c2_val1", "c3_val1", 501L }, { 901, "c2_val1", "c3_val1", 901L },
343-
{ 1001, "c2_val1", "c3_val1", 1001L }, { 1501, "c2_val1", "c3_val1", 1501L }, };
345+
Object values[][] = { { "ah", "c2_val1", 1L, "c3_val1" }, { "bw", "c2_val1", 101L, "c3_val1" },
346+
{ "ht", "c2_val1", 501L, "c3_val1" }, { "tw", "c2_val1", 901L, "c3_val1" },
347+
{ "xy", "c2_val1", 1001L, "c3_val1" }, { "zw", "c2_val1", 1501L, "c3_val1" } };
344348
int rowCnt = values.length;
345349

346350
try {
347351
// test batch insert in ODP mode
348352
for (int i = 0; i < rowCnt; i++) {
349353
Object[] curRow = values[i];
350354
InsertOrUpdate insertOrUpdate = new InsertOrUpdate();
351-
insertOrUpdate.setRowKey(row(colVal("c1", curRow[0]), colVal("c2", curRow[1])));
352-
insertOrUpdate.addMutateRow(row(colVal("c3", curRow[2]), colVal("c4", curRow[3])));
355+
insertOrUpdate.setRowKey(row(colVal("K", curRow[0]), colVal("Q", curRow[1]), colVal("T", curRow[2])));
356+
insertOrUpdate.addMutateRow(row(colVal("V", curRow[3])));
353357
batchOperation.addOperation(insertOrUpdate);
354358
}
355359
BatchOperationResult batchOperationResult = batchOperation.execute();
@@ -360,26 +364,26 @@ public void testOneLevelRangePartition() throws Exception {
360364
System.out.println(partition.toString());
361365
}
362366

363-
// test get the first partition
367+
// test get the first partition using nonexistent row
364368
Partition first_partition = client.getPartition(TABLE_NAME3,
365-
row(colVal("c1", 1L), colVal("c2", "c2_val")), false);
369+
row(colVal("K", "A"), colVal("Q", "bw"), colVal("T", 1L)), false);
366370
Assert.assertEquals(partitions.get(0).getPartitionId(),
367371
first_partition.getPartitionId());
368372
// test get the second partition
369373
Partition sec_partition = client.getPartition(TABLE_NAME3,
370-
row(colVal("c1", 401L), colVal("c2", "c2_val")), false);
374+
row(colVal("K", "ah"), colVal("Q", "bw"), colVal("T", 1L)), false);
371375
Assert.assertEquals(partitions.get(1).getPartitionId(), sec_partition.getPartitionId());
372376
// test get the same partition with the first partition key
373377
Partition partition1 = client.getPartition(TABLE_NAME3,
374-
row(colVal("c1", 101L), colVal("c2", "c2_val")), false);
378+
row(colVal("K", "B")), false);
375379
Assert.assertEquals(first_partition.getPartitionId(), partition1.getPartitionId());
376380
} catch (Exception e) {
377381
e.printStackTrace();
378382
Assert.assertTrue(false);
379383
} finally {
380384
for (int j = 0; j < rowCnt; j++) {
381385
Delete delete = client.delete(TABLE_NAME3);
382-
delete.setRowKey(row(colVal("c1", values[j][0]), colVal("c2", values[j][1])));
386+
delete.setRowKey(row(colVal("K", values[j][0]), colVal("Q", values[j][1]), colVal("T", values[j][2])));
383387
MutationResult res = delete.execute();
384388
Assert.assertEquals(1, res.getAffectedRows());
385389
}
@@ -537,9 +541,11 @@ public void testConcurrentGetPartition() throws Exception {
537541
}
538542
cnt.getAndIncrement();
539543
} else {
544+
byte[] bytes = new byte[10];
545+
random.nextBytes(bytes);
540546
MutationResult resultSet = client.insert(TABLE_NAME3)
541-
.setRowKey(row(colVal("c1", random.nextInt()), colVal("c2", "c2_val1")))
542-
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
547+
.setRowKey(row(colVal("K", bytes), colVal("Q", "c2_val1"), colVal("T", random.nextLong())))
548+
.addMutateRow(row(colVal("V", "c3_val1"))).execute();
543549
Assert.assertEquals(1, resultSet.getAffectedRows());
544550
List<Partition> partitions = client.getPartition(table_name, false);
545551
Assert.assertEquals(3, partitions.size());
@@ -583,18 +589,18 @@ public void testConcurrentGetPartition() throws Exception {
583589
}
584590
}
585591

586-
@Test
592+
@Ignore
587593
public void testReFetchPartitionMeta() throws Exception {
588594
String table_name = TABLE_NAME3;
589595
BatchOperation batchOperation = client.batchOperation(table_name);
590-
Object values[][] = { { 1, "c2_val1", "c3_val1", 1L }, { 101, "c2_val1", "c3_val1", 101L },
591-
{ 501, "c2_val1", "c3_val1", 501L }, { 901, "c2_val1", "c3_val1", 901L },
592-
{ 1001, "c2_val1", "c3_val1", 1001L }, { 1501, "c2_val1", "c3_val1", 1501L }, };
596+
Object values[][] = { { "ah", "c2_val1", 1L, "c3_val1" }, { "bw", "c2_val1", 101L, "c3_val1" },
597+
{ "ht", "c2_val1", 501L, "c3_val1" }, { "tw", "c2_val1", 901L, "c3_val1" },
598+
{ "xy", "c2_val1", 1001L, "c3_val1" }, { "zw", "c2_val1", 1501L, "c3_val1" } };
593599
int rowCnt = values.length;
594600
try {
595601
MutationResult resultSet = client.insertOrUpdate(TABLE_NAME3)
596-
.setRowKey(row(colVal("c1", 10), colVal("c2", "c2_val1")))
597-
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
602+
.setRowKey(row(colVal("K", "ah"), colVal("Q", "c2_val1"), colVal("T", 1L)))
603+
.addMutateRow(row(colVal("T", "c3_val1"))).execute();
598604
Assert.assertEquals(1, resultSet.getAffectedRows());
599605
// need to manually breakpoint here to change table schema in database
600606
resultSet = client.insertOrUpdate(TABLE_NAME3)

src/test/java/com/alipay/oceanbase/rpc/ObGetPartitionTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -248,21 +248,21 @@ public void testAddScanWithPartition() throws Exception {
248248

249249
Partition partition = client.getPartition(TABLE_NAME,
250250
row(colVal("c1", 1L), colVal("c2", "c2_val")), true);
251-
System.out.println("Row Key: {1L, c2_val:" + partition.toString());
251+
System.out.println("Row Key: {1L, c2_val}:" + partition.toString());
252252
QueryResultSet result = client.query(TABLE_NAME)
253253
.addScanRange(partition.start(), partition.end()).execute();
254254
Assert.assertEquals(1, result.cacheSize());
255255

256256
partition = client.getPartition(TABLE_NAME,
257257
row(colVal("c1", 400L), colVal("c2", "c2_val")), true);
258-
System.out.println("Row Key: {400L, c2_val:" + partition.toString());
258+
System.out.println("Row Key: {400L, c2_val}:" + partition.toString());
259259
result = client.query(TABLE_NAME).addScanRange(partition.start(), partition.end())
260260
.execute();
261261
Assert.assertEquals(2, result.cacheSize());
262262

263263
partition = client.getPartition(TABLE_NAME,
264264
row(colVal("c1", 1000L), colVal("c2", "c2_val")), true);
265-
System.out.println("Row Key: {1001L, c2_val:" + partition.toString());
265+
System.out.println("Row Key: {1001L, c2_val}:" + partition.toString());
266266
result = client.query(TABLE_NAME).addScanRange(partition.start(), partition.end())
267267
.execute();
268268
Assert.assertEquals(3, result.cacheSize());
@@ -650,9 +650,11 @@ public void testConcurrentGetPartition() throws Exception {
650650
}
651651
cnt.getAndIncrement();
652652
} else {
653+
byte[] bytes = new byte[10];
654+
random.nextBytes(bytes);
653655
MutationResult resultSet = client.insert(TABLE_NAME4)
654-
.setRowKey(row(colVal("c1", random.nextInt()), colVal("c2", "c2_val1")))
655-
.addMutateRow(row(colVal("c3", "c3_val1"), colVal("c4", 10L))).execute();
656+
.setRowKey(row(colVal("K", bytes), colVal("Q", "c2_val1"), colVal("T", random.nextLong())))
657+
.addMutateRow(row(colVal("V", "c3_val1"))).execute();
656658
Assert.assertEquals(1, resultSet.getAffectedRows());
657659
List<Partition> partitions = client.getPartition(table_name, false);
658660
Assert.assertEquals(3, partitions.size());

0 commit comments

Comments
 (0)