@@ -48,16 +48,18 @@ class SqlAuditResult {
4848 public String svrIp ;
4949 public int svrPort ;
5050 public int tabletId ;
51+ public int consistency_level ;
5152
52- public SqlAuditResult (String svrIp , int svrPort , int tabletId ) {
53+ public SqlAuditResult (String svrIp , int svrPort , int tabletId , int consistency_level ) {
5354 this .svrIp = svrIp ;
5455 this .svrPort = svrPort ;
5556 this .tabletId = tabletId ;
57+ this .consistency_level = consistency_level ;
5658 }
5759
5860 @ Override
5961 public String toString () {
60- return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + '}' ;
62+ return "SqlAuditResult{" + "svrIp='" + svrIp + '\'' + ", svrPort=" + svrPort + ", tabletId=" + tabletId + ", consistency_level=" + consistency_level + '}' ;
6163 }
6264}
6365
@@ -413,6 +415,38 @@ private int extractTabletId(String querySql) {
413415 }
414416 }
415417
418+ private int extractConsistencyLevel (String querySql ) {
419+ // 查找 consistency_level: 的模式
420+ String pattern = "consistency_level:" ;
421+ int startIndex = querySql .indexOf (pattern );
422+ if (startIndex == -1 ) {
423+ return -1 ; // 如果找不到,返回 -1
424+ }
425+ // 找到 : 后面的数字开始位置
426+ int levelStartIndex = startIndex + pattern .length ();
427+ // 跳过可能的空格
428+ while (levelStartIndex < querySql .length () && Character .isWhitespace (querySql .charAt (levelStartIndex ))) {
429+ levelStartIndex ++;
430+ }
431+ // 找到数字结束位置(遇到 , 或 } 或空格)
432+ int levelEndIndex = levelStartIndex ;
433+ while (levelEndIndex < querySql .length ()) {
434+ char c = querySql .charAt (levelEndIndex );
435+ if (c == ',' || c == '}' || c == ' ' ) {
436+ break ;
437+ }
438+ levelEndIndex ++;
439+ }
440+ // 提取数字字符串
441+ String consistencyLevelStr = querySql .substring (levelStartIndex , levelEndIndex ).trim ();
442+ try {
443+ return Integer .parseInt (consistencyLevelStr );
444+ } catch (NumberFormatException e ) {
445+ debugPrint ("Failed to parse consistency_level from: %s" , consistencyLevelStr );
446+ return -1 ;
447+ }
448+ }
449+
416450 private static int getTenantId (Connection connection ) throws Exception {
417451 PreparedStatement statement = connection .prepareStatement (GET_TENANT_ID_SQL );
418452 statement .setString (1 , TENANT_NAME );
@@ -444,7 +478,8 @@ private SqlAuditResult getServerBySqlAudit(String rowkey, String stmtType) throw
444478 int svrPort = resultSet .getInt ("svr_port" );
445479 String querySql = resultSet .getString ("query_sql" );
446480 int tabletId = extractTabletId (querySql );
447- sqlAuditResult = new SqlAuditResult (svrIp , svrPort , tabletId );
481+ int consistencyLevel = extractConsistencyLevel (querySql );
482+ sqlAuditResult = new SqlAuditResult (svrIp , svrPort , tabletId , consistencyLevel );
448483 debugPrint ("querySql: %s" , querySql );
449484 debugPrint ("sqlAuditResult: %s" , sqlAuditResult .toString ());
450485 }
@@ -1511,19 +1546,23 @@ public void testIdcBatchGet1() throws Exception {
15111546 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
15121547 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
15131548 // 3. 获取数据
1514- BatchOperation batch = client .batchOperation (TABLE_NAME );
1515- Get get = client .get (TABLE_NAME )
1549+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1550+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
1551+ Get get1 = client .get (TABLE_NAME )
15161552 .setRowKey (row (colVal ("c1" , rowkey )))
1517- .setReadConsistency (ObReadConsistency .WEAK ) // 设置弱一致性读
15181553 .select ("c2" );
1519- batch .addOperation (get );
1554+ Get get2 = client .get (TABLE_NAME )
1555+ .setRowKey (row (colVal ("c1" , rowkey + "2" )))
1556+ .select ("c2" );
1557+ batch .addOperation (get1 , get2 );
15201558 BatchOperationResult res = batch .execute ();
15211559 Assert .assertNotNull (res );
1522- Assert .assertEquals (1 , res .getResults ().size ());
1560+ Assert .assertEquals (2 , res .getResults ().size ());
15231561 Assert .assertEquals ("c2_val" , res .get (0 ).getOperationRow ().get ("c2" ));
15241562 // 4. 查询 sql audit,确定读请求发到哪个节点和分区上
15251563 SqlAuditResult sqlAuditResult = getServerBySqlAudit (rowkey , BATCH_GET_STMT_TYPE );
15261564 debugPrint ("sqlAuditResult: %s" , sqlAuditResult .toString ());
1565+ Assert .assertEquals (ObReadConsistency .WEAK .getValue (), sqlAuditResult .consistency_level );
15271566 // 5. 查询分区的位置信息
15281567 PartitionLocation partitionLocation = getPartitionLocation (sqlAuditResult .tabletId );
15291568 debugPrint ("partitionLocation: %s" , partitionLocation .toString ());
@@ -1533,6 +1572,49 @@ public void testIdcBatchGet1() throws Exception {
15331572 Assert .assertEquals (IDC2 , readReplica .getIdc ());
15341573 }
15351574
1575+ /*
1576+ * 测试场景:用户正常使用场景,使用batch get接口进行指定IDC读
1577+ * 测试预期:发到对应的IDC上进行读取
1578+ */
1579+ @ Test
1580+ public void testIdcBatchGet1_1 () throws Exception {
1581+ ObTableClient client = newTestClient ();
1582+ client .setCurrentIDC (IDC2 ); // 设置当前 idc
1583+ client .init ();
1584+ // 1. 准备数据
1585+ String rowkey = getRandomRowkString ();
1586+ insertData (client , rowkey );
1587+ Thread .sleep (1000 ); // 等待数据同步到所有节点
1588+ // 2. 设置 idc
1589+ setZoneRegionIdc (ZONE1 , REGION1 , IDC1 );
1590+ setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
1591+ setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
1592+ // 3. 获取数据
1593+ BatchOperation batch = client .batchOperation (TABLE_NAME );
1594+ Get get1 = client .get (TABLE_NAME )
1595+ .setRowKey (row (colVal ("c1" , rowkey )))
1596+ .select ("c2" );
1597+ Get get2 = client .get (TABLE_NAME )
1598+ .setRowKey (row (colVal ("c1" , rowkey + "2" )))
1599+ .select ("c2" );
1600+ batch .addOperation (get1 , get2 );
1601+ BatchOperationResult res = batch .execute ();
1602+ Assert .assertNotNull (res );
1603+ Assert .assertEquals (2 , res .getResults ().size ());
1604+ Assert .assertEquals ("c2_val" , res .get (0 ).getOperationRow ().get ("c2" ));
1605+ // 4. 查询 sql audit,确定读请求发到哪个节点和分区上
1606+ SqlAuditResult sqlAuditResult = getServerBySqlAudit (rowkey , BATCH_GET_STMT_TYPE );
1607+ debugPrint ("sqlAuditResult: %s" , sqlAuditResult .toString ());
1608+ Assert .assertEquals (ObReadConsistency .STRONG .getValue (), sqlAuditResult .consistency_level );
1609+ // 5. 查询分区的位置信息
1610+ PartitionLocation partitionLocation = getPartitionLocation (sqlAuditResult .tabletId );
1611+ debugPrint ("partitionLocation: %s" , partitionLocation .toString ());
1612+ // 6. 校验
1613+ ReplicaLocation readReplica = partitionLocation .getReplicaBySvrAddr (sqlAuditResult .svrIp , sqlAuditResult .svrPort );
1614+ debugPrint ("readReplica: %s" , readReplica .toString ());
1615+ Assert .assertTrue (readReplica .isLeader ());
1616+ }
1617+
15361618 /*
15371619 * 测试场景:未设置当前IDC进行弱读
15381620 * 测试预期:发到任意follower上进行弱读
@@ -1551,10 +1633,10 @@ public void testIdcBatchGet2() throws Exception {
15511633 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
15521634 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
15531635 // 3. 获取数据
1554- BatchOperation batch = client .batchOperation (TABLE_NAME );
1636+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1637+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
15551638 Get get = client .get (TABLE_NAME )
15561639 .setRowKey (row (colVal ("c1" , rowkey )))
1557- .setReadConsistency (ObReadConsistency .WEAK ) // 设置弱一致性读
15581640 .select ("c2" );
15591641 batch .addOperation (get );
15601642 BatchOperationResult res = batch .execute ();
@@ -1631,10 +1713,10 @@ public void testIdcBatchGet4() throws Exception {
16311713 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
16321714 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
16331715 // 3. 获取数据
1634- BatchOperation batch = client .batchOperation (TABLE_NAME );
1716+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1717+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
16351718 Get get = client .get (TABLE_NAME )
16361719 .setRowKey (row (colVal ("c1" , rowkey )))
1637- .setReadConsistency (ObReadConsistency .WEAK ) // 设置弱一致性读
16381720 .select ("c2" );
16391721 batch .addOperation (get );
16401722 BatchOperationResult res = batch .execute ();
@@ -1672,10 +1754,10 @@ public void testIdcBatchGet5() throws Exception {
16721754 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
16731755 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
16741756 // 3. 获取数据,使用strong consistency
1675- BatchOperation batch = client .batchOperation (TABLE_NAME );
1757+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1758+ .setReadConsistency (ObReadConsistency .STRONG ); // 设置强一致性读
16761759 Get get = client .get (TABLE_NAME )
16771760 .setRowKey (row (colVal ("c1" , rowkey )))
1678- .setReadConsistency (ObReadConsistency .STRONG ) // 设置强一致性读
16791761 .select ("c2" );
16801762 batch .addOperation (get );
16811763 BatchOperationResult res = batch .execute ();
@@ -1712,10 +1794,10 @@ public void testIdcBatchGet6() throws Exception {
17121794 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
17131795 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
17141796 // 3. 获取数据
1715- BatchOperation batch = client .batchOperation (TABLE_NAME );
1797+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1798+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
17161799 Get get = client .get (TABLE_NAME )
17171800 .setRowKey (row (colVal ("c1" , rowkey )))
1718- .setReadConsistency (ObReadConsistency .WEAK ) // 设置弱一致性读
17191801 .select ("c2" );
17201802 batch .addOperation (get );
17211803 BatchOperationResult res = batch .execute ();
@@ -1783,10 +1865,10 @@ public void testIdcBatchGet8() throws Exception {
17831865 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
17841866 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
17851867 // 3. 获取数据
1786- BatchOperation batch = client .batchOperation (TABLE_NAME );
1868+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1869+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
17871870 Get get = client .get (TABLE_NAME )
17881871 .setRowKey (row (colVal ("c1" , rowkey )))
1789- .setReadConsistency (ObReadConsistency .WEAK )
17901872 .select ("c2" );
17911873 batch .addOperation (get );
17921874 BatchOperationResult res = batch .execute ();
@@ -1864,10 +1946,10 @@ public void testIdcBatchGet10() throws Exception {
18641946 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
18651947 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
18661948 // 3. 使用不同大小写的weak
1867- BatchOperation batch = client .batchOperation (TABLE_NAME );
1949+ BatchOperation batch = client .batchOperation (TABLE_NAME )
1950+ .setReadConsistency (ObReadConsistency .WEAK ); // 设置弱一致性读
18681951 Get get = client .get (TABLE_NAME )
18691952 .setRowKey (row (colVal ("c1" , rowkey )))
1870- .setReadConsistency (ObReadConsistency .WEAK ) // 大写
18711953 .select ("c2" );
18721954 batch .addOperation (get );
18731955 BatchOperationResult res = batch .execute ();
@@ -1946,10 +2028,10 @@ public void testIdcBatchGet12() throws Exception {
19462028 setZoneRegionIdc (ZONE2 , REGION2 , IDC2 );
19472029 setZoneRegionIdc (ZONE3 , REGION3 , IDC3 );
19482030 // 3. 获取数据,语句级别设置为strong,应该覆盖全局的weak设置
1949- BatchOperation batch = client .batchOperation (TABLE_NAME );
2031+ BatchOperation batch = client .batchOperation (TABLE_NAME )
2032+ .setReadConsistency (ObReadConsistency .STRONG ); // 语句级别设置为strong,应该覆盖全局的weak
19502033 Get get = client .get (TABLE_NAME )
19512034 .setRowKey (row (colVal ("c1" , rowkey )))
1952- .setReadConsistency (ObReadConsistency .STRONG ) // 语句级别设置为strong,应该覆盖全局的weak
19532035 .select ("c2" );
19542036 batch .addOperation (get );
19552037 BatchOperationResult res = batch .execute ();
0 commit comments