Skip to content

Commit f874d80

Browse files
committed
Added executeScanQueryRaw method
1 parent d47c519 commit f874d80

File tree

3 files changed

+26
-9
lines changed

3 files changed

+26
-9
lines changed

table/src/main/java/tech/ydb/table/Session.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import tech.ydb.core.Result;
1010
import tech.ydb.core.Status;
1111
import tech.ydb.core.grpc.GrpcReadStream;
12+
import tech.ydb.proto.ValueProtos;
1213
import tech.ydb.table.description.TableDescription;
1314
import tech.ydb.table.description.TableOptionDescription;
1415
import tech.ydb.table.query.BulkUpsertData;
@@ -19,6 +20,7 @@
1920
import tech.ydb.table.query.ReadRowsResult;
2021
import tech.ydb.table.query.ReadTablePart;
2122
import tech.ydb.table.result.ResultSetReader;
23+
import tech.ydb.table.result.impl.ProtoValueReaders;
2224
import tech.ydb.table.settings.AlterTableSettings;
2325
import tech.ydb.table.settings.BeginTxSettings;
2426
import tech.ydb.table.settings.BulkUpsertSettings;
@@ -147,7 +149,24 @@ default CompletableFuture<Result<TableTransaction>> beginTransaction(TxMode txMo
147149

148150
GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTableSettings settings);
149151

150-
GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings);
152+
GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
153+
ExecuteScanQuerySettings settings);
154+
155+
default GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
156+
ExecuteScanQuerySettings settings) {
157+
GrpcReadStream<ValueProtos.ResultSet> stream = executeScanQueryRaw(query, params, settings);
158+
return new GrpcReadStream<ResultSetReader>() {
159+
@Override
160+
public CompletableFuture<Status> start(GrpcReadStream.Observer<ResultSetReader> observer) {
161+
return stream.start(part -> observer.onNext(ProtoValueReaders.forResultSet(part)));
162+
}
163+
164+
@Override
165+
public void cancel() {
166+
stream.cancel();
167+
}
168+
};
169+
}
151170

152171
@Deprecated
153172
default GrpcReadStream<ResultSetReader> readTable(String tablePath, ReadTableSettings settings) {

table/src/main/java/tech/ydb/table/impl/BaseSession.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,6 @@
6464
import tech.ydb.table.query.Params;
6565
import tech.ydb.table.query.ReadRowsResult;
6666
import tech.ydb.table.query.ReadTablePart;
67-
import tech.ydb.table.result.ResultSetReader;
68-
import tech.ydb.table.result.impl.ProtoValueReaders;
6967
import tech.ydb.table.rpc.TableRpc;
7068
import tech.ydb.table.settings.AlterTableSettings;
7169
import tech.ydb.table.settings.AutoPartitioningPolicy;
@@ -1315,7 +1313,7 @@ ReadTablePart readValue(YdbTable.ReadTableResponse message) {
13151313
}
13161314

13171315
@Override
1318-
public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params,
1316+
public GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params,
13191317
ExecuteScanQuerySettings settings) {
13201318
YdbTable.ExecuteScanQueryRequest req = YdbTable.ExecuteScanQueryRequest.newBuilder()
13211319
.setQuery(YdbTable.Query.newBuilder().setYqlText(query))
@@ -1330,7 +1328,7 @@ public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params par
13301328
}
13311329

13321330
GrpcReadStream<YdbTable.ExecuteScanQueryPartialResponse> origin = rpc.streamExecuteScanQuery(req, opts.build());
1333-
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ResultSetReader>(origin) {
1331+
return new ProxyStream<YdbTable.ExecuteScanQueryPartialResponse, ValueProtos.ResultSet>(origin) {
13341332
@Override
13351333
StatusIds.StatusCode readStatusCode(YdbTable.ExecuteScanQueryPartialResponse message) {
13361334
return message.getStatus();
@@ -1342,8 +1340,8 @@ List<YdbIssueMessage.IssueMessage> readIssues(YdbTable.ExecuteScanQueryPartialRe
13421340
}
13431341

13441342
@Override
1345-
ResultSetReader readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
1346-
return ProtoValueReaders.forResultSet(message.getResult().getResultSet());
1343+
ValueProtos.ResultSet readValue(YdbTable.ExecuteScanQueryPartialResponse message) {
1344+
return message.getResult().getResultSet();
13471345
}
13481346
};
13491347
}

table/src/test/java/tech/ydb/table/SessionStub.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import tech.ydb.core.Status;
99
import tech.ydb.core.grpc.GrpcReadStream;
1010
import tech.ydb.core.utils.FutureTools;
11+
import tech.ydb.proto.ValueProtos;
1112
import tech.ydb.table.description.TableDescription;
1213
import tech.ydb.table.description.TableOptionDescription;
1314
import tech.ydb.table.query.BulkUpsertData;
@@ -17,7 +18,6 @@
1718
import tech.ydb.table.query.Params;
1819
import tech.ydb.table.query.ReadRowsResult;
1920
import tech.ydb.table.query.ReadTablePart;
20-
import tech.ydb.table.result.ResultSetReader;
2121
import tech.ydb.table.settings.AlterTableSettings;
2222
import tech.ydb.table.settings.BeginTxSettings;
2323
import tech.ydb.table.settings.BulkUpsertSettings;
@@ -158,7 +158,7 @@ public GrpcReadStream<ReadTablePart> executeReadTable(String tablePath, ReadTabl
158158
}
159159

160160
@Override
161-
public GrpcReadStream<ResultSetReader> executeScanQuery(String query, Params params, ExecuteScanQuerySettings settings) {
161+
public GrpcReadStream<ValueProtos.ResultSet> executeScanQueryRaw(String query, Params params, ExecuteScanQuerySettings settings) {
162162
throw new UnsupportedOperationException("executeScanQuery not implemented");
163163
}
164164

0 commit comments

Comments
 (0)