Skip to content

Commit 840ed65

Browse files
authored
Merge pull request #547 from alex268/master
Added support of raw proto data to executeDateQuery/readTable/bulkUpsert
2 parents 3d3d696 + f874d80 commit 840ed65

File tree

14 files changed

+289
-46
lines changed

14 files changed

+289
-46
lines changed

table/src/main/java/tech/ydb/table/Session.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,18 @@
99
import tech.ydb.core.Result;
1010
import tech.ydb.core.Status;
1111
import tech.ydb.core.grpc.GrpcReadStream;
12+
import tech.ydb.proto.ValueProtos;
1213
import tech.ydb.table.description.TableDescription;
1314
import tech.ydb.table.description.TableOptionDescription;
15+
import tech.ydb.table.query.BulkUpsertData;
1416
import tech.ydb.table.query.DataQuery;
1517
import tech.ydb.table.query.DataQueryResult;
1618
import tech.ydb.table.query.ExplainDataQueryResult;
1719
import tech.ydb.table.query.Params;
1820
import tech.ydb.table.query.ReadRowsResult;
1921
import tech.ydb.table.query.ReadTablePart;
2022
import tech.ydb.table.result.ResultSetReader;
23+
import tech.ydb.table.result.impl.ProtoValueReaders;
2124
import tech.ydb.table.settings.AlterTableSettings;
2225
import tech.ydb.table.settings.BeginTxSettings;
2326
import tech.ydb.table.settings.BulkUpsertSettings;
@@ -146,7 +149,24 @@ default CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode txMo
146149

147150
GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTableSettings settings);
148151

149-
GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings);
152+
GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
153+
ExecuteScanQuerySettings settings);
154+
155+
default GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
156+
ExecuteScanQuerySettings settings) {
157+
GrpcReadStream<ValueProtos.ResultSet> stream = executeScanQueryRaw(query, params, settings);
158+
return new GrpcReadStream<ResultSetReader>() {
159+
@Override
160+
public CompletableFuture<Status> start(GrpcReadStream.Observer<ResultSetReader> observer) {
161+
return stream.start(part -> observer.onNext(ProtoValueReaders.forResultSet(part)));
162+
}
163+
164+
@Override
165+
public void cancel() {
166+
stream.cancel();
167+
}
168+
};
169+
}
150170

151171
@Deprecated
152172
default GrpcReadStream<ResultSetReader> readTable(String tablePath, ReadTableSettings settings) {
@@ -178,7 +198,15 @@ default CompletableFuture<Status> executeScanQuery(String query, Params params,
178198

179199
CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings settings);
180200

181-
CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings);
201+
default CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
202+
return executeBulkUpsert(tablePath, new BulkUpsertData(rows), settings);
203+
}
204+
205+
CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings settings);
206+
207+
default CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data) {
208+
return executeBulkUpsert(tablePath, data, new BulkUpsertSettings());
209+
}
182210

183211
default CompletableFuture<Status> createTable(String path, TableDescription tableDescriptions) {
184212
return createTable(path, tableDescriptions, new CreateTableSettings());

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,13 @@
5757
import tech.ydb.table.description.TableIndex;
5858
import tech.ydb.table.description.TableOptionDescription;
5959
import tech.ydb.table.description.TableTtl;
60+
import tech.ydb.table.query.BulkUpsertData;
6061
import tech.ydb.table.query.DataQuery;
6162
import tech.ydb.table.query.DataQueryResult;
6263
import tech.ydb.table.query.ExplainDataQueryResult;
6364
import tech.ydb.table.query.Params;
6465
import tech.ydb.table.query.ReadRowsResult;
6566
import tech.ydb.table.query.ReadTablePart;
66-
import tech.ydb.table.result.ResultSetReader;
67-
import tech.ydb.table.result.impl.ProtoValueReaders;
6867
import tech.ydb.table.rpc.TableRpc;
6968
import tech.ydb.table.settings.AlterTableSettings;
7069
import tech.ydb.table.settings.AutoPartitioningPolicy;
@@ -99,7 +98,6 @@
9998
import tech.ydb.table.transaction.Transaction;
10099
import tech.ydb.table.transaction.TxControl;
101100
import tech.ydb.table.values.ListType;
102-
import tech.ydb.table.values.ListValue;
103101
import tech.ydb.table.values.PrimitiveValue;
104102
import tech.ydb.table.values.StructValue;
105103
import tech.ydb.table.values.TupleValue;
@@ -1266,23 +1264,23 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
12661264
.setBatchLimitBytes(settings.batchLimitBytes())
12671265
.setBatchLimitRows(settings.batchLimitRows());
12681266

1269-
Value<?> fromKey = settings.getFromKey();
1267+
ValueProtos.TypedValue fromKey = settings.getFromKeyRaw();
12701268
if (fromKey != null) {
12711269
YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder();
12721270
if (settings.isFromInclusive()) {
1273-
range.setGreaterOrEqual(ProtoValue.toTypedValue(fromKey));
1271+
range.setGreaterOrEqual(fromKey);
12741272
} else {
1275-
range.setGreater(ProtoValue.toTypedValue(fromKey));
1273+
range.setGreater(fromKey);
12761274
}
12771275
}
12781276

1279-
Value<?> toKey = settings.getToKey();
1277+
ValueProtos.TypedValue toKey = settings.getToKeyRaw();
12801278
if (toKey != null) {
12811279
YdbTable.KeyRange.Builder range = request.getKeyRangeBuilder();
12821280
if (settings.isToInclusive()) {
1283-
range.setLessOrEqual(ProtoValue.toTypedValue(toKey));
1281+
range.setLessOrEqual(toKey);
12841282
} else {
1285-
range.setLess(ProtoValue.toTypedValue(toKey));
1283+
range.setLess(toKey);
12861284
}
12871285
}
12881286

@@ -1309,13 +1307,13 @@ List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ReadTableResponse message
13091307

13101308
@Override
13111309
ReadTablePart readValue(YdbTable.ReadTableResponse message) {
1312-
return new ReadTablePart(message.getResult(), message.getSnapshot());
1310+
return new ReadTablePart(message);
13131311
}
13141312
};
13151313
}
13161314

13171315
@Override
1318-
public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
1316+
public GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
13191317
ExecuteScanQuerySettings settings) {
13201318
YdbTable.ExecuteScanQueryRequest req = YdbTable.ExecuteScanQueryRequest.newBuilder()
13211319
.setQuery(YdbTable.Query.newBuilder().setYqlText(query))
@@ -1330,7 +1328,7 @@ public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params par
13301328
}
13311329

13321330
GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts.build());
1333-
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ResultSetReader>(origin) {
1331+
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ValueProtos.ResultSet>(origin) {
13341332
@Override
13351333
StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) {
13361334
return message.getStatus();
@@ -1342,8 +1340,8 @@ List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ExecuteScanQueryPartialRe
13421340
}
13431341

13441342
@Override
1345-
ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
1346-
return ProtoValueReaders.forResultSet(message.getResult().getResultSet());
1343+
ValueProtos.ResultSet readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
1344+
return message.getResult().getResultSet();
13471345
}
13481346
};
13491347
}
@@ -1394,19 +1392,13 @@ public CompletableFuture<Result<State>> keepAlive(KeepAliveSessionSettings setti
13941392
}
13951393

13961394
@Override
1397-
public CompletableFuture<Status> executeBulkUpsert(String tablePath, ListValue rows, BulkUpsertSettings settings) {
1398-
ValueProtos.TypedValue typedRows = ValueProtos.TypedValue.newBuilder()
1399-
.setType(rows.getType().toPb())
1400-
.setValue(rows.toPb())
1401-
.build();
1402-
1403-
YdbTable.BulkUpsertRequest request = YdbTable.BulkUpsertRequest.newBuilder()
1395+
public CompletableFuture<Status> executeBulkUpsert(String tablePath, BulkUpsertData data, BulkUpsertSettings st) {
1396+
YdbTable.BulkUpsertRequest.Builder request = YdbTable.BulkUpsertRequest.newBuilder()
14041397
.setTable(tablePath)
1405-
.setRows(typedRows)
1406-
.setOperationParams(Operation.buildParams(settings.toOperationSettings()))
1407-
.build();
1398+
.setOperationParams(Operation.buildParams(st.toOperationSettings()));
14081399

1409-
return interceptStatus(rpc.bulkUpsert(request, makeOptions(settings).build()));
1400+
data.applyToRequest(request);
1401+
return interceptStatus(rpc.bulkUpsert(request.build(), makeOptions(st).build()));
14101402
}
14111403

14121404
private static State mapSessionStatus(YdbTable.KeepAliveResult result) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package tech.ydb.table.query;
2+
3+
4+
import tech.ydb.proto.ValueProtos;
5+
import tech.ydb.proto.table.YdbTable;
6+
import tech.ydb.table.values.ListValue;
7+
8+
/**
9+
*
10+
* @author Aleksandr Gorshenin
11+
*/
12+
public class BulkUpsertData {
13+
private final ValueProtos.TypedValue rows;
14+
15+
public BulkUpsertData(ListValue rows) {
16+
this.rows = ValueProtos.TypedValue.newBuilder()
17+
.setType(rows.getType().toPb())
18+
.setValue(rows.toPb())
19+
.build();
20+
}
21+
22+
public BulkUpsertData(ValueProtos.TypedValue rows) {
23+
this.rows = rows;
24+
}
25+
26+
public void applyToRequest(YdbTable.BulkUpsertRequest.Builder builder) {
27+
builder.setRows(rows);
28+
}
29+
}

table/src/main/java/tech/ydb/table/query/DataQueryResult.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ public ResultSetReader getResultSet(int index) {
3535
return ProtoValueReaders.forResultSet(resultSets.get(index));
3636
}
3737

38+
public ValueProtos.ResultSet getRawResultSet(int index) {
39+
return resultSets.get(index);
40+
}
41+
3842
public boolean isTruncated(int index) {
3943
return resultSets.get(index).getTruncated();
4044
}

table/src/main/java/tech/ydb/table/query/ReadTablePart.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,30 @@ public long getTxId() {
2727
}
2828
}
2929

30-
private final ResultSetReader resultSetReader;
31-
private final VirtualTimestamp timestamp;
30+
private final YdbTable.ReadTableResponse part;
31+
private ResultSetReader resultSetReader = null;
32+
private VirtualTimestamp timestamp = null;
3233

33-
public ReadTablePart(YdbTable.ReadTableResult result, CommonProtos.VirtualTimestamp snapshot) {
34-
this.resultSetReader = ProtoValueReaders.forResultSet(result.getResultSet());
35-
this.timestamp = new VirtualTimestamp(snapshot.getPlanStep(), snapshot.getTxId());
34+
public ReadTablePart(YdbTable.ReadTableResponse part) {
35+
this.part = part;
36+
}
37+
38+
public YdbTable.ReadTableResponse getReadTableResponse() {
39+
return part;
3640
}
3741

3842
public ResultSetReader getResultSetReader() {
43+
if (resultSetReader == null) {
44+
resultSetReader = ProtoValueReaders.forResultSet(part.getResult().getResultSet());
45+
}
3946
return resultSetReader;
4047
}
4148

4249
public VirtualTimestamp getVirtualTimestamp() {
50+
if (timestamp == null) {
51+
CommonProtos.VirtualTimestamp vt = part.getSnapshot();
52+
timestamp = new VirtualTimestamp(vt.getPlanStep(), vt.getTxId());
53+
}
4354
return timestamp;
4455
}
4556
}

table/src/main/java/tech/ydb/table/result/impl/ProtoValueReaders.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,17 @@ public static ResultSetReader forResultSets(Collection<ResultSetReader> resultSe
4040
return new ProtoResultSetReader(builder.build());
4141
}
4242

43+
@Deprecated
4344
public static ValueReader forType(ValueProtos.Type type) {
4445
return forTypeImpl(type);
4546
}
4647

48+
public static ValueReader forTypedValue(ValueProtos.TypedValue tv) {
49+
AbstractValueReader vr = forTypeImpl(tv.getType());
50+
vr.setProtoValue(tv.getValue());
51+
return vr;
52+
}
53+
4754
static AbstractValueReader forTypeImpl(ValueProtos.Type type) {
4855
switch (type.getTypeCase()) {
4956
case TYPE_ID:

table/src/main/java/tech/ydb/table/settings/ReadTableSettings.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111

1212
import tech.ydb.core.grpc.GrpcFlowControl;
1313
import tech.ydb.core.settings.BaseRequestSettings;
14+
import tech.ydb.proto.ValueProtos;
15+
import tech.ydb.table.result.impl.ProtoValueReaders;
1416
import tech.ydb.table.values.PrimitiveValue;
1517
import tech.ydb.table.values.TupleValue;
18+
import tech.ydb.table.values.proto.ProtoValue;
1619

1720
import static com.google.common.base.Preconditions.checkArgument;
1821

@@ -24,9 +27,9 @@
2427
public class ReadTableSettings extends BaseRequestSettings {
2528

2629
private final boolean ordered;
27-
private final TupleValue fromKey;
30+
private final ValueProtos.TypedValue fromKey;
2831
private final boolean fromInclusive;
29-
private final TupleValue toKey;
32+
private final ValueProtos.TypedValue toKey;
3033
private final boolean toInclusive;
3134
private final int rowLimit;
3235
private final ImmutableList<String> columns;
@@ -58,6 +61,11 @@ public boolean isOrdered() {
5861

5962
@Nullable
6063
public TupleValue getFromKey() {
64+
return fromKey == null ? null : ProtoValueReaders.forTypedValue(fromKey).getValue().asTuple();
65+
}
66+
67+
@Nullable
68+
public ValueProtos.TypedValue getFromKeyRaw() {
6169
return fromKey;
6270
}
6371

@@ -67,6 +75,11 @@ public boolean isFromInclusive() {
6775

6876
@Nullable
6977
public TupleValue getToKey() {
78+
return toKey == null ? null : ProtoValueReaders.forTypedValue(toKey).getValue().asTuple();
79+
}
80+
81+
@Nullable
82+
public ValueProtos.TypedValue getToKeyRaw() {
7083
return toKey;
7184
}
7285

@@ -99,9 +112,9 @@ public GrpcFlowControl getGrpcFlowControl() {
99112
*/
100113
public static final class Builder extends BaseBuilder<Builder> {
101114
private boolean ordered = false;
102-
private TupleValue fromKey = null;
115+
private ValueProtos.TypedValue fromKey = null;
103116
private boolean fromInclusive = false;
104-
private TupleValue toKey = null;
117+
private ValueProtos.TypedValue toKey = null;
105118
private boolean toInclusive = false;
106119
private int rowLimit = 0;
107120
private List<String> columns = Collections.emptyList();
@@ -115,12 +128,24 @@ public Builder orderedRead(boolean ordered) {
115128
}
116129

117130
public Builder fromKey(TupleValue value, boolean inclusive) {
131+
this.fromKey = ProtoValue.toTypedValue(value);
132+
this.fromInclusive = inclusive;
133+
return this;
134+
}
135+
136+
public Builder fromKey(ValueProtos.TypedValue value, boolean inclusive) {
118137
this.fromKey = value;
119138
this.fromInclusive = inclusive;
120139
return this;
121140
}
122141

123142
public Builder toKey(TupleValue value, boolean inclusive) {
143+
this.toKey = ProtoValue.toTypedValue(value);
144+
this.toInclusive = inclusive;
145+
return this;
146+
}
147+
148+
public Builder toKey(ValueProtos.TypedValue value, boolean inclusive) {
124149
this.toKey = value;
125150
this.toInclusive = inclusive;
126151
return this;

table/src/main/java/tech/ydb/table/values/Value.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ default StructValue asStruct() {
4444
return (StructValue) this;
4545
}
4646

47+
default TupleValue asTuple() {
48+
return (TupleValue) this;
49+
}
50+
4751
default VariantValue asVariant() {
4852
return (VariantValue) this;
4953
}

0 commit comments

Comments
 (0)