Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 83 additions & 98 deletions query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package tech.ydb.query.impl;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.proto.ValueProtos;
import tech.ydb.proto.query.YdbQuery;
import tech.ydb.proto.table.YdbTable;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
import tech.ydb.query.result.QueryInfo;
import tech.ydb.query.result.QueryResultPart;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.query.settings.QueryStatsMode;
import tech.ydb.query.tools.QueryReader;
import tech.ydb.table.Session;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.TableClient;
Expand All @@ -29,7 +35,6 @@
import tech.ydb.table.query.stats.QueryPhaseStats;
import tech.ydb.table.query.stats.QueryStats;
import tech.ydb.table.query.stats.TableAccessStats;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.rpc.TableRpc;
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
Expand Down Expand Up @@ -95,103 +100,51 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc)
return TxControl.txModeCtrl(TxMode.NONE, tc.getCommitTx());
}

private static class ProxedDataQueryResult extends DataQueryResult {
private final String txID;
private final QueryReader reader;
private final QueryStats queryStats;

private ProxedDataQueryResult(String txID, QueryReader reader) {
super(YdbTable.ExecuteQueryResult.getDefaultInstance());
this.txID = txID;
this.reader = reader;

tech.ydb.query.result.QueryStats stats = reader.getQueryInfo().getStats();
this.queryStats = stats == null ? null : queryStats(stats);
}

@Override
public String getTxId() {
return txID;
}

@Override
public int getResultSetCount() {
return reader.getResultSetCount();
}

@Override
public ResultSetReader getResultSet(int index) {
return reader.getResultSet(index);
}

@Override
public boolean isTruncated(int index) {
return false;
}

@Override
public int getRowCount(int index) {
return reader.getResultSet(index).getRowCount();
}

@Override
public boolean isEmpty() {
return txID.isEmpty() && reader.getResultSetCount() == 0;
}

@Override
public QueryStats getQueryStats() {
return this.queryStats;
}

@Override
public boolean hasQueryStats() {
return this.queryStats != null;
}

private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) {
return new QueryStats(
stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()),
compilationStats(stats.getCompilationStats()),
stats.getProcessCpuTimeUs(),
stats.getQueryPlan(),
stats.getQueryAst(),
stats.getTotalDurationUs(),
stats.getTotalCpuTimeUs()
);
private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) {
if (stats == null) {
return null;
}
return new QueryStats(
stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()),
compilationStats(stats.getCompilationStats()),
stats.getProcessCpuTimeUs(),
stats.getQueryPlan(),
stats.getQueryAst(),
stats.getTotalDurationUs(),
stats.getTotalCpuTimeUs()
);
}

private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) {
return new QueryPhaseStats(
queryPhase.getDurationUs(),
queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()),
queryPhase.getCpuTimeUs(),
queryPhase.getAffectedShards(),
queryPhase.isLiteralPhase()
);
}
private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) {
return new QueryPhaseStats(
queryPhase.getDurationUs(),
queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()),
queryPhase.getCpuTimeUs(),
queryPhase.getAffectedShards(),
queryPhase.isLiteralPhase()
);
}

private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) {
return new TableAccessStats(
tableAccess.getTableName(),
operationStats(tableAccess.getReads()),
operationStats(tableAccess.getUpdates()),
operationStats(tableAccess.getDeletes()),
tableAccess.getPartitionsCount()
);
}
private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) {
return new TableAccessStats(
tableAccess.getTableName(),
operationStats(tableAccess.getReads()),
operationStats(tableAccess.getUpdates()),
operationStats(tableAccess.getDeletes()),
tableAccess.getPartitionsCount()
);
}

private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) {
return new OperationStats(operation.getRows(), operation.getBytes());
}
private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) {
return new OperationStats(operation.getRows(), operation.getBytes());
}

private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) {
return new CompilationStats(
compilation.isFromCache(),
compilation.getDurationUs(),
compilation.getCpuTimeUs()
);
}
private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) {
return new CompilationStats(
compilation.isFromCache(),
compilation.getDurationUs(),
compilation.getCpuTimeUs()
);
}

private class TableSession extends BaseSession {
Expand All @@ -213,17 +166,49 @@ public CompletableFuture<Result<DataQueryResult>> executeDataQueryInternal(
.build();

final AtomicReference<String> txRef = new AtomicReference<>("");
final List<Issue> issues = new ArrayList<>();
final List<ValueProtos.ResultSet> results = new ArrayList<>();

QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs)) {
@Override
void handleTxMeta(String txID) {
txRef.set(txID);
}
};

return QueryReader.readFrom(stream)
.thenApply(r -> r.map(
reader -> new ProxedDataQueryResult(txRef.get(), reader)
));
CompletableFuture<Result<QueryInfo>> future = stream.execute(new QueryStream.PartsHandler() {
@Override
public void onIssues(Issue[] issueArr) {
issues.addAll(Arrays.asList(issueArr));
}

@Override
public void onNextPart(QueryResultPart part) { } // not used

@Override
public void onNextRawPart(long index, ValueProtos.ResultSet rs) {
int idx = (int) index;
while (results.size() <= idx) {
results.add(null);
}
if (results.get(idx) == null) {
results.set(idx, rs);
} else {
results.set(idx, results.get(idx).toBuilder().addAllRows(rs.getRowsList()).build());
}
}
});


return future.thenApply(res -> {
if (!res.isSuccess()) {
return res.map(v -> null);
}
QueryStats info = queryStats(res.getValue().getStats());
String txId = txRef.get();
Status status = res.getStatus().withIssues(issues.toArray(new Issue[0]));
return Result.success(new DataQueryResult(txId, results, info), status);
});
}

@Override
Expand Down
49 changes: 46 additions & 3 deletions query/src/test/java/tech/ydb/query/TableExampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.runners.MethodSorters;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.table.SessionPoolStats;
Expand All @@ -27,8 +28,10 @@
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.query.stats.QueryStatsCollectionMode;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.BulkUpsertSettings;
import tech.ydb.table.settings.ExecuteDataQuerySettings;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.transaction.TableTransaction;
import tech.ydb.table.transaction.TxControl;
Expand Down Expand Up @@ -122,6 +125,7 @@ public void step01_createTables() {

retryCtx.supplyStatus(session -> session.createTable(ydbRule.getDatabase() + "/episodes", episodesTable))
.join().expectSuccess("Can't create table /episodes");

}

@Test
Expand Down Expand Up @@ -432,14 +436,14 @@ public void step10_queryStats() {
+ "FROM seasons AS sa INNER JOIN series AS sr ON sa.series_id = sr.series_id "
+ "WHERE sa.series_id = $seriesId AND sa.season_id = $seasonId";

tech.ydb.table.transaction.TxControl<?> txControl = tech.ydb.table.transaction.TxControl.snapshotRo().setCommitTx(true);
TxControl<?> txControl = TxControl.snapshotRo().setCommitTx(true);
Params params = Params.of(
"$seriesId", PrimitiveValue.newUint64(1),
"$seasonId", PrimitiveValue.newUint64(2)
);
tech.ydb.table.settings.ExecuteDataQuerySettings settings = new tech.ydb.table.settings.ExecuteDataQuerySettings()
ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings()
.disableQueryCache()
.setCollectStats(tech.ydb.table.query.stats.QueryStatsCollectionMode.FULL);
.setCollectStats(QueryStatsCollectionMode.FULL);

DataQueryResult result = retryCtx
.supplyResult(session -> session.executeDataQuery(query, txControl, params, settings))
Expand All @@ -452,4 +456,43 @@ public void step10_queryStats() {
Assert.assertFalse(result.getQueryStats().getQueryAst().isEmpty());
Assert.assertFalse(result.getQueryStats().getQueryPlan().isEmpty());
}

@Test
public void step11_queryIssues() {
String query = "$unused = 1; SELECT * FROM seasons;";

TxControl<?> txControl = TxControl.snapshotRo();

Result<DataQueryResult> result = retryCtx
.supplyResult(session -> session.executeDataQuery(query, txControl))
.join();

Status status = result.getStatus();
Assert.assertTrue(status.isSuccess());
Assert.assertEquals(1, status.getIssues().length);
Assert.assertEquals("Symbol $unused is not used", status.getIssues()[0].getMessage());
}

@Test
public void step12_multiStatementTest() {
String query = ""
+ "SELECT * FROM series ORDER BY series_id;"
+ "INSERT INTO series SELECT * FROM AS_TABLE(ListMap(ListFromRange(10000, 20000), ($x) -> {"
+ " RETURN <|series_id: $x, title: 'TEST'u || CAST($x AS Text)|>;"
+ "}));"
+ "SELECT * FROM series ORDER BY series_id;"
+ "DELETE FROM series WHERE series_id >= 10000;"
+ "SELECT * FROM series ORDER BY series_id;";

TxControl<?> txControl = TxControl.serializableRw();

DataQueryResult result = retryCtx
.supplyResult(session -> session.executeDataQuery(query, txControl))
.join().getValue();

Assert.assertEquals(3, result.getResultSetCount());
Assert.assertEquals(2, result.getResultSet(0).getRowCount());
Assert.assertEquals(10002, result.getResultSet(1).getRowCount());
Assert.assertEquals(2, result.getResultSet(2).getRowCount());
}
}
6 changes: 6 additions & 0 deletions table/src/main/java/tech/ydb/table/query/DataQueryResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ public DataQueryResult(YdbTable.ExecuteQueryResult result) {
queryStats = result.hasQueryStats() ? new QueryStats(result.getQueryStats()) : null;
}

public DataQueryResult(String txId, List<ValueProtos.ResultSet> results, QueryStats stats) {
this.txId = txId;
this.resultSets = results;
queryStats = stats;
}

public String getTxId() {
return txId;
}
Expand Down