Skip to content

Commit 435350d

Browse files
authored
Merge pull request #12 from JLY2015/master
support query async
2 parents a6fd117 + 800e714 commit 435350d

35 files changed

+1836
-21
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*-
2+
* #%L
3+
* OceanBase Table Client Framework
4+
* %%
5+
* Copyright (C) 2016 - 2022 Ant Financial Services Group
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc;
19+
20+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObQueryOperationType;
21+
import com.alipay.oceanbase.rpc.stream.async.ObTableClientQueryAsyncStreamResult;
22+
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
23+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
24+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
25+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
26+
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
27+
import com.alipay.oceanbase.rpc.table.AbstractTableQuery;
28+
import com.alipay.oceanbase.rpc.table.ObTable;
29+
import com.alipay.oceanbase.rpc.table.ObTableClientQueryAsyncImpl;
30+
import com.alipay.oceanbase.rpc.table.api.TableQuery;
31+
32+
public class ObClusterTableAsyncQuery extends AbstractTableQuery {
33+
private final ObTableClientQueryAsyncImpl tableClientQuerySync;
34+
35+
ObClusterTableAsyncQuery(ObTableClientQueryAsyncImpl tableClientQuerySync) {
36+
this.tableClientQuerySync = tableClientQuerySync;
37+
}
38+
39+
@Override
40+
public ObTableQuery getObTableQuery() {
41+
return tableClientQuerySync.getObTableQuery();
42+
}
43+
44+
@Override
45+
public String getTableName() {
46+
return tableClientQuerySync.getTableName();
47+
}
48+
49+
@Override
50+
public QueryResultSet execute() throws Exception {
51+
return tableClientQuerySync.execute();
52+
}
53+
54+
@Override
55+
public QueryResultSet executeInit(ObPair<Long, ObTable> entry) throws Exception {
56+
return tableClientQuerySync.executeInit(entry);
57+
}
58+
59+
@Override
60+
public QueryResultSet executeNext(ObPair<Long, ObTable> entry) throws Exception {
61+
return tableClientQuerySync.executeNext(entry);
62+
}
63+
64+
ObTableClientQueryAsyncStreamResult executeInternal(ObQueryOperationType type) throws Exception {
65+
return tableClientQuerySync.executeInternal(type);
66+
}
67+
68+
@Override
69+
public TableQuery select(String... columns) {
70+
tableClientQuerySync.select(columns);
71+
return this;
72+
}
73+
74+
@Override
75+
public TableQuery setKeys(String... keys) {
76+
throw new IllegalArgumentException("Not needed");
77+
}
78+
79+
@Override
80+
public TableQuery limit(int offset, int limit) {
81+
tableClientQuerySync.limit(offset, limit);
82+
return this;
83+
}
84+
85+
@Override
86+
public TableQuery addScanRange(Object[] start, boolean startEquals, Object[] end,
87+
boolean endEquals) {
88+
tableClientQuerySync.addScanRange(start, startEquals, end, endEquals);
89+
return this;
90+
}
91+
92+
@Override
93+
public TableQuery addScanRangeStartsWith(Object[] start, boolean startEquals) {
94+
tableClientQuerySync.addScanRangeStartsWith(start, startEquals);
95+
return this;
96+
}
97+
98+
@Override
99+
public TableQuery addScanRangeEndsWith(Object[] end, boolean endEquals) {
100+
tableClientQuerySync.addScanRangeStartsWith(end, endEquals);
101+
return this;
102+
}
103+
104+
@Override
105+
public TableQuery scanOrder(boolean forward) {
106+
tableClientQuerySync.scanOrder(forward);
107+
return this;
108+
}
109+
110+
@Override
111+
public TableQuery indexName(String indexName) {
112+
tableClientQuerySync.indexName(indexName);
113+
return this;
114+
}
115+
116+
@Override
117+
public TableQuery filterString(String filterString) {
118+
tableClientQuerySync.filterString(filterString);
119+
return this;
120+
}
121+
122+
@Override
123+
public TableQuery setHTableFilter(ObHTableFilter obHTableFilter) {
124+
return tableClientQuerySync.setHTableFilter(obHTableFilter);
125+
}
126+
127+
@Override
128+
public TableQuery setBatchSize(int batchSize) {
129+
return tableClientQuerySync.setBatchSize(batchSize);
130+
}
131+
132+
@Override
133+
public TableQuery setMaxResultSize(long maxResultSize) {
134+
return tableClientQuerySync.setMaxResultSize(maxResultSize);
135+
}
136+
137+
@Override
138+
public void clear() {
139+
tableClientQuerySync.clear();
140+
}
141+
142+
public void setEntityType(ObTableEntityType entityType) {
143+
super.setEntityType(entityType);
144+
tableClientQuerySync.setEntityType(entityType);
145+
}
146+
}

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
package com.alipay.oceanbase.rpc;
1919

20+
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2021
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
2324
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
2425
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
2526
import com.alipay.oceanbase.rpc.table.AbstractTableQuery;
27+
import com.alipay.oceanbase.rpc.table.ObTable;
2628
import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl;
2729
import com.alipay.oceanbase.rpc.table.api.TableQuery;
2830

@@ -58,6 +60,16 @@ public QueryResultSet execute() throws Exception {
5860
return tableClientQuery.execute();
5961
}
6062

63+
@Override
64+
public QueryResultSet executeInit(ObPair<Long, ObTable> entry) throws Exception {
65+
throw new IllegalArgumentException("not support executeInit");
66+
}
67+
68+
@Override
69+
public QueryResultSet executeNext(ObPair<Long, ObTable> entry) throws Exception {
70+
throw new IllegalArgumentException("not support executeNext");
71+
}
72+
6173
/*
6274
* Execute internal.
6375
*/
@@ -164,6 +176,11 @@ public TableQuery setBatchSize(int batchSize) {
164176
return tableClientQuery.setBatchSize(batchSize);
165177
}
166178

179+
@Override
180+
public TableQuery setMaxResultSize(long maxResultSize) {
181+
return tableClientQuery.setMaxResultSize(maxResultSize);
182+
}
183+
167184
/**
168185
* Clear.
169186
*/

src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
3333
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
3434
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryRequest;
35+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
3536
import com.alipay.oceanbase.rpc.table.AbstractObTableClient;
3637
import com.alipay.oceanbase.rpc.table.ObTable;
3738
import com.alipay.oceanbase.rpc.table.ObTableClientBatchOpsImpl;
39+
import com.alipay.oceanbase.rpc.table.ObTableClientQueryAsyncImpl;
3840
import com.alipay.oceanbase.rpc.table.ObTableClientQueryImpl;
3941
import com.alipay.oceanbase.rpc.table.api.TableBatchOps;
4042
import com.alipay.oceanbase.rpc.table.api.TableQuery;
@@ -651,7 +653,7 @@ public TableEntry getOrRefreshTableEntry(final String tableName, final boolean r
651653
throw new ObTableUnexpectedException("waiting for table entry "
652654
+ tableName + " punish interval "
653655
+ punishInterval
654-
+ " is interrupted.");
656+
+ " is interrupted.", e);
655657
}
656658
}
657659
} else {
@@ -968,6 +970,12 @@ public TableQuery query(String tableName) {
968970
return new ObClusterTableQuery(tableQuery);
969971
}
970972

973+
@Override
974+
public TableQuery queryByBatchV2(String tableName) {
975+
ObTableClientQueryAsyncImpl querySync = new ObTableClientQueryAsyncImpl(tableName, this);
976+
return new ObClusterTableAsyncQuery(querySync);
977+
}
978+
971979
@Override
972980
public TableQuery queryByBatch(String tableName) throws Exception {
973981
return new QueryByBatch(query(tableName));
@@ -1188,6 +1196,13 @@ public ObPayload execute(final ObTableAbstractOperationRequest request) throws E
11881196
((ObTableQueryRequest) request).getTableQuery(), this);
11891197
tableQuery.setEntityType(request.getEntityType());
11901198
return new ObClusterTableQuery(tableQuery).executeInternal();
1199+
} else if (request instanceof ObTableQueryAsyncRequest) {
1200+
ObTableClientQueryAsyncImpl tableClientQuerySync = new ObTableClientQueryAsyncImpl(
1201+
request.getTableName(), ((ObTableQueryAsyncRequest) request)
1202+
.getObTableQueryRequest().getTableQuery(), this);
1203+
tableClientQuerySync.setEntityType(request.getEntityType());
1204+
return new ObClusterTableAsyncQuery(tableClientQuerySync)
1205+
.executeInternal(((ObTableQueryAsyncRequest) request).getQueryType());
11911206
} else if (request instanceof ObTableBatchOperationRequest) {
11921207
ObTableClientBatchOpsImpl batchOps = new ObTableClientBatchOpsImpl(
11931208
request.getTableName(),

src/main/java/com/alipay/oceanbase/rpc/batch/QueryByBatch.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717

1818
package com.alipay.oceanbase.rpc.batch;
1919

20+
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
2021
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
2122
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObHTableFilter;
2223
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
2324
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
25+
import com.alipay.oceanbase.rpc.table.ObTable;
2426
import com.alipay.oceanbase.rpc.table.api.TableQuery;
2527
import java.util.Arrays;
2628
import java.util.HashSet;
@@ -62,6 +64,16 @@ public QueryResultSet execute() {
6264
return new QueryResultSet(new QueryByBatchResultSet(this));
6365
}
6466

67+
@Override
68+
public QueryResultSet executeInit(ObPair<Long, ObTable> entry) throws Exception {
69+
throw new IllegalArgumentException("not support executeInit");
70+
}
71+
72+
@Override
73+
public QueryResultSet executeNext(ObPair<Long, ObTable> entry) throws Exception {
74+
throw new IllegalArgumentException("not support executeNext");
75+
}
76+
6577
public TableQuery addScanRange(Object start, Object end) {
6678
addScanRange(start, true, end, true);
6779
return this;
@@ -167,6 +179,11 @@ public TableQuery setOperationTimeout(long operationTimeout) {
167179
return this;
168180
}
169181

182+
@Override
183+
public TableQuery setMaxResultSize(long maxResultSize) {
184+
throw new IllegalArgumentException("not support setMaxResultSize");
185+
}
186+
170187
@Override
171188
public void clear() {
172189
this.tableQuery.clear();

src/main/java/com/alipay/oceanbase/rpc/bolt/protocol/ObTablePacketCode.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableOperationResult;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.mutate.ObTableQueryAndMutateResult;
2626
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQueryResult;
27+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.login.ObTableLoginResult;
2829
import com.alipay.remoting.CommandCode;
2930

@@ -74,8 +75,15 @@ public ObPayload newPayload(ObRpcPacketHeader header) {
7475
return new ObTableQueryAndMutateResult();
7576
}
7677
}, //
77-
78-
// INVALID REQUEST PCODE, no such rpc
78+
OB_TABLE_API_EXECUTE_QUERY_SYNC(Pcodes.OB_TABLE_API_EXECUTE_QUERY_SYNC) {
79+
/**
80+
* New payload.
81+
*/
82+
@Override
83+
public ObPayload newPayload(ObRpcPacketHeader header) {
84+
return new ObTableQueryAsyncResult();
85+
}
86+
}, //
7987
OB_ERROR_PACKET(Pcodes.OB_ERROR_PACKET) {
8088
/*
8189
* New payload.
@@ -114,6 +122,8 @@ public static ObTablePacketCode valueOf(short value) {
114122
return OB_TABLE_API_EXECUTE_QUERY;
115123
case Pcodes.OB_TABLE_API_QUERY_AND_MUTATE:
116124
return OB_TABLE_API_QUERY_AND_MUTATE;
125+
case Pcodes.OB_TABLE_API_EXECUTE_QUERY_SYNC:
126+
return OB_TABLE_API_EXECUTE_QUERY_SYNC;
117127
case Pcodes.OB_ERROR_PACKET:
118128
return OB_ERROR_PACKET;
119129
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/Pcodes.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919

2020
public interface Pcodes {
2121

22-
int OB_TABLE_API_LOGIN = 0x1101;
23-
int OB_TABLE_API_EXECUTE = 0x1102;
24-
int OB_TABLE_API_BATCH_EXECUTE = 0x1103;
25-
int OB_TABLE_API_EXECUTE_QUERY = 0x1104;
26-
int OB_TABLE_API_QUERY_AND_MUTATE = 0x1105;
22+
int OB_TABLE_API_LOGIN = 0x1101;
23+
int OB_TABLE_API_EXECUTE = 0x1102;
24+
int OB_TABLE_API_BATCH_EXECUTE = 0x1103;
25+
int OB_TABLE_API_EXECUTE_QUERY = 0x1104;
26+
int OB_TABLE_API_QUERY_AND_MUTATE = 0x1105;
2727

2828
// INVALID REQUEST PCODE, no such rpc
29-
int OB_ERROR_PACKET = 0x010;
29+
int OB_ERROR_PACKET = 0x010;
3030

31+
int OB_TABLE_API_EXECUTE_QUERY_SYNC = 0x1106;
3132
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/ObObjType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1783,7 +1783,8 @@ public static Comparable parseTextToComparable(ObObjType obObjType, Object objec
17831783
} catch (IllegalArgumentException e) {
17841784
throw new IllegalArgumentException(
17851785
obObjType.name()
1786-
+ "can not parseToComparable byte array to string with utf-8 charset");
1786+
+ "can not parseToComparable byte array to string with utf-8 charset",
1787+
e);
17871788
}
17881789
}
17891790

0 commit comments

Comments
 (0)