diff --git a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java index 44d486f7..be90f07c 100644 --- a/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/TableClientImpl.java @@ -1,23 +1,29 @@ package tech.ydb.query.impl; import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.table.YdbTable; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; +import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.result.QueryResultPart; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.QueryStatsMode; -import tech.ydb.query.tools.QueryReader; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.TableClient; @@ -29,7 +35,6 @@ import tech.ydb.table.query.stats.QueryPhaseStats; import tech.ydb.table.query.stats.QueryStats; import tech.ydb.table.query.stats.TableAccessStats; -import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.rpc.TableRpc; import tech.ydb.table.rpc.grpc.GrpcTableRpc; import tech.ydb.table.settings.ExecuteDataQuerySettings; @@ -95,103 +100,51 @@ private YdbQuery.TransactionControl mapTxControl(YdbTable.TransactionControl tc) return TxControl.txModeCtrl(TxMode.NONE, tc.getCommitTx()); } - private static class ProxedDataQueryResult extends DataQueryResult { - private final String txID; - private final QueryReader reader; - private final QueryStats queryStats; - - private ProxedDataQueryResult(String txID, QueryReader reader) { - super(YdbTable.ExecuteQueryResult.getDefaultInstance()); - this.txID = txID; - this.reader = reader; - - tech.ydb.query.result.QueryStats stats = reader.getQueryInfo().getStats(); - this.queryStats = stats == null ? null : queryStats(stats); - } - - @Override - public String getTxId() { - return txID; - } - - @Override - public int getResultSetCount() { - return reader.getResultSetCount(); - } - - @Override - public ResultSetReader getResultSet(int index) { - return reader.getResultSet(index); - } - - @Override - public boolean isTruncated(int index) { - return false; - } - - @Override - public int getRowCount(int index) { - return reader.getResultSet(index).getRowCount(); - } - - @Override - public boolean isEmpty() { - return txID.isEmpty() && reader.getResultSetCount() == 0; - } - - @Override - public QueryStats getQueryStats() { - return this.queryStats; - } - - @Override - public boolean hasQueryStats() { - return this.queryStats != null; - } - - private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) { - return new QueryStats( - stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()), - compilationStats(stats.getCompilationStats()), - stats.getProcessCpuTimeUs(), - stats.getQueryPlan(), - stats.getQueryAst(), - stats.getTotalDurationUs(), - stats.getTotalCpuTimeUs() - ); + private static QueryStats queryStats(tech.ydb.query.result.QueryStats stats) { + if (stats == null) { + return null; } + return new QueryStats( + stats.getPhases().stream().map(qp -> queryPhaseStats(qp)).collect(toList()), + compilationStats(stats.getCompilationStats()), + stats.getProcessCpuTimeUs(), + stats.getQueryPlan(), + stats.getQueryAst(), + stats.getTotalDurationUs(), + stats.getTotalCpuTimeUs() + ); + } - private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) { - return new QueryPhaseStats( - queryPhase.getDurationUs(), - queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()), - queryPhase.getCpuTimeUs(), - queryPhase.getAffectedShards(), - queryPhase.isLiteralPhase() - ); - } + private static QueryPhaseStats queryPhaseStats(tech.ydb.query.result.QueryStats.QueryPhase queryPhase) { + return new QueryPhaseStats( + queryPhase.getDurationUs(), + queryPhase.getTableAccesses().stream().map(ta -> tableAccessStats(ta)).collect(toList()), + queryPhase.getCpuTimeUs(), + queryPhase.getAffectedShards(), + queryPhase.isLiteralPhase() + ); + } - private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) { - return new TableAccessStats( - tableAccess.getTableName(), - operationStats(tableAccess.getReads()), - operationStats(tableAccess.getUpdates()), - operationStats(tableAccess.getDeletes()), - tableAccess.getPartitionsCount() - ); - } + private static TableAccessStats tableAccessStats(tech.ydb.query.result.QueryStats.TableAccess tableAccess) { + return new TableAccessStats( + tableAccess.getTableName(), + operationStats(tableAccess.getReads()), + operationStats(tableAccess.getUpdates()), + operationStats(tableAccess.getDeletes()), + tableAccess.getPartitionsCount() + ); + } - private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) { - return new OperationStats(operation.getRows(), operation.getBytes()); - } + private static OperationStats operationStats(tech.ydb.query.result.QueryStats.Operation operation) { + return new OperationStats(operation.getRows(), operation.getBytes()); + } - private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) { - return new CompilationStats( - compilation.isFromCache(), - compilation.getDurationUs(), - compilation.getCpuTimeUs() - ); - } + private static CompilationStats compilationStats(tech.ydb.query.result.QueryStats.Compilation compilation) { + return new CompilationStats( + compilation.isFromCache(), + compilation.getDurationUs(), + compilation.getCpuTimeUs() + ); } private class TableSession extends BaseSession { @@ -213,6 +166,9 @@ public CompletableFuture> executeDataQueryInternal( .build(); final AtomicReference txRef = new AtomicReference<>(""); + final List issues = new ArrayList<>(); + final List results = new ArrayList<>(); + QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs)) { @Override void handleTxMeta(String txID) { @@ -220,10 +176,39 @@ void handleTxMeta(String txID) { } }; - return QueryReader.readFrom(stream) - .thenApply(r -> r.map( - reader -> new ProxedDataQueryResult(txRef.get(), reader) - )); + CompletableFuture> future = stream.execute(new QueryStream.PartsHandler() { + @Override + public void onIssues(Issue[] issueArr) { + issues.addAll(Arrays.asList(issueArr)); + } + + @Override + public void onNextPart(QueryResultPart part) { } // not used + + @Override + public void onNextRawPart(long index, ValueProtos.ResultSet rs) { + int idx = (int) index; + while (results.size() <= idx) { + results.add(null); + } + if (results.get(idx) == null) { + results.set(idx, rs); + } else { + results.set(idx, results.get(idx).toBuilder().addAllRows(rs.getRowsList()).build()); + } + } + }); + + + return future.thenApply(res -> { + if (!res.isSuccess()) { + return res.map(v -> null); + } + QueryStats info = queryStats(res.getValue().getStats()); + String txId = txRef.get(); + Status status = res.getStatus().withIssues(issues.toArray(new Issue[0])); + return Result.success(new DataQueryResult(txId, results, info), status); + }); } @Override diff --git a/query/src/test/java/tech/ydb/query/TableExampleTest.java b/query/src/test/java/tech/ydb/query/TableExampleTest.java index 63972cf2..38c4b7df 100644 --- a/query/src/test/java/tech/ydb/query/TableExampleTest.java +++ b/query/src/test/java/tech/ydb/query/TableExampleTest.java @@ -19,6 +19,7 @@ import org.junit.runners.MethodSorters; import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.table.SessionPoolStats; @@ -27,8 +28,10 @@ import tech.ydb.table.description.TableDescription; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.Params; +import tech.ydb.table.query.stats.QueryStatsCollectionMode; import tech.ydb.table.result.ResultSetReader; import tech.ydb.table.settings.BulkUpsertSettings; +import tech.ydb.table.settings.ExecuteDataQuerySettings; import tech.ydb.table.settings.ExecuteScanQuerySettings; import tech.ydb.table.transaction.TableTransaction; import tech.ydb.table.transaction.TxControl; @@ -122,6 +125,7 @@ public void step01_createTables() { retryCtx.supplyStatus(session -> session.createTable(ydbRule.getDatabase() + "/episodes", episodesTable)) .join().expectSuccess("Can't create table /episodes"); + } @Test @@ -432,14 +436,14 @@ public void step10_queryStats() { + "FROM seasons AS sa INNER JOIN series AS sr ON sa.series_id = sr.series_id " + "WHERE sa.series_id = $seriesId AND sa.season_id = $seasonId"; - tech.ydb.table.transaction.TxControl txControl = tech.ydb.table.transaction.TxControl.snapshotRo().setCommitTx(true); + TxControl txControl = TxControl.snapshotRo().setCommitTx(true); Params params = Params.of( "$seriesId", PrimitiveValue.newUint64(1), "$seasonId", PrimitiveValue.newUint64(2) ); - tech.ydb.table.settings.ExecuteDataQuerySettings settings = new tech.ydb.table.settings.ExecuteDataQuerySettings() + ExecuteDataQuerySettings settings = new ExecuteDataQuerySettings() .disableQueryCache() - .setCollectStats(tech.ydb.table.query.stats.QueryStatsCollectionMode.FULL); + .setCollectStats(QueryStatsCollectionMode.FULL); DataQueryResult result = retryCtx .supplyResult(session -> session.executeDataQuery(query, txControl, params, settings)) @@ -452,4 +456,43 @@ public void step10_queryStats() { Assert.assertFalse(result.getQueryStats().getQueryAst().isEmpty()); Assert.assertFalse(result.getQueryStats().getQueryPlan().isEmpty()); } + + @Test + public void step11_queryIssues() { + String query = "$unused = 1; SELECT * FROM seasons;"; + + TxControl txControl = TxControl.snapshotRo(); + + Result result = retryCtx + .supplyResult(session -> session.executeDataQuery(query, txControl)) + .join(); + + Status status = result.getStatus(); + Assert.assertTrue(status.isSuccess()); + Assert.assertEquals(1, status.getIssues().length); + Assert.assertEquals("Symbol $unused is not used", status.getIssues()[0].getMessage()); + } + + @Test + public void step12_multiStatementTest() { + String query = "" + + "SELECT * FROM series ORDER BY series_id;" + + "INSERT INTO series SELECT * FROM AS_TABLE(ListMap(ListFromRange(10000, 20000), ($x) -> {" + + " RETURN <|series_id: $x, title: 'TEST'u || CAST($x AS Text)|>;" + + "}));" + + "SELECT * FROM series ORDER BY series_id;" + + "DELETE FROM series WHERE series_id >= 10000;" + + "SELECT * FROM series ORDER BY series_id;"; + + TxControl txControl = TxControl.serializableRw(); + + DataQueryResult result = retryCtx + .supplyResult(session -> session.executeDataQuery(query, txControl)) + .join().getValue(); + + Assert.assertEquals(3, result.getResultSetCount()); + Assert.assertEquals(2, result.getResultSet(0).getRowCount()); + Assert.assertEquals(10002, result.getResultSet(1).getRowCount()); + Assert.assertEquals(2, result.getResultSet(2).getRowCount()); + } } diff --git a/table/src/main/java/tech/ydb/table/query/DataQueryResult.java b/table/src/main/java/tech/ydb/table/query/DataQueryResult.java index 6cdd187b..406685da 100644 --- a/table/src/main/java/tech/ydb/table/query/DataQueryResult.java +++ b/table/src/main/java/tech/ydb/table/query/DataQueryResult.java @@ -23,6 +23,12 @@ public DataQueryResult(YdbTable.ExecuteQueryResult result) { queryStats = result.hasQueryStats() ? new QueryStats(result.getQueryStats()) : null; } + public DataQueryResult(String txId, List results, QueryStats stats) { + this.txId = txId; + this.resultSets = results; + queryStats = stats; + } + public String getTxId() { return txId; }