Skip to content

Commit 07ec674

Browse files
committed
adapt fts query request
1 parent 20cc4da commit 07ec674

File tree

10 files changed

+222
-19
lines changed

10 files changed

+222
-19
lines changed

src/main/java/com/alipay/oceanbase/rpc/ObClusterTableQuery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,10 @@ public void setEntityType(ObTableEntityType entityType) {
240240
super.setEntityType(entityType);
241241
tableClientQuery.setEntityType(entityType);
242242
}
243+
244+
@Override
245+
public TableQuery setSearchText(String searchText) {
246+
tableClientQuery.setSearchText(searchText);
247+
return this;
248+
}
243249
}

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/ObIndexType.java

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,38 @@
2121
import java.util.Map;
2222

2323
public enum ObIndexType {
24-
IndexTypeIsNot(0), IndexTypeNormalLocal(1), IndexTypeUniqueLocal(2), IndexTypeNormalGlobal(3), IndexTypeUniqueGlobal(
25-
4), IndexTypePrimary(
26-
5), IndexTypeDomainCtxcat(
27-
6), IndexTypeNormalGlobalLocalStorage(
28-
7), IndexTypeUniqueGlobalLocalStorage(
29-
8), IndexTypeSpatialLocal(
30-
10), IndexTypeSpatialGlobal(
31-
11), IndexTypeSpatialGlobalLocalStorage(
32-
12), IndexTypeMax(
33-
13);
24+
IndexTypeIsNot(0),
25+
IndexTypeNormalLocal(1),
26+
IndexTypeUniqueLocal(2),
27+
IndexTypeNormalGlobal(3),
28+
IndexTypeUniqueGlobal(4),
29+
IndexTypePrimary(5),
30+
IndexTypeDomainCtxcat(6),
31+
IndexTypeNormalGlobalLocalStorage(7),
32+
IndexTypeUniqueGlobalLocalStorage(8),
33+
IndexTypeSpatialLocal(10),
34+
IndexTypeSpatialGlobal(11),
35+
IndexTypeSpatialGlobalLocalStorage(12),
36+
IndexTypeRowkeyDocIdLocal(13),
37+
IndexTypeDocIdRowkeyLocal(14),
38+
IndexTypeFtsIndexLocal(15),
39+
IndexTypeFtsDocWordLocal(16),
40+
/*
41+
IndexTypeDocIdRowkeyGlobal(17),
42+
IndexTypeFtsIndexGlobal(18),
43+
IndexTypeFtsDocWordGlobal(19),
44+
IndexTypeDocIdRowkeyGlobalLocalStorage(20),
45+
IndexTypeFtsIndexGlobalLocalStorage(21),
46+
IndexTypeFtsDocWordGlobalLocalStorage(22),
47+
IndexTypeNormalMultivalueLocal(23),
48+
IndexTypeUniqueMultivalueLocal(24),
49+
IndexTypeVecRowkeyVidLocal(25),
50+
IndexTypeVecVidRowkeyLocal(26),
51+
IndexTypeVecDeltaBufferLocal(27),
52+
IndexTypeVecIndexIdLocal(28),
53+
IndexTypeVecIndexSnapshotDataLocal(29),
54+
*/
55+
IndexTypeMax(30);
3456

3557
private int value;
3658
private static Map<Integer, ObIndexType> map = new HashMap<Integer, ObIndexType>();

src/main/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQuery.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;
1919

2020
import com.alipay.oceanbase.rpc.exception.FeatureNotSupportedException;
21+
import com.alipay.oceanbase.rpc.table.ObFTSParams;
2122
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
22-
import com.alipay.oceanbase.rpc.table.ObKVParams;
23+
import com.alipay.oceanbase.rpc.table. ObKVParams;
2324
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
2425
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObObj;
2526
import com.alipay.oceanbase.rpc.protocol.payload.impl.ObRowKey;
2627
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationSingle;
2728
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.aggregation.ObTableAggregationType;
29+
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
2830
import com.alipay.oceanbase.rpc.util.Serialization;
2931
import io.netty.buffer.ByteBuf;
3032

@@ -71,7 +73,7 @@ public class ObTableQuery extends AbstractPayload {
7173

7274
private List<ObTableAggregationSingle> aggregations = new LinkedList<>();
7375

74-
private ObKVParams obKVParams;
76+
private ObKVParams obKVParams = null;
7577

7678
public void adjustStartKey(List<ObObj> key) throws IllegalArgumentException {
7779
List<ObNewRange> keyRanges = getKeyRanges();
@@ -223,7 +225,7 @@ public byte[] encode() {
223225
idx += len;
224226
}
225227

226-
if (isHbaseQuery && obKVParams != null) {
228+
if (obKVParams != null) { // hbaseQuery or FTSQuery will use obKVParams
227229
len = (int) obKVParams.getPayloadSize();
228230
System.arraycopy(obKVParams.encode(), 0, bytes, idx, len);
229231
idx += len;
@@ -290,7 +292,11 @@ public Object decode(ByteBuf buf) {
290292
String agg_column = Serialization.decodeVString(buf);
291293
this.aggregations.add(new ObTableAggregationSingle(ObTableAggregationType.fromByte(agg_type), agg_column));
292294
}
293-
if (isHbaseQuery) {
295+
296+
buf.markReaderIndex();
297+
if (buf.readByte() > 0) {
298+
// read pType if is exists
299+
buf.resetReaderIndex();
294300
obKVParams = new ObKVParams();
295301
this.obKVParams.decode(buf);
296302
}
@@ -325,7 +331,7 @@ public long getPayloadContentSize() {
325331
} else {
326332
contentSize += HTABLE_DUMMY_BYTES.length;
327333
}
328-
if (isHbaseQuery && obKVParams != null) {
334+
if (obKVParams != null) {
329335
contentSize += obKVParams.getPayloadSize();
330336
} else {
331337
contentSize += HTABLE_DUMMY_BYTES.length;
@@ -545,6 +551,18 @@ public void setObKVParams(ObKVParams obKVParams) {
545551
this.obKVParams = obKVParams;
546552
}
547553

554+
public void setSearchText(String searchText) {
555+
if (isHbaseQuery) {
556+
throw new FeatureNotSupportedException("Hbase query not support full text search currently");
557+
}
558+
if (obKVParams == null) {
559+
obKVParams = new ObKVParams();
560+
}
561+
ObFTSParams ftsParams = (ObFTSParams)obKVParams.getObParams(ObKVParamsBase.paramType.FTS);
562+
ftsParams.setSearchText(searchText);
563+
obKVParams.setObParamsBase(ftsParams);
564+
}
565+
548566
public ObKVParams getObKVParams() {
549567
return obKVParams;
550568
}

src/main/java/com/alipay/oceanbase/rpc/table/AbstractTableQueryImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObNewRange;
2424
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObScanOrder;
2525
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
26+
import com.alipay.oceanbase.rpc.table.api.Table;
2627
import com.alipay.oceanbase.rpc.table.api.TableQuery;
2728

2829
import java.util.Arrays;
@@ -184,6 +185,12 @@ public TableQuery setMaxResultSize(long maxResultSize) {
184185
return this;
185186
}
186187

188+
@Override
189+
public TableQuery setSearchText(String searchText) {
190+
this.tableQuery.setSearchText(searchText);
191+
return this;
192+
}
193+
187194
public String getIndexTableName() {
188195
return indexTableName;
189196
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*-
2+
* #%L
3+
* com.oceanbase:obkv-table-client
4+
* %%
5+
* Copyright (C) 2021 - 2024 OceanBase
6+
* %%
7+
* OBKV Table Client Framework is licensed under Mulan PSL v2.
8+
* You can use this software according to the terms and conditions of the Mulan PSL v2.
9+
* You may obtain a copy of Mulan PSL v2 at:
10+
* http://license.coscl.org.cn/MulanPSL2
11+
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
12+
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
13+
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
14+
* See the Mulan PSL v2 for more details.
15+
* #L%
16+
*/
17+
18+
package com.alipay.oceanbase.rpc.table;
19+
20+
import com.alipay.oceanbase.rpc.util.Serialization;
21+
import io.netty.buffer.ByteBuf;
22+
23+
public class ObFTSParams extends ObKVParamsBase {
24+
String searchText = null;
25+
public ObFTSParams() {
26+
pType = paramType.FTS;
27+
}
28+
29+
public paramType getType() {
30+
return pType;
31+
}
32+
33+
public void setSearchText(String searchText) {
34+
this.searchText = searchText;
35+
}
36+
37+
public String getSearchText() { return this.searchText; }
38+
39+
public byte[] encode() {
40+
byte[] bytes = new byte[(int) getPayloadContentSize()];
41+
int idx = 0;
42+
byte[] b = new byte[] { (byte)pType.ordinal() };
43+
System.arraycopy(b, 0, bytes, idx, 1);
44+
idx += 1;
45+
int len = Serialization.getNeedBytes(searchText);
46+
System.arraycopy(Serialization.encodeVString(searchText), 0, bytes, idx, len);
47+
return bytes;
48+
}
49+
50+
public Object decode(ByteBuf buf) {
51+
// pType is read by ObKVParams
52+
this.searchText = Serialization.decodeVString(buf);
53+
return this;
54+
}
55+
56+
public long getPayloadContentSize() {
57+
return 1 /* pType*/ + Serialization.getNeedBytes(searchText);
58+
}
59+
60+
public String toString() {
61+
return "ObFtsParams: {\n pType = " + pType + ", \n searchText = " + searchText
62+
+ "\n}\n";
63+
}
64+
}

src/main/java/com/alipay/oceanbase/rpc/table/ObKVParams.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ public ObKVParamsBase getObParams(ObKVParamsBase.paramType pType) {
3232
case HBase:
3333
return new ObHBaseParams();
3434
case Redis:
35+
throw new RuntimeException("Currently does not support redis type");
36+
case FTS:
37+
return new ObFTSParams();
3538
default:
3639
throw new RuntimeException("Currently does not support other types except HBase");
3740
}

src/main/java/com/alipay/oceanbase/rpc/table/ObKVParamsBase.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,31 @@
1717

1818
package com.alipay.oceanbase.rpc.table;
1919

20+
import com.alipay.oceanbase.rpc.direct_load.protocol.payload.ObTableLoadClientStatus;
2021
import io.netty.buffer.ByteBuf;
2122

23+
import java.util.HashMap;
24+
import java.util.Map;
25+
2226
public abstract class ObKVParamsBase {
2327
public enum paramType {
24-
HBase((byte) 0), Redis((byte) 1);
28+
HBase((byte) 0), Redis((byte) 1), FTS((byte) 2);
2529
private final byte value;
30+
private static final Map<Integer, paramType> map = new HashMap<Integer, paramType>();
31+
32+
static {
33+
for (paramType type : paramType.values()) {
34+
map.put(type.ordinal(), type);
35+
}
36+
}
37+
38+
public static paramType valueOf(int value) { return map.get(value); }
2639

2740
paramType(byte value) {
2841
this.value = value;
2942
}
3043

31-
public byte getValue() {
32-
return value;
33-
}
44+
public byte getValue() { return value; }
3445
}
3546

3647
public int byteSize;

src/main/java/com/alipay/oceanbase/rpc/table/api/TableQuery.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,6 @@ public interface TableQuery {
199199
TableQuery setScanRangeColumns(String... columns);
200200

201201
void clear();
202+
203+
TableQuery setSearchText(String searchText);
202204
}

src/test/java/com/alipay/oceanbase/rpc/ObTableFullTextIndexTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import com.alipay.oceanbase.rpc.exception.ObTableException;
44
import com.alipay.oceanbase.rpc.mutation.result.MutationResult;
55
import com.alipay.oceanbase.rpc.protocol.payload.ResultCodes;
6+
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.ObTableQuery;
7+
import com.alipay.oceanbase.rpc.stream.QueryResultSet;
68
import com.alipay.oceanbase.rpc.table.ObTable;
79
import com.alipay.oceanbase.rpc.util.ObTableClientTestUtil;
810
import com.google.protobuf.MapEntry;
@@ -323,6 +325,39 @@ public void testTTLInsert() throws Exception {
323325
}
324326
}
325327

328+
@Test
329+
public void testFtsQuery() throws Exception {
330+
try {
331+
//sync query
332+
QueryResultSet resultSet = client.query(tableName)
333+
.setSearchText("native")
334+
.indexName("full_idx1_tbl1")
335+
.execute();
336+
while(resultSet.next()) {
337+
Map<String, Object> row = resultSet.getRow();
338+
for (Map.Entry<String, Object> entry: row.entrySet()) {
339+
System.out.println("colname: " + entry.getKey() + " \nvalue: " + entry.getValue());
340+
}
341+
System.out.println();
342+
}
343+
// async query
344+
System.out.println("========async query:=========");
345+
QueryResultSet asyncResultSet = client.query(tableName)
346+
.indexName("full_idx1_tbl1")
347+
.setSearchText("oceanbase")
348+
.asyncExecute();
349+
while(asyncResultSet.next()) {
350+
Map<String, Object> row = asyncResultSet.getRow();
351+
for (Map.Entry<String, Object> entry: row.entrySet()) {
352+
System.out.println("colname: " + entry.getKey() + " \nvalue: " + entry.getValue());
353+
}
354+
System.out.println();
355+
}
356+
} catch (Exception e) {
357+
e.printStackTrace();
358+
}
359+
}
360+
326361
private void executeSQL(String createSQL) throws SQLException {
327362
Connection connection = ObTableClientTestUtil.getConnection();
328363
Statement statement = connection.createStatement();

src/test/java/com/alipay/oceanbase/rpc/protocol/payload/impl/execute/query/ObTableQueryPayloadTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,15 @@
2020
import com.alipay.oceanbase.rpc.protocol.payload.impl.*;
2121
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableConsistencyLevel;
2222
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
23+
import com.alipay.oceanbase.rpc.table.ObFTSParams;
2324
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
2425
import com.alipay.oceanbase.rpc.table.ObKVParams;
26+
import com.alipay.oceanbase.rpc.table.ObKVParamsBase;
2527
import com.alipay.oceanbase.rpc.util.ObBytesString;
28+
import com.sun.xml.internal.ws.wsdl.writer.document.ParamType;
2629
import io.netty.buffer.ByteBuf;
2730
import io.netty.buffer.PooledByteBufAllocator;
31+
import org.junit.Assert;
2832
import org.junit.Test;
2933

3034
import java.util.ArrayList;
@@ -150,6 +154,37 @@ public void test_ObTableQueryResult() {
150154
buf.release();
151155
}
152156

157+
@Test
158+
public void testFtsParam() {
159+
ObFTSParams ftsParams = new ObFTSParams();
160+
ftsParams.setSearchText("oceanbase");
161+
byte[] bytes = ftsParams.encode();
162+
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
163+
buf.writeBytes(bytes);
164+
ObFTSParams newFtsParams = new ObFTSParams();
165+
assertEquals(ftsParams.getType(), ObKVParamsBase.paramType.valueOf(buf.readByte()));
166+
newFtsParams.decode(buf);
167+
assertEquals(ftsParams.getSearchText(), newFtsParams.getSearchText());
168+
buf.release();
169+
}
170+
171+
@Test
172+
public void testFtsQuery() {
173+
ObTableQuery obTableQuery = getObTableQuery();
174+
obTableQuery.setIndexName("ftx_idx");
175+
obTableQuery.setSearchText("oceanbase");
176+
byte[] bytes = obTableQuery.encode();
177+
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
178+
buf.writeBytes(bytes);
179+
180+
ObTableQuery newObTableQuery = new ObTableQuery();
181+
newObTableQuery.decode(buf);
182+
ObKVParamsBase kv_params_base = newObTableQuery.getObKVParams().obKVParamsBase;
183+
Assert.assertEquals(ObKVParamsBase.paramType.FTS, kv_params_base.getType());
184+
ObFTSParams fts_params = (ObFTSParams) kv_params_base;
185+
Assert.assertEquals("oceanbase", fts_params.getSearchText());
186+
}
187+
153188
private ObTableQuery getObTableQuery() {
154189
ObTableQuery obTableQuery = new ObTableQuery();
155190
obTableQuery.addKeyRange(getObNewRange());

0 commit comments

Comments
 (0)