Skip to content

Commit 0caee1b

Browse files
committed
adapt fts query request
1 parent c762c85 commit 0caee1b

File tree

10 files changed

+222
-19
lines changed

10 files changed

+222
-19
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,10 @@ public void setEntityType(ObTableEntityType entityType) {
240240
super.setEntityType(entityType);
241241
tableClientQuery.setEntityType(entityType);
242242
}
243+
244+
@Override
245+
public TableQuery setSearchText(String searchText) {
246+
tableClientQuery.setSearchText(searchText);
247+
return this;
248+
}
243249
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,38 @@
2121
import java.util.Map;
2222

2323
public enum ObIndexType {
24-
IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), IndexTypeUniqueGlobal(
25-
4), IndexTypePrimary(
26-
5), IndexTypeDomainCtxcat(
27-
6), IndexTypeNormalGlobalLocalStorage(
28-
7), IndexTypeUniqueGlobalLocalStorage(
29-
8), IndexTypeSpatialLocal(
30-
10), IndexTypeSpatialGlobal(
31-
11), IndexTypeSpatialGlobalLocalStorage(
32-
12), IndexTypeMax(
33-
13);
24+
IndexTypeIsNot(0),
25+
IndexTypeNormalLocal(1),
26+
IndexTypeUniqueLocal(2),
27+
IndexTypeNormalGlobal(3),
28+
IndexTypeUniqueGlobal(4),
29+
IndexTypePrimary(5),
30+
IndexTypeDomainCtxcat(6),
31+
IndexTypeNormalGlobalLocalStorage(7),
32+
IndexTypeUniqueGlobalLocalStorage(8),
33+
IndexTypeSpatialLocal(10),
34+
IndexTypeSpatialGlobal(11),
35+
IndexTypeSpatialGlobalLocalStorage(12),
36+
IndexTypeRowkeyDocIdLocal(13),
37+
IndexTypeDocIdRowkeyLocal(14),
38+
IndexTypeFtsIndexLocal(15),
39+
IndexTypeFtsDocWordLocal(16),
40+
/*
41+
IndexTypeDocIdRowkeyGlobal(17),
42+
IndexTypeFtsIndexGlobal(18),
43+
IndexTypeFtsDocWordGlobal(19),
44+
IndexTypeDocIdRowkeyGlobalLocalStorage(20),
45+
IndexTypeFtsIndexGlobalLocalStorage(21),
46+
IndexTypeFtsDocWordGlobalLocalStorage(22),
47+
IndexTypeNormalMultivalueLocal(23),
48+
IndexTypeUniqueMultivalueLocal(24),
49+
IndexTypeVecRowkeyVidLocal(25),
50+
IndexTypeVecVidRowkeyLocal(26),
51+
IndexTypeVecDeltaBufferLocal(27),
52+
IndexTypeVecIndexIdLocal(28),
53+
IndexTypeVecIndexSnapshotDataLocal(29),
54+
*/
55+
IndexTypeMax(30);
3456

3557
private int value;
3658
private static Map<Integer, ObIndexType> map = new HashMap<Integer, ObIndexType>();

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

2020
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
21+
import com.alipay.oceanbase.rpc.table.ObFTSParams;
2122
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
22-
import com.alipay.oceanbase.rpc.table.ObKVParams;
23+
import com.alipay.oceanbase.rpc.table. ObKVParams;
2324
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2425
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
27+
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
2628
import com.alipay.oceanbase.rpc.util.Serialization;
2729
import io.netty.buffer.ByteBuf;
2830

@@ -67,7 +69,7 @@ public class ObTableQuery extends AbstractPayload {
6769

6870
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
6971

70-
private ObKVParams obKVParams;
72+
private ObKVParams obKVParams = null;
7173

7274
/*
7375
* Check filter.
@@ -178,7 +180,7 @@ public byte[] encode() {
178180
idx += len;
179181
}
180182

181-
if (isHbaseQuery && obKVParams != null) {
183+
if (obKVParams != null) { // hbaseQuery or FTSQuery will use obKVParams
182184
len = (int) obKVParams.getPayloadSize();
183185
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
184186
idx += len;
@@ -245,7 +247,11 @@ public Object decode(ByteBuf buf) {
245247
String agg_column = Serialization.decodeVString(buf);
246248
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
247249
}
248-
if (isHbaseQuery) {
250+
251+
buf.markReaderIndex();
252+
if (buf.readByte() > 0) {
253+
// read pType if is exists
254+
buf.resetReaderIndex();
249255
obKVParams = new ObKVParams();
250256
this.obKVParams.decode(buf);
251257
}
@@ -280,7 +286,7 @@ public long getPayloadContentSize() {
280286
} else {
281287
contentSize += HTABLE_DUMMY_BYTES.length;
282288
}
283-
if (isHbaseQuery && obKVParams != null) {
289+
if (obKVParams != null) {
284290
contentSize += obKVParams.getPayloadSize();
285291
} else {
286292
contentSize += HTABLE_DUMMY_BYTES.length;
@@ -500,6 +506,18 @@ public void setObKVParams(ObKVParams obKVParams) {
500506
this.obKVParams = obKVParams;
501507
}
502508

509+
public void setSearchText(String searchText) {
510+
if (isHbaseQuery) {
511+
throw new FeatureNotSupportedException("Hbase query not support full text search currently");
512+
}
513+
if (obKVParams == null) {
514+
obKVParams = new ObKVParams();
515+
}
516+
ObFTSParams ftsParams = (ObFTSParams)obKVParams.getObParams(ObKVParamsBase.paramType.FTS);
517+
ftsParams.setSearchText(searchText);
518+
obKVParams.setObParamsBase(ftsParams);
519+
}
520+
503521
public ObKVParams getObKVParams() {
504522
return obKVParams;
505523
}

src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableQueryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
26+
import com.alipay.oceanbase.rpc.table.api.Table;
2627
import com.alipay.oceanbase.rpc.table.api.TableQuery;
2728

2829
import java.util.Arrays;
@@ -184,6 +185,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
184185
return this;
185186
}
186187

188+
@Override
189+
public TableQuery setSearchText(String searchText) {
190+
this.tableQuery.setSearchText(searchText);
191+
return this;
192+
}
193+
187194
public String getIndexTableName() {
188195
return indexTableName;
189196
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.table;
19+
20+
import com.alipay.oceanbase.rpc.util.Serialization;
21+
import io.netty.buffer.ByteBuf;
22+
23+
public class ObFTSParams extends ObKVParamsBase {
24+
String searchText = null;
25+
public ObFTSParams() {
26+
pType = paramType.FTS;
27+
}
28+
29+
public paramType getType() {
30+
return pType;
31+
}
32+
33+
public void setSearchText(String searchText) {
34+
this.searchText = searchText;
35+
}
36+
37+
public String getSearchText() { return this.searchText; }
38+
39+
public byte[] encode() {
40+
byte[] bytes = new byte[(int) getPayloadContentSize()];
41+
int idx = 0;
42+
byte[] b = new byte[] { (byte)pType.ordinal() };
43+
System.arraycopy(b, 0, bytes, idx, 1);
44+
idx += 1;
45+
int len = Serialization.getNeedBytes(searchText);
46+
System.arraycopy(Serialization.encodeVString(searchText), 0, bytes, idx, len);
47+
return bytes;
48+
}
49+
50+
public Object decode(ByteBuf buf) {
51+
// pType is read by ObKVParams
52+
this.searchText = Serialization.decodeVString(buf);
53+
return this;
54+
}
55+
56+
public long getPayloadContentSize() {
57+
return 1 /* pType*/ + Serialization.getNeedBytes(searchText);
58+
}
59+
60+
public String toString() {
61+
return "ObFtsParams: {\n pType = " + pType + ", \n searchText = " + searchText
62+
+ "\n}\n";
63+
}
64+
}

src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) {
3232
case HBase:
3333
return new ObHBaseParams();
3434
case Redis:
35+
throw new RuntimeException("Currently does not support redis type");
36+
case FTS:
37+
return new ObFTSParams();
3538
default:
3639
throw new RuntimeException("Currently does not support other types except HBase");
3740
}

src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,31 @@
1717

1818
package com.alipay.oceanbase.rpc.table;
1919

20+
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
2021
import io.netty.buffer.ByteBuf;
2122

23+
import java.util.HashMap;
24+
import java.util.Map;
25+
2226
public abstract class ObKVParamsBase {
2327
public enum paramType {
24-
HBase((byte) 0), Redis((byte) 1);
28+
HBase((byte) 0), Redis((byte) 1), FTS((byte) 2);
2529
private final byte value;
30+
private static final Map<Integer, paramType> map = new HashMap<Integer, paramType>();
31+
32+
static {
33+
for (paramType type : paramType.values()) {
34+
map.put(type.ordinal(), type);
35+
}
36+
}
37+
38+
public static paramType valueOf(int value) { return map.get(value); }
2639

2740
paramType(byte value) {
2841
this.value = value;
2942
}
3043

31-
public byte getValue() {
32-
return value;
33-
}
44+
public byte getValue() { return value; }
3445
}
3546

3647
public int byteSize;

src/main/java/com/alipay/oceanbase/rpc/table/api/TableQuery.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,6 @@ public interface TableQuery {
199199
TableQuery setScanRangeColumns(String... columns);
200200

201201
void clear();
202+
203+
TableQuery setSearchText(String searchText);
202204
}

src/test/java/com/alipay/oceanbase/rpc/ObTableFullTextIndexTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.alipay.oceanbase.rpc.exception.ObTableException;
44
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
55
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
6+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
7+
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
68
import com.alipay.oceanbase.rpc.table.ObTable;
79
import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil;
810
import com.google.protobuf.MapEntry;
@@ -323,6 +325,39 @@ public void testTTLInsert() throws Exception {
323325
}
324326
}
325327

328+
@Test
329+
public void testFtsQuery() throws Exception {
330+
try {
331+
//sync query
332+
QueryResultSet resultSet = client.query(tableName)
333+
.setSearchText("native")
334+
.indexName("full_idx1_tbl1")
335+
.execute();
336+
while(resultSet.next()) {
337+
Map<String, Object> row = resultSet.getRow();
338+
for (Map.Entry<String, Object> entry: row.entrySet()) {
339+
System.out.println("colname: " + entry.getKey() + " \nvalue: " + entry.getValue());
340+
}
341+
System.out.println();
342+
}
343+
// async query
344+
System.out.println("========async query:=========");
345+
QueryResultSet asyncResultSet = client.query(tableName)
346+
.indexName("full_idx1_tbl1")
347+
.setSearchText("oceanbase")
348+
.asyncExecute();
349+
while(asyncResultSet.next()) {
350+
Map<String, Object> row = asyncResultSet.getRow();
351+
for (Map.Entry<String, Object> entry: row.entrySet()) {
352+
System.out.println("colname: " + entry.getKey() + " \nvalue: " + entry.getValue());
353+
}
354+
System.out.println();
355+
}
356+
} catch (Exception e) {
357+
e.printStackTrace();
358+
}
359+
}
360+
326361
private void executeSQL(String createSQL) throws SQLException {
327362
Connection connection = ObTableClientTestUtil.getConnection();
328363
Statement statement = connection.createStatement();

src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQueryPayloadTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020
import com.alipay.oceanbase.rpc.protocol.payload.impl.*;
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableConsistencyLevel;
2222
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
23+
import com.alipay.oceanbase.rpc.table.ObFTSParams;
2324
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
2425
import com.alipay.oceanbase.rpc.table.ObKVParams;
26+
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
2527
import com.alipay.oceanbase.rpc.util.ObBytesString;
28+
import com.sun.xml.internal.ws.wsdl.writer.document.ParamType;
2629
import io.netty.buffer.ByteBuf;
2730
import io.netty.buffer.PooledByteBufAllocator;
31+
import org.junit.Assert;
2832
import org.junit.Test;
2933

3034
import java.util.ArrayList;
@@ -150,6 +154,37 @@ public void test_ObTableQueryResult() {
150154
buf.release();
151155
}
152156

157+
@Test
158+
public void testFtsParam() {
159+
ObFTSParams ftsParams = new ObFTSParams();
160+
ftsParams.setSearchText("oceanbase");
161+
byte[] bytes = ftsParams.encode();
162+
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
163+
buf.writeBytes(bytes);
164+
ObFTSParams newFtsParams = new ObFTSParams();
165+
assertEquals(ftsParams.getType(), ObKVParamsBase.paramType.valueOf(buf.readByte()));
166+
newFtsParams.decode(buf);
167+
assertEquals(ftsParams.getSearchText(), newFtsParams.getSearchText());
168+
buf.release();
169+
}
170+
171+
@Test
172+
public void testFtsQuery() {
173+
ObTableQuery obTableQuery = getObTableQuery();
174+
obTableQuery.setIndexName("ftx_idx");
175+
obTableQuery.setSearchText("oceanbase");
176+
byte[] bytes = obTableQuery.encode();
177+
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
178+
buf.writeBytes(bytes);
179+
180+
ObTableQuery newObTableQuery = new ObTableQuery();
181+
newObTableQuery.decode(buf);
182+
ObKVParamsBase kv_params_base = newObTableQuery.getObKVParams().obKVParamsBase;
183+
Assert.assertEquals(ObKVParamsBase.paramType.FTS, kv_params_base.getType());
184+
ObFTSParams fts_params = (ObFTSParams) kv_params_base;
185+
Assert.assertEquals("oceanbase", fts_params.getSearchText());
186+
}
187+
153188
private ObTableQuery getObTableQuery() {
154189
ObTableQuery obTableQuery = new ObTableQuery();
155190
obTableQuery.addKeyRange(getObNewRange());

0 commit comments

Comments
 (0)