Skip to content

Commit 48e7326

Browse files
committed
fix weak read in batch get
1 parent 4429e5b commit 48e7326

File tree

2 files changed

+139
-31
lines changed

2 files changed

+139
-31
lines changed

src/main/java/com/alipay/oceanbase/rpc/mutation/BatchOperation.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public class BatchOperation {
5454
boolean isSameType = true;
5555
protected ObTableEntityType entityType = ObTableEntityType.KV;
5656
protected OHOperationType hbaseOpType = OHOperationType.INVALID;
57+
protected ObReadConsistency readConsistency = null; // BatchOperation 级别的弱读设置
5758

5859
/*
5960
* default constructor
@@ -63,6 +64,7 @@ public BatchOperation() {
6364
client = null;
6465
withResult = false;
6566
operations = new ArrayList<>();
67+
readConsistency = null;
6668
}
6769

6870
/*
@@ -188,6 +190,25 @@ public void setEntityType(ObTableEntityType entityType) {
188190
this.entityType = entityType;
189191
}
190192

193+
/**
194+
* Set read consistency level for batch operation.
195+
* This setting will override the readConsistency settings on individual Get operations.
196+
* @param readConsistency read consistency level
197+
* @return this
198+
*/
199+
public BatchOperation setReadConsistency(ObReadConsistency readConsistency) {
200+
this.readConsistency = readConsistency;
201+
return this;
202+
}
203+
204+
/**
205+
* Get read consistency level for batch operation.
206+
* @return read consistency level
207+
*/
208+
public ObReadConsistency getReadConsistency() {
209+
return readConsistency;
210+
}
211+
191212
public void setServerCanRetry(boolean canRetry) {
192213
this.serverCanRetry = canRetry;
193214
}
@@ -317,13 +338,6 @@ private BatchOperationResult executeWithNormalBatchOp() throws Exception {
317338
return new BatchOperationResult(batchOps.executeWithResult());
318339
}
319340

320-
private boolean checkReadConsistency(ObTableClient obTableClient, ObReadConsistency readConsistency) throws IllegalArgumentException {
321-
// 如果没有设置语句级别的 readConsistency(null),使用 TableRoute 上的 consistencyLevel
322-
if (readConsistency == null) {
323-
return obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
324-
}
325-
return readConsistency == ObReadConsistency.WEAK;
326-
}
327341

328342
private BatchOperationResult executeWithLSBatchOp() throws Exception {
329343
if (tableName == null || tableName.isEmpty()) {
@@ -369,11 +383,23 @@ private BatchOperationResult executeWithLSBatchOp() throws Exception {
369383
if (get.getRowKey() == null) {
370384
throw new IllegalArgumentException("RowKey is null in Get operation");
371385
}
372-
isWeakRead = checkReadConsistency(obTableClient, get.getReadConsistency());
386+
// BatchOperation 级别的 readConsistency 优先,忽略 Get 上的设置
387+
if (readConsistency != null) {
388+
isWeakRead = (readConsistency == ObReadConsistency.WEAK);
389+
} else {
390+
// 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置
391+
isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
392+
}
373393
batchOps.addOperation(get);
374394
} else if (operation instanceof TableQuery) {
375395
TableQuery query = (TableQuery) operation;
376-
isWeakRead = checkReadConsistency(obTableClient, query.getReadConsistency());
396+
// BatchOperation 级别的 readConsistency 优先,忽略 TableQuery 上的设置
397+
if (readConsistency != null) {
398+
isWeakRead = (readConsistency == ObReadConsistency.WEAK);
399+
} else {
400+
// 如果 BatchOperation 没有设置,使用 TableRoute 上的全局设置
401+
isWeakRead = obTableClient.getTableRoute().getReadConsistency() == ObReadConsistency.WEAK;
402+
}
377403
batchOps.addOperation(query);
378404
} else if (operation instanceof QueryAndMutate) {
379405
QueryAndMutate qm = (QueryAndMutate) operation;

src/test/java/com/alipay/oceanbase/rpc/ObTableWeakReadTest.java

Lines changed: 104 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,18 @@ class SqlAuditResult {
4848
public String svrIp;
4949
public int svrPort;
5050
public int tabletId;
51+
public int consistency_level;
5152

52-
public SqlAuditResult(String svrIp, int svrPort, int tabletId) {
53+
public SqlAuditResult(String svrIp, int svrPort, int tabletId, int consistency_level) {
5354
this.svrIp = svrIp;
5455
this.svrPort = svrPort;
5556
this.tabletId = tabletId;
57+
this.consistency_level = consistency_level;
5658
}
5759

5860
@Override
5961
public String toString() {
60-
return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + '}';
62+
return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + ", consistency_level=" + consistency_level + '}';
6163
}
6264
}
6365

@@ -413,6 +415,38 @@ private int extractTabletId(String querySql) {
413415
}
414416
}
415417

418+
private int extractConsistencyLevel(String querySql) {
419+
// 查找 consistency_level: 的模式
420+
String pattern = "consistency_level:";
421+
int startIndex = querySql.indexOf(pattern);
422+
if (startIndex == -1) {
423+
return -1; // 如果找不到,返回 -1
424+
}
425+
// 找到 : 后面的数字开始位置
426+
int levelStartIndex = startIndex + pattern.length();
427+
// 跳过可能的空格
428+
while (levelStartIndex < querySql.length() && Character.isWhitespace(querySql.charAt(levelStartIndex))) {
429+
levelStartIndex++;
430+
}
431+
// 找到数字结束位置(遇到 , 或 } 或空格)
432+
int levelEndIndex = levelStartIndex;
433+
while (levelEndIndex < querySql.length()) {
434+
char c = querySql.charAt(levelEndIndex);
435+
if (c == ',' || c == '}' || c == ' ') {
436+
break;
437+
}
438+
levelEndIndex++;
439+
}
440+
// 提取数字字符串
441+
String consistencyLevelStr = querySql.substring(levelStartIndex, levelEndIndex).trim();
442+
try {
443+
return Integer.parseInt(consistencyLevelStr);
444+
} catch (NumberFormatException e) {
445+
debugPrint("Failed to parse consistency_level from: %s", consistencyLevelStr);
446+
return -1;
447+
}
448+
}
449+
416450
private static int getTenantId(Connection connection) throws Exception {
417451
PreparedStatement statement = connection.prepareStatement(GET_TENANT_ID_SQL);
418452
statement.setString(1, TENANT_NAME);
@@ -444,7 +478,8 @@ private SqlAuditResult getServerBySqlAudit(String rowkey, String stmtType) throw
444478
int svrPort = resultSet.getInt("svr_port");
445479
String querySql = resultSet.getString("query_sql");
446480
int tabletId = extractTabletId(querySql);
447-
sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId);
481+
int consistencyLevel = extractConsistencyLevel(querySql);
482+
sqlAuditResult = new SqlAuditResult(svrIp, svrPort, tabletId, consistencyLevel);
448483
debugPrint("querySql: %s", querySql);
449484
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
450485
}
@@ -1511,19 +1546,23 @@ public void testIdcBatchGet1() throws Exception {
15111546
setZoneRegionIdc(ZONE2, REGION2, IDC2);
15121547
setZoneRegionIdc(ZONE3, REGION3, IDC3);
15131548
// 3. 获取数据
1514-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1515-
Get get = client.get(TABLE_NAME)
1549+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1550+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
1551+
Get get1 = client.get(TABLE_NAME)
15161552
.setRowKey(row(colVal("c1", rowkey)))
1517-
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
15181553
.select("c2");
1519-
batch.addOperation(get);
1554+
Get get2 = client.get(TABLE_NAME)
1555+
.setRowKey(row(colVal("c1", rowkey + "2")))
1556+
.select("c2");
1557+
batch.addOperation(get1, get2);
15201558
BatchOperationResult res = batch.execute();
15211559
Assert.assertNotNull(res);
1522-
Assert.assertEquals(1, res.getResults().size());
1560+
Assert.assertEquals(2, res.getResults().size());
15231561
Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2"));
15241562
// 4. 查询 sql audit,确定读请求发到哪个节点和分区上
15251563
SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE);
15261564
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
1565+
Assert.assertEquals(ObReadConsistency.WEAK.getValue(), sqlAuditResult.consistency_level);
15271566
// 5. 查询分区的位置信息
15281567
PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId);
15291568
debugPrint("partitionLocation: %s", partitionLocation.toString());
@@ -1533,6 +1572,49 @@ public void testIdcBatchGet1() throws Exception {
15331572
Assert.assertEquals(IDC2, readReplica.getIdc());
15341573
}
15351574

1575+
/*
1576+
* 测试场景:用户正常使用场景,使用batch get接口进行指定IDC读
1577+
* 测试预期:发到对应的IDC上进行读取
1578+
*/
1579+
@Test
1580+
public void testIdcBatchGet1_1() throws Exception {
1581+
ObTableClient client = newTestClient();
1582+
client.setCurrentIDC(IDC2); // 设置当前 idc
1583+
client.init();
1584+
// 1. 准备数据
1585+
String rowkey = getRandomRowkString();
1586+
insertData(client, rowkey);
1587+
Thread.sleep(1000); // 等待数据同步到所有节点
1588+
// 2. 设置 idc
1589+
setZoneRegionIdc(ZONE1, REGION1, IDC1);
1590+
setZoneRegionIdc(ZONE2, REGION2, IDC2);
1591+
setZoneRegionIdc(ZONE3, REGION3, IDC3);
1592+
// 3. 获取数据
1593+
BatchOperation batch = client.batchOperation(TABLE_NAME);
1594+
Get get1 = client.get(TABLE_NAME)
1595+
.setRowKey(row(colVal("c1", rowkey)))
1596+
.select("c2");
1597+
Get get2 = client.get(TABLE_NAME)
1598+
.setRowKey(row(colVal("c1", rowkey + "2")))
1599+
.select("c2");
1600+
batch.addOperation(get1, get2);
1601+
BatchOperationResult res = batch.execute();
1602+
Assert.assertNotNull(res);
1603+
Assert.assertEquals(2, res.getResults().size());
1604+
Assert.assertEquals("c2_val", res.get(0).getOperationRow().get("c2"));
1605+
// 4. 查询 sql audit,确定读请求发到哪个节点和分区上
1606+
SqlAuditResult sqlAuditResult = getServerBySqlAudit(rowkey, BATCH_GET_STMT_TYPE);
1607+
debugPrint("sqlAuditResult: %s", sqlAuditResult.toString());
1608+
Assert.assertEquals(ObReadConsistency.STRONG.getValue(), sqlAuditResult.consistency_level);
1609+
// 5. 查询分区的位置信息
1610+
PartitionLocation partitionLocation = getPartitionLocation(sqlAuditResult.tabletId);
1611+
debugPrint("partitionLocation: %s", partitionLocation.toString());
1612+
// 6. 校验
1613+
ReplicaLocation readReplica = partitionLocation.getReplicaBySvrAddr(sqlAuditResult.svrIp, sqlAuditResult.svrPort);
1614+
debugPrint("readReplica: %s", readReplica.toString());
1615+
Assert.assertTrue(readReplica.isLeader());
1616+
}
1617+
15361618
/*
15371619
* 测试场景:未设置当前IDC进行弱读
15381620
* 测试预期:发到任意follower上进行弱读
@@ -1551,10 +1633,10 @@ public void testIdcBatchGet2() throws Exception {
15511633
setZoneRegionIdc(ZONE2, REGION2, IDC2);
15521634
setZoneRegionIdc(ZONE3, REGION3, IDC3);
15531635
// 3. 获取数据
1554-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1636+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1637+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
15551638
Get get = client.get(TABLE_NAME)
15561639
.setRowKey(row(colVal("c1", rowkey)))
1557-
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
15581640
.select("c2");
15591641
batch.addOperation(get);
15601642
BatchOperationResult res = batch.execute();
@@ -1631,10 +1713,10 @@ public void testIdcBatchGet4() throws Exception {
16311713
setZoneRegionIdc(ZONE2, REGION2, IDC2);
16321714
setZoneRegionIdc(ZONE3, REGION3, IDC3);
16331715
// 3. 获取数据
1634-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1716+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1717+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
16351718
Get get = client.get(TABLE_NAME)
16361719
.setRowKey(row(colVal("c1", rowkey)))
1637-
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
16381720
.select("c2");
16391721
batch.addOperation(get);
16401722
BatchOperationResult res = batch.execute();
@@ -1672,10 +1754,10 @@ public void testIdcBatchGet5() throws Exception {
16721754
setZoneRegionIdc(ZONE2, REGION2, IDC2);
16731755
setZoneRegionIdc(ZONE3, REGION3, IDC3);
16741756
// 3. 获取数据,使用strong consistency
1675-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1757+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1758+
.setReadConsistency(ObReadConsistency.STRONG); // 设置强一致性读
16761759
Get get = client.get(TABLE_NAME)
16771760
.setRowKey(row(colVal("c1", rowkey)))
1678-
.setReadConsistency(ObReadConsistency.STRONG) // 设置强一致性读
16791761
.select("c2");
16801762
batch.addOperation(get);
16811763
BatchOperationResult res = batch.execute();
@@ -1712,10 +1794,10 @@ public void testIdcBatchGet6() throws Exception {
17121794
setZoneRegionIdc(ZONE2, REGION2, IDC2);
17131795
setZoneRegionIdc(ZONE3, REGION3, IDC3);
17141796
// 3. 获取数据
1715-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1797+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1798+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
17161799
Get get = client.get(TABLE_NAME)
17171800
.setRowKey(row(colVal("c1", rowkey)))
1718-
.setReadConsistency(ObReadConsistency.WEAK) // 设置弱一致性读
17191801
.select("c2");
17201802
batch.addOperation(get);
17211803
BatchOperationResult res = batch.execute();
@@ -1783,10 +1865,10 @@ public void testIdcBatchGet8() throws Exception {
17831865
setZoneRegionIdc(ZONE2, REGION2, IDC2);
17841866
setZoneRegionIdc(ZONE3, REGION3, IDC3);
17851867
// 3. 获取数据
1786-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1868+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1869+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
17871870
Get get = client.get(TABLE_NAME)
17881871
.setRowKey(row(colVal("c1", rowkey)))
1789-
.setReadConsistency(ObReadConsistency.WEAK)
17901872
.select("c2");
17911873
batch.addOperation(get);
17921874
BatchOperationResult res = batch.execute();
@@ -1864,10 +1946,10 @@ public void testIdcBatchGet10() throws Exception {
18641946
setZoneRegionIdc(ZONE2, REGION2, IDC2);
18651947
setZoneRegionIdc(ZONE3, REGION3, IDC3);
18661948
// 3. 使用不同大小写的weak
1867-
BatchOperation batch = client.batchOperation(TABLE_NAME);
1949+
BatchOperation batch = client.batchOperation(TABLE_NAME)
1950+
.setReadConsistency(ObReadConsistency.WEAK); // 设置弱一致性读
18681951
Get get = client.get(TABLE_NAME)
18691952
.setRowKey(row(colVal("c1", rowkey)))
1870-
.setReadConsistency(ObReadConsistency.WEAK) // 大写
18711953
.select("c2");
18721954
batch.addOperation(get);
18731955
BatchOperationResult res = batch.execute();
@@ -1946,10 +2028,10 @@ public void testIdcBatchGet12() throws Exception {
19462028
setZoneRegionIdc(ZONE2, REGION2, IDC2);
19472029
setZoneRegionIdc(ZONE3, REGION3, IDC3);
19482030
// 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置
1949-
BatchOperation batch = client.batchOperation(TABLE_NAME);
2031+
BatchOperation batch = client.batchOperation(TABLE_NAME)
2032+
.setReadConsistency(ObReadConsistency.STRONG); // 语句级别设置为strong,应该覆盖全局的weak
19502033
Get get = client.get(TABLE_NAME)
19512034
.setRowKey(row(colVal("c1", rowkey)))
1952-
.setReadConsistency(ObReadConsistency.STRONG) // 语句级别设置为strong,应该覆盖全局的weak
19532035
.select("c2");
19542036
batch.addOperation(get);
19552037
BatchOperationResult res = batch.execute();

0 commit comments

Comments
 (0)