Skip to content

Commit ba79298

Browse files
authored
add kvMode test for secondary part (#307)
1 parent 8f48dee commit ba79298

File tree

1 file changed

+223
-0
lines changed

1 file changed

+223
-0
lines changed
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2025 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc;
19+
20+
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
21+
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
22+
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
24+
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
25+
import com.alipay.oceanbase.rpc.table.api.TableQuery;
26+
import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil;
27+
import org.junit.After;
28+
import org.junit.Assert;
29+
import org.junit.Before;
30+
import org.junit.Test;
31+
32+
import java.sql.Connection;
33+
import java.sql.ResultSet;
34+
import java.sql.SQLException;
35+
import java.sql.Statement;
36+
import java.util.Map;
37+
38+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
39+
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
40+
import static org.junit.Assert.assertEquals;
41+
42+
public class ObKVFeatureModeTest {
43+
private String tableName = "test$KVModeConfig";
44+
private String createTableStatement = "CREATE TABLE IF NOT EXISTS `test$KVModeConfig`(\n"
45+
+ " `K` varbinary(256),\n"
46+
+ " `Q` varbinary(256),\n" + " `T` bigint,\n"
47+
+ " `V` varbinary(1024),\n"
48+
+ " PRIMARY KEY(`K`, `Q`, `T`)\n"
49+
+ ") partition by key(`K`) partitions 3;";
50+
51+
@Before
52+
public void setup() throws Exception {
53+
executeSQL(createTableStatement);
54+
}
55+
56+
@After
57+
public void teardown() throws Exception {
58+
executeSQL("drop table " + tableName);
59+
}
60+
61+
private void executeSQL(String sql) throws SQLException {
62+
Connection connection = ObTableClientTestUtil.getConnection();
63+
Statement statement = connection.createStatement();
64+
statement.execute(sql);
65+
}
66+
67+
private void executeSysSQL(String sql) throws SQLException {
68+
Connection connection = ObTableClientTestUtil.getSysConnection();
69+
Statement statement = connection.createStatement();
70+
statement.execute(sql);
71+
}
72+
73+
private ObTableClient createHbaseModeClient() throws Exception {
74+
ObTableClient client = ObTableClientTestUtil.newTestClient();
75+
client.setRunningMode(ObTableClient.RunningMode.HBASE);
76+
client.init();
77+
return client;
78+
}
79+
80+
private ObTableClient createNormalClient() throws Exception {
81+
ObTableClient client = ObTableClientTestUtil.newTestClient();
82+
client.setRunningMode(ObTableClient.RunningMode.NORMAL);
83+
client.init();
84+
return client;
85+
}
86+
87+
private void switchDistributeExecute(boolean enable) throws Exception {
88+
if (enable) {
89+
executeSysSQL("alter system set _obkv_feature_mode='distributed_execute=on';");
90+
} else {
91+
executeSysSQL("alter system set _obkv_feature_mode='distributed_execute=off'");
92+
}
93+
}
94+
95+
private int getSqlAuditResCount(long timestamp, String keyWord, String stmtType)
96+
throws SQLException {
97+
String tenantName = ObTableClientTestUtil.getTenantName();
98+
keyWord = String.format("%s_key_%d", keyWord, timestamp);
99+
String sqlAuditStr = "select count(*) from oceanbase.__all_virtual_sql_audit where query_sql like"
100+
+ "\"%"
101+
+ keyWord
102+
+ "%\""
103+
+ "and tenant_name="
104+
+ "\""
105+
+ tenantName
106+
+ "\"" + "and stmt_type = \"" + stmtType + "\";";
107+
Connection connection = ObTableClientTestUtil.getSysConnection();
108+
Statement statement = connection.createStatement();
109+
statement.execute(sqlAuditStr);
110+
int resCnt = 0;
111+
ResultSet resultSet = statement.getResultSet();
112+
while (resultSet.next()) {
113+
resCnt = resultSet.getInt(1);
114+
}
115+
assertEquals(false, resultSet.next());
116+
return resCnt;
117+
}
118+
119+
@Test
120+
public void testTableRequest() throws Exception {
121+
// 1. distributed_execute = on
122+
switchDistributeExecute(true);
123+
ObTableClient client1 = createNormalClient();
124+
Assert.assertFalse(client1.getServerCapacity().isSupportDistributedExecute());
125+
long timeStamp = System.currentTimeMillis();
126+
testBatchInsertUp(client1, timeStamp, false, "table_dist_on");
127+
Assert.assertEquals(3, getSqlAuditResCount(timeStamp, "table_dist_on", "KV_MULTI_PUT"));
128+
testQuery(client1, timeStamp, false, "table_dist_on");
129+
Assert.assertEquals(6, getSqlAuditResCount(timeStamp, "table_dist_on", "KV_QUERY"));
130+
131+
// 2. distributed_execute = off
132+
switchDistributeExecute(false);
133+
Thread.sleep(5000);
134+
135+
ObTableClient client2 = createNormalClient();
136+
Assert.assertFalse(client2.getServerCapacity().isSupportDistributedExecute());
137+
timeStamp = System.currentTimeMillis();
138+
testBatchInsertUp(client2, timeStamp, false, "table_dist_off");
139+
Assert.assertEquals(3, getSqlAuditResCount(timeStamp, "table_dist_off", "KV_MULTI_PUT"));
140+
testQuery(client2, timeStamp, false, "table_dist_on");
141+
Assert.assertEquals(6, getSqlAuditResCount(timeStamp, "table_dist_on", "KV_QUERY"));
142+
143+
}
144+
145+
@Test
146+
public void testHbaseRequest() throws Exception {
147+
// 1. distributed_execute = on
148+
switchDistributeExecute(true);
149+
ObTableClient client1 = createHbaseModeClient();
150+
Assert.assertTrue(client1.getServerCapacity().isSupportDistributedExecute());
151+
long timeStamp = System.currentTimeMillis();
152+
testBatchInsertUp(client1, timeStamp, true, "hbase_dist_on");
153+
Assert.assertEquals(1, getSqlAuditResCount(timeStamp, "hbase_dist_on", "KV_MULTI_PUT"));
154+
testQuery(client1, timeStamp, true, "hbase_dist_on");
155+
Assert.assertEquals(6, getSqlAuditResCount(timeStamp, "hbase_dist_on", "KV_QUERY"));
156+
157+
// 2. distributed_execute = off
158+
switchDistributeExecute(false);
159+
Thread.sleep(6000);
160+
161+
ObTableClient client2 = createHbaseModeClient();
162+
Assert.assertFalse(client2.getServerCapacity().isSupportDistributedExecute());
163+
long timeStamp2 = System.currentTimeMillis();
164+
testBatchInsertUp(client2, timeStamp2, true, "hbase_dist_off");
165+
Assert.assertEquals(3, getSqlAuditResCount(timeStamp2, "hbase_dist_off", "KV_MULTI_PUT"));
166+
testQuery(client2, timeStamp2, true, "hbase_dist_off");
167+
Assert.assertEquals(6, getSqlAuditResCount(timeStamp2, "hbase_dist_off", "KV_QUERY"));
168+
169+
}
170+
171+
private void testBatchInsertUp(ObTableClient client, long timeStamp, boolean isHkv,
172+
String prefix) throws Exception {
173+
BatchOperation batchOperation = client.batchOperation(tableName);
174+
int batchCount = 10;
175+
if (isHkv) {
176+
batchOperation.setEntityType(ObTableEntityType.HKV);
177+
}
178+
for (int i = 0; i < batchCount; i++) {
179+
String K = String.format("%s_key_%d_%d", prefix, timeStamp, i);
180+
String Q = String.format("%s_qualifier_%d", prefix, i);
181+
String V = String.format("%s_value_%d", prefix, i);
182+
ObObj T = ObObj.getInstance(timeStamp);
183+
InsertOrUpdate insertOrUpdate = new InsertOrUpdate();
184+
insertOrUpdate.setRowKey(row(colVal("K", K.getBytes()), colVal("Q", Q.getBytes()),
185+
colVal("T", T)));
186+
insertOrUpdate.addMutateRow(row(colVal("V", V.getBytes())));
187+
batchOperation.addOperation(insertOrUpdate);
188+
}
189+
batchOperation.execute();
190+
}
191+
192+
private void testQuery(ObTableClient client, long timestamp, boolean isHkv, String prefix)
193+
throws Exception {
194+
// scan whole range
195+
TableQuery tableQuery = client.query(tableName);
196+
Object[] start = new Object[] { String.format("%s_key_%d_%d", prefix, timestamp, 0),
197+
ObObj.getMin(), ObObj.getMin() };
198+
Object[] end = new Object[] { String.format("%s_key_%d", prefix, timestamp, 9),
199+
ObObj.getMax(), ObObj.getMax() };
200+
tableQuery.addScanRange(start, end);
201+
if (isHkv) {
202+
tableQuery.setEntityType(ObTableEntityType.HKV);
203+
}
204+
tableQuery.select("K", "Q", "T", "V");
205+
// async query
206+
QueryResultSet result = tableQuery.asyncExecute();
207+
while (result.next()) {
208+
Map<String, Object> row = result.getRow();
209+
Assert.assertEquals(4, row.size());
210+
System.out.println("row:" + "K:" + row.get("K") + " Q:" + row.get("Q") + " T:"
211+
+ row.get("T") + " V:" + row.get("V"));
212+
}
213+
214+
// sync query
215+
QueryResultSet result2 = tableQuery.execute();
216+
while (result2.next()) {
217+
Map<String, Object> row = result.getRow();
218+
Assert.assertEquals(4, row.size());
219+
System.out.println("row:" + "K:" + row.get("K") + " Q:" + row.get("Q") + " T:"
220+
+ row.get("T") + " V:" + row.get("V"));
221+
}
222+
}
223+
}

0 commit comments

Comments
 (0)