Skip to content

Commit f71f0ff

Browse files
committed
update odp rpc lock timeout, add odp range query
1 parent 9682063 commit f71f0ff

File tree

4 files changed

+256
-12
lines changed

4 files changed

+256
-12
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2046,9 +2046,9 @@ public List<ObPair<Long, ObTableParam>> getTables(String tableName, ObTableQuery
20462046
return obTableParams;
20472047
}
20482048

2049-
private List<ObPair<Long, ObTableParam>> getODPTables(String tableName, ObTableQuery query,
2050-
Object[] start, boolean startInclusive,
2051-
Object[] end, boolean endInclusive)
2049+
public List<ObPair<Long, ObTableParam>> getOdpTables(String tableName, ObTableQuery query,
2050+
Object[] start, boolean startInclusive,
2051+
Object[] end, boolean endInclusive)
20522052
throws Exception {
20532053
List<ObPair<Long, ObTableParam>> obTableParams = new ArrayList<ObPair<Long, ObTableParam>>();
20542054
TableEntry odpTableEntry = getOrFetchODPPartitionMeta(tableName, false);
@@ -2977,7 +2977,7 @@ public List<Partition> getPartition(String tableName) throws Exception {
29772977
private List<Partition> getAllPartitionInternal(String tableName) throws Exception {
29782978
List<Partition> partitions = new ArrayList<>();
29792979
if (odpMode) {
2980-
List<ObPair<Long, ObTableParam>> allTables = getODPTables(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true,
2980+
List<ObPair<Long, ObTableParam>> allTables = getOdpTables(tableName, new ObTableQuery(), new Object[]{ ObObj.getMin() }, true,
29812981
new Object[]{ ObObj.getMax() }, true);
29822982
for (ObPair<Long, ObTableParam> table : allTables) {
29832983
ObTableParam tableParam = table.getRight();
@@ -3040,8 +3040,8 @@ private TableEntry getOrFetchODPPartitionMeta(String tableName, boolean needRene
30403040
if (ODPTableLocations.get(tableName) != null) {
30413041
odpTableEntry = ODPTableLocations.get(tableName);
30423042
long interval = System.currentTimeMillis() - odpTableEntry.getRefreshTimeMills();
3043-
// do not fetch partition meta if the refresh interval is less than 5 seconds
3044-
if (interval < 5000L) {
3043+
// do not fetch partition meta if the refresh interval is less than 3 seconds
3044+
if (interval < 3000L) {
30453045
lock.unlock();
30463046
return odpTableEntry;
30473047
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObFetchPartitionMetaResult.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,11 +211,11 @@ public Object decode(ByteBuf buf) {
211211
// data from PROXY_LOCATION_SQL_PARTITION_V4/PROXY_LOCATION_SQL_PARTITION
212212
partitionEntry.setTabletLsIdMap(tabletLsIdMap);
213213
}
214+
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
214215
tableEntry.setPartitionEntry(partitionEntry);
215216
tableEntry.setTableId(tableId);
216217
tableEntry.setPartitionNum(partitionNum);
217218
tableEntry.setPartitionInfo(partitionInfo);
218-
tableEntry.setRefreshTimeMills(System.currentTimeMillis());
219219
tableEntry.setOdpRefreshTimeMills(createTime);
220220
tableEntry.prepare();
221221

src/main/java/com/alipay/oceanbase/rpc/table/ObTableClientQueryImpl.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,7 @@ private AbstractQueryStreamResult commonExecute(InitQueryResultCallback<Abstract
169169
}
170170
}
171171
if (getPartId() == null) {
172-
this.partitionObTables.put(0L, new ObPair<Long, ObTableParam>(0L, new ObTableParam(
173-
obTableClient.getOdpTable())));
172+
initPartitions();
174173
} else {
175174
ObPair<Long, ObTableParam> odpTable = obTableClient.getODPTableWithPartId(
176175
tableName, getPartId(), false);
@@ -285,9 +284,16 @@ public void initPartitions() throws Exception {
285284
}
286285
ObBorderFlag borderFlag = rang.getBorderFlag();
287286
// pairs -> List<Pair<logicId, param>>
288-
List<ObPair<Long, ObTableParam>> pairs = this.obTableClient.getTables(indexTableName,
289-
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
290-
false, false);
287+
List<ObPair<Long, ObTableParam>> pairs = null;
288+
if (!this.obTableClient.isOdpMode()) {
289+
pairs = this.obTableClient.getTables(indexTableName,
290+
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd(),
291+
false, false);
292+
}
293+
else {
294+
pairs = this.obTableClient.getOdpTables(tableName,
295+
tableQuery, start, borderFlag.isInclusiveStart(), end, borderFlag.isInclusiveEnd());
296+
}
291297
if (this.tableQuery.getScanOrder() == ObScanOrder.Reverse) {
292298
for (int i = pairs.size() - 1; i >= 0; i--) {
293299
this.partitionObTables.put(pairs.get(i).getLeft(), pairs.get(i));

src/test/java/com/alipay/oceanbase/rpc/ODPGetPartitionMetaTest.java

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717

1818
package com.alipay.oceanbase.rpc;
1919

20+
import com.alipay.oceanbase.rpc.exception.ObTableException;
21+
import com.alipay.oceanbase.rpc.filter.ObCompareOp;
22+
import com.alipay.oceanbase.rpc.filter.ObTableValueFilter;
2023
import com.alipay.oceanbase.rpc.location.model.partition.Partition;
2124
import com.alipay.oceanbase.rpc.mutation.BatchOperation;
2225
import com.alipay.oceanbase.rpc.mutation.Delete;
2326
import com.alipay.oceanbase.rpc.mutation.InsertOrUpdate;
2427
import com.alipay.oceanbase.rpc.mutation.result.BatchOperationResult;
2528
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
29+
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
2630
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
2731
import com.alipay.oceanbase.rpc.table.ConcurrentTask;
2832
import com.alipay.oceanbase.rpc.table.ConcurrentTaskExecutor;
@@ -34,6 +38,7 @@
3438
import org.junit.Test;
3539

3640
import java.sql.Connection;
41+
import java.sql.SQLException;
3742
import java.sql.Statement;
3843
import java.util.HashMap;
3944
import java.util.List;
@@ -43,12 +48,15 @@
4348
import java.util.concurrent.Executors;
4449
import java.util.concurrent.TimeUnit;
4550

51+
import static com.alipay.oceanbase.rpc.filter.ObTableFilterFactory.compareVal;
4652
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.colVal;
4753
import static com.alipay.oceanbase.rpc.mutation.MutationFactory.row;
4854
import static com.alipay.oceanbase.rpc.util.ObTableClientTestUtil.cleanTable;
4955
import static com.alipay.oceanbase.rpc.util.ObTableClientTestUtil.generateRandomStringByUUID;
5056
import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.LCD;
5157
import static java.lang.StrictMath.abs;
58+
import static org.junit.Assert.assertEquals;
59+
import static org.junit.Assert.assertNotNull;
5260

5361
public class ODPGetPartitionMetaTest {
5462
public ObTableClient client;
@@ -601,4 +609,234 @@ public void testReFetchPartitionMeta() throws Exception {
601609
}
602610
}
603611

612+
@Test
613+
public void testODPRangeQuery() throws Exception {
614+
// todo: only support in 4.x currently
615+
if (ObTableClientTestUtil.isOBVersionLessThan(ObTableClientTestUtil.obVsn4000)) {
616+
return;
617+
}
618+
619+
final String TABLE_NAME = "test_auto_increment_rowkey";
620+
client.addRowKeyElement(TABLE_NAME, new String[] { "c1", "c2" });
621+
622+
try {
623+
Connection connection = ObTableClientTestUtil.getConnection();
624+
Statement statement = connection.createStatement();
625+
statement.execute("CREATE TABLE IF NOT EXISTS `test_auto_increment_rowkey` ("
626+
+ "`c1` int auto_increment,"
627+
+ "`c2` int NOT NULL,"
628+
+ "`c3` int DEFAULT NULL,"
629+
+ "`c4` varchar(255) DEFAULT NULL,"
630+
+ "PRIMARY KEY(`c1`, `c2`)) partition by range columns(`c2`)"
631+
+ "(PARTITION p0 VALUES LESS THAN (100), PARTITION p1 VALUES LESS THAN (1000));");
632+
633+
client.insert(TABLE_NAME, new Object[] { 0, 1 }, new String[] { "c3" },
634+
new Object[] { 1 });
635+
636+
TableQuery tableQuery = client.query(TABLE_NAME);
637+
tableQuery.select("c1", "c2", "c3");
638+
tableQuery.addScanRange(new Object[] { 1, 1 }, new Object[] { 200, 90 });
639+
ObTableValueFilter filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 1);
640+
tableQuery.setFilter(filter);
641+
QueryResultSet result = tableQuery.execute();
642+
Assert.assertTrue(result.next());
643+
Map<String, Object> value = result.getRow();
644+
Assert.assertEquals(1, value.get("c2"));
645+
Assert.assertEquals(1, value.get("c3"));
646+
647+
// test insert use user value
648+
client.insert(TABLE_NAME, new Object[] { 100, 1 }, new String[] { "c3" },
649+
new Object[] { 1 });
650+
651+
tableQuery.select("c1", "c2", "c3");
652+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 100);
653+
tableQuery.setFilter(filter);
654+
result = tableQuery.execute();
655+
Assert.assertTrue(result.next());
656+
value = result.getRow();
657+
Assert.assertEquals(1, value.get("c2"));
658+
Assert.assertEquals(1, value.get("c3"));
659+
660+
// test insert sync global auto inc val
661+
client.insert(TABLE_NAME, new Object[] { 0, 1 }, new String[] { "c3" },
662+
new Object[] { 1 });
663+
664+
tableQuery.select("c1", "c2", "c3");
665+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 101);
666+
tableQuery.setFilter(filter);
667+
result = tableQuery.execute();
668+
Assert.assertTrue(result.next());
669+
value = result.getRow();
670+
Assert.assertEquals(1, value.get("c2"));
671+
Assert.assertEquals(1, value.get("c3"));
672+
673+
// test delete
674+
client.delete(TABLE_NAME, new Object[] { 101, 1 });
675+
676+
// test confirm delete
677+
tableQuery.select("c1", "c2", "c3");
678+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 101);
679+
tableQuery.setFilter(filter);
680+
result = tableQuery.execute();
681+
Assert.assertFalse(result.next());
682+
683+
// test update
684+
ObTableValueFilter filter_3 = compareVal(ObCompareOp.EQ, "c3", 1);
685+
686+
MutationResult updateResult = client.update(TABLE_NAME)
687+
.setRowKey(colVal("c1", 1), colVal("c2", 1)).setFilter(filter_3)
688+
.addMutateRow(row(colVal("c3", 5))).execute();
689+
690+
tableQuery.select("c1", "c2", "c3");
691+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 1);
692+
tableQuery.setFilter(filter);
693+
result = tableQuery.execute();
694+
Assert.assertTrue(result.next());
695+
value = result.getRow();
696+
Assert.assertEquals(1, value.get("c2"));
697+
Assert.assertEquals(5, value.get("c3"));
698+
699+
// test replace not exist, insert
700+
MutationResult theResult = client.replace(TABLE_NAME)
701+
.setRowKey(colVal("c1", 0), colVal("c2", 1)).addMutateRow(row(colVal("c3", 2)))
702+
.execute();
703+
704+
tableQuery.select("c1", "c2", "c3");
705+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 102);
706+
tableQuery.setFilter(filter);
707+
result = tableQuery.execute();
708+
Assert.assertTrue(result.next());
709+
value = result.getRow();
710+
Assert.assertEquals(1, value.get("c2"));
711+
Assert.assertEquals(2, value.get("c3"));
712+
713+
// test replace exist, replace
714+
theResult = client.replace(TABLE_NAME).setRowKey(colVal("c1", 101), colVal("c2", 1))
715+
.addMutateRow(row(colVal("c3", 20))).execute();
716+
717+
tableQuery.select("c1", "c2", "c3");
718+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 101);
719+
tableQuery.setFilter(filter);
720+
result = tableQuery.execute();
721+
Assert.assertTrue(result.next());
722+
value = result.getRow();
723+
Assert.assertEquals(1, value.get("c2"));
724+
Assert.assertEquals(20, value.get("c3"));
725+
726+
// test insertup not exist, insert
727+
theResult = client.insertOrUpdate(TABLE_NAME)
728+
.setRowKey(colVal("c1", 0), colVal("c2", 1)).addMutateRow(row(colVal("c3", 5)))
729+
.execute();
730+
731+
tableQuery.select("c1", "c2", "c3");
732+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 103);
733+
tableQuery.setFilter(filter);
734+
result = tableQuery.execute();
735+
Assert.assertTrue(result.next());
736+
value = result.getRow();
737+
Assert.assertEquals(1, value.get("c2"));
738+
Assert.assertEquals(5, value.get("c3"));
739+
740+
// test insertup exist, update
741+
theResult = client.insertOrUpdate(TABLE_NAME)
742+
.setRowKey(colVal("c1", 103), colVal("c2", 1)).addMutateRow(row(colVal("c3", 50)))
743+
.execute();
744+
745+
tableQuery.select("c1", "c2", "c3");
746+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 103);
747+
tableQuery.setFilter(filter);
748+
result = tableQuery.execute();
749+
Assert.assertTrue(result.next());
750+
value = result.getRow();
751+
Assert.assertEquals(1, value.get("c2"));
752+
Assert.assertEquals(50, value.get("c3"));
753+
754+
// test insertup exist, update again
755+
theResult = client.insertOrUpdate(TABLE_NAME)
756+
.setRowKey(colVal("c1", 103), colVal("c2", 1)).addMutateRow(row(colVal("c3", 50)))
757+
.execute();
758+
759+
tableQuery.select("c1", "c2", "c3");
760+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 103);
761+
tableQuery.setFilter(filter);
762+
result = tableQuery.execute();
763+
Assert.assertTrue(result.next());
764+
value = result.getRow();
765+
Assert.assertEquals(1, value.get("c2"));
766+
Assert.assertEquals(50, value.get("c3"));
767+
768+
// test increment not exist, insert
769+
value = client.increment(TABLE_NAME, new Object[] { 0, 1 }, new String[] { "c3" },
770+
new Object[] { 6 }, true);
771+
772+
tableQuery.select("c1", "c2", "c3");
773+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 104);
774+
tableQuery.setFilter(filter);
775+
result = tableQuery.execute();
776+
Assert.assertTrue(result.next());
777+
value = result.getRow();
778+
Assert.assertEquals(1, value.get("c2"));
779+
Assert.assertEquals(6, value.get("c3"));
780+
781+
// test increment exist, increment
782+
value = client.increment(TABLE_NAME, new Object[] { 104, 1 }, new String[] { "c3" },
783+
new Object[] { 6 }, true);
784+
785+
tableQuery.select("c1", "c2", "c3");
786+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 104);
787+
tableQuery.setFilter(filter);
788+
result = tableQuery.execute();
789+
Assert.assertTrue(result.next());
790+
value = result.getRow();
791+
Assert.assertEquals(1, value.get("c2"));
792+
Assert.assertEquals(12, value.get("c3"));
793+
794+
// test illegal increment on auto increment column
795+
try {
796+
value = client.increment(TABLE_NAME, new Object[] { 104, 1 },
797+
new String[] { "c1" }, new Object[] { 1 }, true);
798+
} catch (ObTableException e) {
799+
assertNotNull(e);
800+
assertEquals(ResultCodes.OB_NOT_SUPPORTED.errorCode, e.getErrorCode());
801+
}
802+
803+
// test append not exist, insert
804+
Map<String, Object> res = client.append(TABLE_NAME, new Object[] { 0, 1 },
805+
new String[] { "c4" }, new Object[] { "a" }, true);
806+
807+
tableQuery.select("c1", "c2", "c3", "c4");
808+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 105);
809+
tableQuery.setFilter(filter);
810+
result = tableQuery.execute();
811+
Assert.assertTrue(result.next());
812+
value = result.getRow();
813+
Assert.assertEquals(1, value.get("c2"));
814+
Assert.assertEquals("a", value.get("c4"));
815+
816+
// test append exist, append
817+
res = client.append(TABLE_NAME, new Object[] { 105, 1 }, new String[] { "c4" },
818+
new Object[] { "b" }, true);
819+
820+
tableQuery.select("c1", "c2", "c3", "c4");
821+
filter = new ObTableValueFilter(ObCompareOp.EQ, "c1", 105);
822+
tableQuery.setFilter(filter);
823+
result = tableQuery.execute();
824+
Assert.assertTrue(result.next());
825+
value = result.getRow();
826+
Assert.assertEquals(1, value.get("c2"));
827+
Assert.assertEquals("ab", value.get("c4"));
828+
829+
// the total number of data
830+
tableQuery.select("c1", "c2", "c3", "c4");
831+
filter = new ObTableValueFilter(ObCompareOp.LT, "c1", 300);
832+
tableQuery.setFilter(filter);
833+
result = tableQuery.execute();
834+
Assert.assertEquals(7, result.cacheSize());
835+
} finally { // drop table
836+
Connection connection = ObTableClientTestUtil.getConnection();
837+
Statement statement = connection.createStatement();
838+
statement.execute("drop table " + TABLE_NAME);
839+
}
840+
}
841+
604842
}

0 commit comments

Comments
 (0)