Skip to content

Commit a2293d8

Browse files
committed
Updated TableClient over QueryClient implementation
1 parent f874d80 commit a2293d8

File tree

3 files changed

+135
-101
lines changed

3 files changed

+135
-101
lines changed

query/src/main/java/tech/ydb/query/impl/TableClientImpl.java

Lines changed: 83 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,29 @@
11
package tech.ydb.query.impl;
22

33
import java.time.Duration;
4+
import java.util.ArrayList;
5+
import java.util.Arrays;
6+
import java.util.List;
47
import java.util.concurrent.CompletableFuture;
58
import java.util.concurrent.ScheduledExecutorService;
69
import java.util.concurrent.atomic.AtomicReference;
710

811
import tech.ydb.common.transaction.TxMode;
12+
import tech.ydb.core.Issue;
913
import tech.ydb.core.Result;
1014
import tech.ydb.core.Status;
1115
import tech.ydb.core.StatusCode;
1216
import tech.ydb.core.UnexpectedResultException;
1317
import tech.ydb.core.grpc.GrpcTransport;
18+
import tech.ydb.proto.ValueProtos;
1419
import tech.ydb.proto.query.YdbQuery;
1520
import tech.ydb.proto.table.YdbTable;
1621
import tech.ydb.query.QuerySession;
1722
import tech.ydb.query.QueryStream;
23+
import tech.ydb.query.result.QueryInfo;
24+
import tech.ydb.query.result.QueryResultPart;
1825
import tech.ydb.query.settings.ExecuteQuerySettings;
1926
import tech.ydb.query.settings.QueryStatsMode;
20-
import tech.ydb.query.tools.QueryReader;
2127
import tech.ydb.table.Session;
2228
import tech.ydb.table.SessionPoolStats;
2329
import tech.ydb.table.TableClient;
@@ -29,7 +35,6 @@
2935
import tech.ydb.table.query.stats.QueryPhaseStats;
3036
import tech.ydb.table.query.stats.QueryStats;
3137
import tech.ydb.table.query.stats.TableAccessStats;
32-
import tech.ydb.table.result.ResultSetReader;
3338
import tech.ydb.table.rpc.TableRpc;
3439
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
3540
import tech.ydb.table.settings.ExecuteDataQuerySettings;
@@ -95,103 +100,51 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc)
95100
return TxControl.txModeCtrl(TxMode.NONE, tc.getCommitTx());
96101
}
97102

98-
private static class ProxedDataQueryResult extends DataQueryResult {
99-
private final String txID;
100-
private final QueryReader reader;
101-
private final QueryStats queryStats;
102-
103-
private ProxedDataQueryResult(String txID, QueryReader reader) {
104-
super(YdbTable.ExecuteQueryResult.getDefaultInstance());
105-
this.txID = txID;
106-
this.reader = reader;
107-
108-
tech.ydb.query.result.QueryStats stats = reader.getQueryInfo().getStats();
109-
this.queryStats = stats == null ? null : queryStats(stats);
110-
}
111-
112-
@Override
113-
public String getTxId() {
114-
return txID;
115-
}
116-
117-
@Override
118-
public int getResultSetCount() {
119-
return reader.getResultSetCount();
120-
}
121-
122-
@Override
123-
public ResultSetReader getResultSet(int index) {
124-
return reader.getResultSet(index);
125-
}
126-
127-
@Override
128-
public boolean isTruncated(int index) {
129-
return false;
130-
}
131-
132-
@Override
133-
public int getRowCount(int index) {
134-
return reader.getResultSet(index).getRowCount();
135-
}
136-
137-
@Override
138-
public boolean isEmpty() {
139-
return txID.isEmpty() && reader.getResultSetCount() == 0;
140-
}
141-
142-
@Override
143-
public QueryStats getQueryStats() {
144-
return this.queryStats;
145-
}
146-
147-
@Override
148-
public boolean hasQueryStats() {
149-
return this.queryStats != null;
150-
}
151-
152-
private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) {
153-
return new QueryStats(
154-
stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()),
155-
compilationStats(stats.getCompilationStats()),
156-
stats.getProcessCpuTimeUs(),
157-
stats.getQueryPlan(),
158-
stats.getQueryAst(),
159-
stats.getTotalDurationUs(),
160-
stats.getTotalCpuTimeUs()
161-
);
103+
private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) {
104+
if (stats == null) {
105+
return null;
162106
}
107+
return new QueryStats(
108+
stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()),
109+
compilationStats(stats.getCompilationStats()),
110+
stats.getProcessCpuTimeUs(),
111+
stats.getQueryPlan(),
112+
stats.getQueryAst(),
113+
stats.getTotalDurationUs(),
114+
stats.getTotalCpuTimeUs()
115+
);
116+
}
163117

164-
private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) {
165-
return new QueryPhaseStats(
166-
queryPhase.getDurationUs(),
167-
queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()),
168-
queryPhase.getCpuTimeUs(),
169-
queryPhase.getAffectedShards(),
170-
queryPhase.isLiteralPhase()
171-
);
172-
}
118+
private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) {
119+
return new QueryPhaseStats(
120+
queryPhase.getDurationUs(),
121+
queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()),
122+
queryPhase.getCpuTimeUs(),
123+
queryPhase.getAffectedShards(),
124+
queryPhase.isLiteralPhase()
125+
);
126+
}
173127

174-
private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) {
175-
return new TableAccessStats(
176-
tableAccess.getTableName(),
177-
operationStats(tableAccess.getReads()),
178-
operationStats(tableAccess.getUpdates()),
179-
operationStats(tableAccess.getDeletes()),
180-
tableAccess.getPartitionsCount()
181-
);
182-
}
128+
private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) {
129+
return new TableAccessStats(
130+
tableAccess.getTableName(),
131+
operationStats(tableAccess.getReads()),
132+
operationStats(tableAccess.getUpdates()),
133+
operationStats(tableAccess.getDeletes()),
134+
tableAccess.getPartitionsCount()
135+
);
136+
}
183137

184-
private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) {
185-
return new OperationStats(operation.getRows(), operation.getBytes());
186-
}
138+
private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) {
139+
return new OperationStats(operation.getRows(), operation.getBytes());
140+
}
187141

188-
private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) {
189-
return new CompilationStats(
190-
compilation.isFromCache(),
191-
compilation.getDurationUs(),
192-
compilation.getCpuTimeUs()
193-
);
194-
}
142+
private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) {
143+
return new CompilationStats(
144+
compilation.isFromCache(),
145+
compilation.getDurationUs(),
146+
compilation.getCpuTimeUs()
147+
);
195148
}
196149

197150
private class TableSession extends BaseSession {
@@ -213,17 +166,49 @@ public CompletableFuture<Result<DataQueryResult>> executeDataQueryInternal(
213166
.build();
214167

215168
final AtomicReference<String> txRef = new AtomicReference<>("");
169+
final List<Issue> issues = new ArrayList<>();
170+
final List<ValueProtos.ResultSet> results = new ArrayList<>();
171+
216172
QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs)) {
217173
@Override
218174
void handleTxMeta(String txID) {
219175
txRef.set(txID);
220176
}
221177
};
222178

223-
return QueryReader.readFrom(stream)
224-
.thenApply(r -> r.map(
225-
reader -> new ProxedDataQueryResult(txRef.get(), reader)
226-
));
179+
CompletableFuture<Result<QueryInfo>> future = stream.execute(new QueryStream.PartsHandler() {
180+
@Override
181+
public void onIssues(Issue[] issueArr) {
182+
issues.addAll(Arrays.asList(issueArr));
183+
}
184+
185+
@Override
186+
public void onNextPart(QueryResultPart part) { } // not used
187+
188+
@Override
189+
public void onNextRawPart(long index, ValueProtos.ResultSet rs) {
190+
int idx = (int) index;
191+
while (results.size() <= idx) {
192+
results.add(null);
193+
}
194+
if (results.get(idx) == null) {
195+
results.set(idx, rs);
196+
} else {
197+
results.set(idx, results.get(idx).toBuilder().addAllRows(rs.getRowsList()).build());
198+
}
199+
}
200+
});
201+
202+
203+
return future.thenApply(res -> {
204+
if (!res.isSuccess()) {
205+
return res.map(v -> null);
206+
}
207+
QueryStats info = queryStats(res.getValue().getStats());
208+
String txId = txRef.get();
209+
Status status = res.getStatus().withIssues(issues.toArray(new Issue[0]));
210+
return Result.success(new DataQueryResult(txId, results, info), status);
211+
});
227212
}
228213

229214
@Override

query/src/test/java/tech/ydb/query/TableExampleTest.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.junit.runners.MethodSorters;
2020

2121
import tech.ydb.common.transaction.TxMode;
22+
import tech.ydb.core.Result;
2223
import tech.ydb.core.Status;
2324
import tech.ydb.core.grpc.GrpcReadStream;
2425
import tech.ydb.table.SessionPoolStats;
@@ -27,8 +28,10 @@
2728
import tech.ydb.table.description.TableDescription;
2829
import tech.ydb.table.query.DataQueryResult;
2930
import tech.ydb.table.query.Params;
31+
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
3032
import tech.ydb.table.result.ResultSetReader;
3133
import tech.ydb.table.settings.BulkUpsertSettings;
34+
import tech.ydb.table.settings.ExecuteDataQuerySettings;
3235
import tech.ydb.table.settings.ExecuteScanQuerySettings;
3336
import tech.ydb.table.transaction.TableTransaction;
3437
import tech.ydb.table.transaction.TxControl;
@@ -122,6 +125,7 @@ public void step01_createTables() {
122125

123126
retryCtx.supplyStatus(session -> session.createTable(ydbRule.getDatabase() + "/episodes", episodesTable))
124127
.join().expectSuccess("Can't create table /episodes");
128+
125129
}
126130

127131
@Test
@@ -432,14 +436,14 @@ public void step10_queryStats() {
432436
+ "FROM seasons AS sa INNER JOIN series AS sr ON sa.series_id = sr.series_id "
433437
+ "WHERE sa.series_id = $seriesId AND sa.season_id = $seasonId";
434438

435-
tech.ydb.table.transaction.TxControl<?> txControl = tech.ydb.table.transaction.TxControl.snapshotRo().setCommitTx(true);
439+
TxControl<?> txControl = TxControl.snapshotRo().setCommitTx(true);
436440
Params params = Params.of(
437441
"$seriesId", PrimitiveValue.newUint64(1),
438442
"$seasonId", PrimitiveValue.newUint64(2)
439443
);
440-
tech.ydb.table.settings.ExecuteDataQuerySettings settings = new tech.ydb.table.settings.ExecuteDataQuerySettings()
444+
ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings()
441445
.disableQueryCache()
442-
.setCollectStats(tech.ydb.table.query.stats.QueryStatsCollectionMode.FULL);
446+
.setCollectStats(QueryStatsCollectionMode.FULL);
443447

444448
DataQueryResult result = retryCtx
445449
.supplyResult(session -> session.executeDataQuery(query, txControl, params, settings))
@@ -452,4 +456,43 @@ public void step10_queryStats() {
452456
Assert.assertFalse(result.getQueryStats().getQueryAst().isEmpty());
453457
Assert.assertFalse(result.getQueryStats().getQueryPlan().isEmpty());
454458
}
459+
460+
@Test
461+
public void step11_queryIssues() {
462+
String query = "$unused = 1; SELECT * FROM seasons;";
463+
464+
TxControl<?> txControl = TxControl.snapshotRo();
465+
466+
Result<DataQueryResult> result = retryCtx
467+
.supplyResult(session -> session.executeDataQuery(query, txControl))
468+
.join();
469+
470+
Status status = result.getStatus();
471+
Assert.assertTrue(status.isSuccess());
472+
Assert.assertEquals(1, status.getIssues().length);
473+
Assert.assertEquals("Symbol $unused is not used", status.getIssues()[0].getMessage());
474+
}
475+
476+
@Test
477+
public void step12_multiStatementTest() {
478+
String query = ""
479+
+ "SELECT * FROM series ORDER BY series_id;"
480+
+ "INSERT INTO series SELECT * FROM AS_TABLE(ListMap(ListFromRange(10000, 20000), ($x) -> {"
481+
+ " RETURN <|series_id: $x, title: 'TEST'u || CAST($x AS Text)|>;"
482+
+ "}));"
483+
+ "SELECT * FROM series ORDER BY series_id;"
484+
+ "DELETE FROM series WHERE series_id >= 10000;"
485+
+ "SELECT * FROM series ORDER BY series_id;";
486+
487+
TxControl<?> txControl = TxControl.serializableRw();
488+
489+
DataQueryResult result = retryCtx
490+
.supplyResult(session -> session.executeDataQuery(query, txControl))
491+
.join().getValue();
492+
493+
Assert.assertEquals(3, result.getResultSetCount());
494+
Assert.assertEquals(2, result.getResultSet(0).getRowCount());
495+
Assert.assertEquals(10002, result.getResultSet(1).getRowCount());
496+
Assert.assertEquals(2, result.getResultSet(2).getRowCount());
497+
}
455498
}

table/src/main/java/tech/ydb/table/query/DataQueryResult.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ public DataQueryResult(YdbTable.ExecuteQueryResult result) {
2323
queryStats = result.hasQueryStats() ? new QueryStats(result.getQueryStats()) : null;
2424
}
2525

26+
public DataQueryResult(String txId, List<ValueProtos.ResultSet> results, QueryStats stats) {
27+
this.txId = txId;
28+
this.resultSets = results;
29+
queryStats = stats;
30+
}
31+
2632
public String getTxId() {
2733
return txId;
2834
}

0 commit comments

Comments
 (0)