From bae6b46d9556852a42445c457bf61b5edf092b88 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Sun, 28 Sep 2025 22:48:23 +0300 Subject: [PATCH 1/4] Add tests and execute script --- .../java/tech/ydb/query/QuerySession.java | 49 +++ .../java/tech/ydb/query/impl/SessionImpl.java | 73 +++++ .../query/settings/ExecuteScriptSettings.java | 90 ++++++ .../query/settings/FetchScriptSettings.java | 52 ++++ .../tech/ydb/query/ScriptExampleTest.java | 278 ++++++++++++++++++ 5 files changed, 542 insertions(+) create mode 100644 query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java create mode 100644 query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java create mode 100644 query/src/test/java/tech/ydb/query/ScriptExampleTest.java diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index 9bbee82a..cf44541a 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -6,8 +6,11 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; +import tech.ydb.proto.OperationProtos; import tech.ydb.query.settings.BeginTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.FetchScriptSettings; import tech.ydb.table.query.Params; /** @@ -68,6 +71,29 @@ public interface QuerySession extends AutoCloseable { */ QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings); + /** + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and + * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * + * @param query text of query + * @param params query parameters + * @param settings additional settings of query execution + * @return a ready to execute instance of {@link QueryStream} + */ + CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings); + + /** + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and + * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * + * @param query text of query + * @param tx transaction mode + * @param params query parameters + * @param settings additional settings of query execution + * @return a ready to execute instance of {@link QueryStream} + */ + QueryStream fetchScriptResults(String query, TxMode tx, Params params, FetchScriptSettings settings); + @Override void close(); @@ -106,4 +132,27 @@ default QueryStream createQuery(String query, TxMode tx) { default CompletableFuture> beginTransaction(TxMode txMode) { return beginTransaction(txMode, BeginTransactionSettings.newBuilder().build()); } + + /** + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and + * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * + * @param query text of query + * @return a ready to execute instance of {@link QueryStream} + */ + default CompletableFuture> executeScript(String query) { + return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + } + + /** + * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and + * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * + * @param query text of query + * @param tx transaction mode + * @return a ready to execute instance of {@link QueryStream} + */ + default QueryStream fetchScriptResults(String query, TxMode tx) { + return fetchScriptResults(query, tx, Params.empty(), FetchScriptSettings.newBuilder().build()); + } } diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index c146018b..5e26d9d9 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +26,7 @@ import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; +import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; @@ -37,6 +39,8 @@ import tech.ydb.query.settings.CreateSessionSettings; import tech.ydb.query.settings.DeleteSessionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.FetchScriptSettings; import tech.ydb.query.settings.QueryExecMode; import tech.ydb.query.settings.QueryStatsMode; import tech.ydb.query.settings.RollbackTransactionSettings; @@ -244,6 +248,75 @@ void handleTxMeta(String txID) { }; } + @Override + public CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings) { + YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() + .setExecMode(mapExecMode(settings.getExecMode())) + .setStatsMode(mapStatsMode(settings.getStatsMode())) + .setScriptContent(YdbQuery.QueryContent.newBuilder() + .setSyntax(YdbQuery.Syntax.SYNTAX_YQL_V1) + .setText(query) + .build()); + + java.time.Duration ttl = settings.getTtl(); + if(ttl != null) { + request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano())); + } + + String resourcePool = settings.getResourcePool(); + if (resourcePool != null && !resourcePool.isEmpty()) { + request.setPoolId(resourcePool); + } + + request.putAllParameters(params.toPb()); + + GrpcRequestSettings.Builder options = makeOptions(settings); + + return rpc.executeScript(request.build(), options.build()); + } + + @Override + public QueryStream fetchScriptResults(String query, TxMode tx, Params params, FetchScriptSettings settings) { + YdbQuery.FetchScriptResultsRequest.Builder request = YdbQuery.FetchScriptResultsRequest.newBuilder() + .setOperationId(settings.getOperationId()) + .setFetchToken(settings.getFetchToken()) + .setRowsLimit(10) + .setResultSetIndex(0L); + + GrpcRequestSettings.Builder options = makeOptions(settings); + CompletableFuture> response + = rpc.fetchScriptResults(request.build(), options.build()); + + return null; + /* response.thenApply(result -> result.getValue() ) + + Result resp = response.get(); + YdbQuery.FetchScriptResultsResponse rs = resp.getValue(); + + rs.getResultSet(); + rs.get*/ + /* + rs := response.GetResultSet() + columns := rs.GetColumns() + columnNames := make([]string, len(columns)) + columnTypes := make([]types.Type, len(columns)) + for i := range columns { + columnNames[i] = columns[i].GetName() + columnTypes[i] = types.TypeFromYDB(columns[i].GetType()) + } + rows := make([]query.Row, len(rs.GetRows())) + for i, r := range rs.GetRows() { + rows[i] = NewRow(columns, r) + } + + return &options.FetchScriptResult{ + ResultSetIndex: response.GetResultSetIndex(), + ResultSet: MaterializedResultSet(int(response.GetResultSetIndex()), columnNames, columnTypes, rows), + NextToken: response.GetNextFetchToken(), + }, nil + */ + } + public CompletableFuture> delete(DeleteSessionSettings settings) { YdbQuery.DeleteSessionRequest request = YdbQuery.DeleteSessionRequest.newBuilder() .setSessionId(sessionId) diff --git a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java new file mode 100644 index 00000000..9e9c23bf --- /dev/null +++ b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java @@ -0,0 +1,90 @@ +package tech.ydb.query.settings; + +import tech.ydb.core.grpc.GrpcFlowControl; +import tech.ydb.core.settings.BaseRequestSettings; + +import java.time.Duration; + +/** + + */ +public class ExecuteScriptSettings extends BaseRequestSettings { + private final QueryExecMode execMode; + private final QueryStatsMode statsMode; + private final String resourcePool; + private final Duration ttl; + + private ExecuteScriptSettings(Builder builder) { + super(builder); + this.execMode = builder.execMode; + this.statsMode = builder.statsMode; + this.ttl = builder.ttl; + this.resourcePool = builder.resourcePool; + } + + public QueryExecMode getExecMode() { + return this.execMode; + } + + public Duration getTtl() { + return ttl; + } + + public QueryStatsMode getStatsMode() { + return this.statsMode; + } + + /** + * Get resource pool for query execution + * @return resource pool name + */ + public String getResourcePool() { + return this.resourcePool; + } + + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends BaseBuilder { + private QueryExecMode execMode = QueryExecMode.EXECUTE; + private QueryStatsMode statsMode = QueryStatsMode.NONE; + private String resourcePool = null; + private Duration ttl =null; + + public Builder withExecMode(QueryExecMode mode) { + this.execMode = mode; + return this; + } + + public Builder withStatsMode(QueryStatsMode mode) { + this.statsMode = mode; + return this; + } + + public Builder withTtl(Duration value) { + this.ttl = value; + return this; + } + + /** + * Set resource pool which query try to use. + * If no pool specify or poolId is empty or poolId equals "default" + * the unremovable resource pool "default" will be used + * + * @param poolId resource pool identifier + * + * @return builder + */ + public Builder withResourcePool(String poolId) { + this.resourcePool = poolId; + return this; + } + + @Override + public ExecuteScriptSettings build() { + return new ExecuteScriptSettings(this); + } + } +} diff --git a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java new file mode 100644 index 00000000..110e5203 --- /dev/null +++ b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java @@ -0,0 +1,52 @@ +package tech.ydb.query.settings; + +import java.time.Duration; + +import tech.ydb.core.settings.BaseRequestSettings; + +/** + * operationId_ = ""; + * fetchToken_ = ""; + */ +public class FetchScriptSettings extends BaseRequestSettings { + private final String operationId; + private final String fetchToken; + + private FetchScriptSettings(Builder builder) { + super(builder); + this.operationId = builder.operationId; + this.fetchToken = builder.fetchToken; + } + + public String getOperationId() { + return operationId; + } + + public String getFetchToken() { + return fetchToken; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder extends BaseBuilder { + private String operationId = ""; + private String fetchToken = ""; + + public Builder withEOperationId(String operationId ) { + this.operationId = operationId; + return this; + } + + public Builder withFetchToken(String fetchToken ) { + this.fetchToken = fetchToken; + return this; + } + + @Override + public FetchScriptSettings build() { + return new FetchScriptSettings(this); + } + } +} diff --git a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java new file mode 100644 index 00000000..9d23cf72 --- /dev/null +++ b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java @@ -0,0 +1,278 @@ +package tech.ydb.query; + +import java.time.Instant; +import java.time.LocalDate; +import java.time.Month; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import tech.ydb.common.transaction.TxMode; +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.proto.OperationProtos; +import tech.ydb.query.result.QueryInfo; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.tools.QueryReader; +import tech.ydb.query.tools.SessionRetryContext; +import tech.ydb.table.query.Params; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.values.ListType; +import tech.ydb.table.values.ListValue; +import tech.ydb.table.values.PrimitiveType; +import tech.ydb.table.values.PrimitiveValue; +import tech.ydb.table.values.StructType; +import tech.ydb.test.junit4.GrpcTransportRule; + +/** + * @author Alexandr Gorshenin + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ScriptExampleTest { + @ClassRule + public final static GrpcTransportRule ydbRule = new GrpcTransportRule(); + + private static QueryClient client; + private static SessionRetryContext retryCtx; + + @BeforeClass + public static void init() { + client = QueryClient.newClient(ydbRule) + .sessionPoolMaxSize(5) + .build(); + retryCtx = SessionRetryContext.create(client).build(); + + Assert.assertNotNull(client.getScheduler()); + } + + @AfterClass + public static void clean() { + Result t1 = retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) + .join(); + Result t2 = retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) + .join(); + + client.close(); + } + + /** + * Simple test that check script without parameters execute without errors + */ + @Test + public void createScript() { + Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")" + ) + ).join(); + + Assert.assertTrue(operationResult.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + QueryReader result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join().getValue(); + + ResultSetReader rs = result.getResultSet(0); + + // Check that table exists and contains no data + Assert.assertFalse(rs.next()); + + String query1 + = "SELECT series_id, season_id, title " + + "FROM seasons WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + QueryReader result1 = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join().getValue(); + + ResultSetReader rs1 = result.getResultSet(0); + + // Check that table exists and contains no data + Assert.assertFalse(rs1.next()); + } + + + /** + * Simple test that check script without parameters execute without errors + */ + @Test + public void createScriptShouldFail() { + Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "ZCREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")" + ) + ).join(); + + Assert.assertTrue(operationResult.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + + // Check that table exists and contains no data + Assert.assertFalse(result.isSuccess()); + + String query1 + = "SELECT series_id, season_id, title " + + "FROM seasons WHERE series_id = 1"; + + Result result1 = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + Assert.assertFalse(result1.isSuccess()); + } + + @Test + public void createInsertScript() { + // Create type for struct of series + StructType seriesType = StructType.of( + "series_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "release_date", PrimitiveType.Date, + "series_info", PrimitiveType.Text + ); + // Create and fill list of series + ListValue seriesData = ListType.of(seriesType).newValue( + TestExampleData.SERIES.stream().map(series -> seriesType.newValue( + "series_id", PrimitiveValue.newUint64(series.seriesID()), + "title", PrimitiveValue.newText(series.title()), + "series_info", PrimitiveValue.newText(series.seriesInfo()), + "release_date", PrimitiveValue.newDate(series.releaseDate()) + )).collect(Collectors.toList()) + ); + + // Create type for struct of season + StructType seasonType = StructType.of( + "series_id", PrimitiveType.Uint64, + "season_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "first_aired", PrimitiveType.Date, + "last_aired", PrimitiveType.Date + ); + // Create and fill list of seasons + ListValue seasonsData = ListType.of(seasonType).newValue( + TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( + "series_id", PrimitiveValue.newUint64(season.seriesID()), + "season_id", PrimitiveValue.newUint64(season.seasonID()), + "title", PrimitiveValue.newText(season.title()), + "first_aired", PrimitiveValue.newDate(season.firstAired()), + "last_aired", PrimitiveValue.newDate(season.lastAired()) + )).collect(Collectors.toList()) + ); + + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + // .withExecMode(QueryExecMode.EXECUTE) + .build(); + + Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ");" + , Params.of("$values", seriesData), executeScriptSettings) + ).join(); + + retryCtx.supplyResult(session -> session.executeScript("" + + "DECLARE $values AS List>;" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values);" + + "" + + "DECLARE $values AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($series)", + Params.of("$values", seriesData, "$series", seasonsData), executeScriptSettings) + ).join().getStatus().expectSuccess("upsert problem"); + + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; + + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( + session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + ).join(); + + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); + Assert.assertEquals("IT Crowd", rs.getColumn("title").getText()); + Assert.assertEquals(LocalDate.of(2006, Month.FEBRUARY, 3), rs.getColumn("release_date").getDate()); + + Assert.assertFalse(rs.next()); + + + operationResult.getValue(); + } +} From ef4f8f2eeb83a1c3c4c5872c8630a480e0ec7497 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Mon, 29 Sep 2025 22:28:45 +0300 Subject: [PATCH 2/4] Add test with parameters --- .../tech/ydb/query/ScriptExampleTest.java | 69 +++++++++++++++---- 1 file changed, 55 insertions(+), 14 deletions(-) diff --git a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java index 9d23cf72..d7d8ec76 100644 --- a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java +++ b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java @@ -68,6 +68,7 @@ public static void clean() { */ @Test public void createScript() { + Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + "CREATE TABLE series (" + " series_id UInt64," @@ -229,19 +230,31 @@ public void createInsertScript() { + " first_aired Date," + " last_aired Date," + " PRIMARY KEY(series_id, season_id)" - + ");" - , Params.of("$values", seriesData), executeScriptSettings) + + ")" + ) ).join(); - retryCtx.supplyResult(session -> session.executeScript("" + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } +/* + + retryCtx.supplyResult(session -> session.createQuery("" + "DECLARE $values AS List>;" - + "UPSERT INTO series SELECT * FROM AS_TABLE($values);" - + "" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values)", + TxMode.SERIALIZABLE_RW, + Params.of("$values", seasonsData) + ).execute()).join().getStatus().expectSuccess("upsert problem");*/ + + Result t1 = retryCtx.supplyResult(session -> session.executeScript("" + "DECLARE $values AS List>;" - + "UPSERT INTO seasons SELECT * FROM AS_TABLE($series)", - Params.of("$values", seriesData, "$series", seasonsData), executeScriptSettings) + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) + ).join(); + t1.getValue(); + + /* retryCtx.supplyResult(session -> session.executeScript(""+ + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values1);" + /*+ "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values1", seasonsData), executeScriptSettings) ).join().getStatus().expectSuccess("upsert problem"); +*/ + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + String query - = "SELECT series_id, title, release_date " - + "FROM series WHERE series_id = 1"; + = "SELECT series_id " + + "FROM seasons WHERE series_id = 1"; // Executes data query with specified transaction control settings. Result result = retryCtx.supplyResult( @@ -267,10 +308,10 @@ public void createInsertScript() { Assert.assertTrue(rs.next()); Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); - Assert.assertEquals("IT Crowd", rs.getColumn("title").getText()); - Assert.assertEquals(LocalDate.of(2006, Month.FEBRUARY, 3), rs.getColumn("release_date").getDate()); + // Assert.assertEquals("IT Crowd", rs.getColumn("title").getText()); + // Assert.assertEquals(LocalDate.of(2006, Month.FEBRUARY, 3), rs.getColumn("release_date").getDate()); + - Assert.assertFalse(rs.next()); operationResult.getValue(); From dd982b06a071e62faf9c02624f2c970a06e628fc Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Wed, 8 Oct 2025 23:53:11 +0300 Subject: [PATCH 3/4] Remove unused code, add comments, java doc --- .../java/tech/ydb/core/OperationResult.java | 53 +++ .../java/tech/ydb/query/QuerySession.java | 45 ++- .../tech/ydb/query/impl/QueryServiceRpc.java | 45 ++- .../java/tech/ydb/query/impl/SessionImpl.java | 88 ++-- .../query/settings/ExecuteScriptSettings.java | 5 +- .../query/settings/FetchScriptSettings.java | 41 +- .../ydb/query/tools/SessionRetryContext.java | 85 ++++ .../tech/ydb/query/ScriptExampleTest.java | 375 +++++++++++------- 8 files changed, 524 insertions(+), 213 deletions(-) create mode 100644 core/src/main/java/tech/ydb/core/OperationResult.java diff --git a/core/src/main/java/tech/ydb/core/OperationResult.java b/core/src/main/java/tech/ydb/core/OperationResult.java new file mode 100644 index 00000000..ea146171 --- /dev/null +++ b/core/src/main/java/tech/ydb/core/OperationResult.java @@ -0,0 +1,53 @@ +package tech.ydb.core; + +import tech.ydb.core.operation.Operation; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class OperationResult implements Result{ + + Operation> operation; + Result result; + + public OperationResult(Operation> resultOperation) { + this.operation = resultOperation; + this.result = operation.getValue(); + } + + public Operation> getOperation() { + return operation; + } + + @Nonnull + @Override + public Status getStatus() { + return result.getStatus(); + } + + @Nonnull + @Override + public T getValue() throws UnexpectedResultException { + return result.getValue(); + } + + @Nonnull + @Override + public Result map(@Nonnull Function mapper) { + return result.map(mapper); + } + + @Nonnull + @Override + public CompletableFuture> mapResultFuture(@Nonnull Function>> mapper) { + return result.mapResultFuture(mapper); + } + + @Nonnull + @Override + public CompletableFuture mapStatusFuture(@Nonnull Function> mapper) { + return result.mapStatusFuture(mapper); + } +} diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index cf44541a..6104f5e0 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -6,7 +6,10 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; -import tech.ydb.proto.OperationProtos; +import tech.ydb.core.Status; +import tech.ydb.core.operation.Operation; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; import tech.ydb.query.settings.BeginTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.ExecuteScriptSettings; @@ -71,6 +74,17 @@ public interface QuerySession extends AutoCloseable { */ QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings); + + /** + * Execute yql script + * @param query text of query + * @param params query parameters + * @param settings additional settings of query execution + * @return future with result of the script execution + */ + CompletableFuture> executeScriptYql(String query, Params params, ExecuteScriptSettings settings); + + /** * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and * DCL statements. Supported mix of different statement types depends on the chosen transaction type. @@ -80,19 +94,18 @@ public interface QuerySession extends AutoCloseable { * @param settings additional settings of query execution * @return a ready to execute instance of {@link QueryStream} */ - CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings); + CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings); /** - * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and - * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * Fetch result from script which has already passed to YDB + * Before use wait for CompletableFuture with operation * * @param query text of query - * @param tx transaction mode * @param params query parameters * @param settings additional settings of query execution * @return a ready to execute instance of {@link QueryStream} */ - QueryStream fetchScriptResults(String query, TxMode tx, Params params, FetchScriptSettings settings); + CompletableFuture> fetchScriptResults(String query, Params params, FetchScriptSettings settings); @Override void close(); @@ -134,25 +147,23 @@ default CompletableFuture> beginTransaction(TxMode txMo } /** - * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and - * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * Execute Yql script with different type of operation * * @param query text of query - * @return a ready to execute instance of {@link QueryStream} + * @return a future join on it to wait for script finished */ - default CompletableFuture> executeScript(String query) { - return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); + default CompletableFuture> executeScriptYql(String query) { + return executeScriptYql(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); } /** - * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and - * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * Execute Yql script with different type of operation + * Take a not join on a future is not granted that script is executed instead join guarantee that script pass to YDB * * @param query text of query - * @param tx transaction mode - * @return a ready to execute instance of {@link QueryStream} + * @return future join on it to wait for script pass to YDB but not get result */ - default QueryStream fetchScriptResults(String query, TxMode tx) { - return fetchScriptResults(query, tx, Params.empty(), FetchScriptSettings.newBuilder().build()); + default CompletableFuture> executeScript(String query) { + return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); } } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index e1ef7fc8..b636cbc0 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -5,13 +5,17 @@ import io.grpc.Context; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationBinder; import tech.ydb.core.operation.StatusExtractor; -import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.query.v1.QueryServiceGrpc; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.proto.scripting.v1.ScriptingServiceGrpc; /** * @@ -106,11 +110,46 @@ public GrpcReadStream executeQuery( return transport.readStreamCall(QueryServiceGrpc.getExecuteQueryMethod(), settings, request); } - public CompletableFuture> executeScript( + /** + * Run execute script using Yql + * + * @param request request for execute script + * @param settings grpc settings + * @return future with result of execution + */ + public CompletableFuture>> executeScriptYql( + ScriptingProtos.ExecuteYqlRequest request, GrpcRequestSettings settings) { + + return transport.unaryCall(ScriptingServiceGrpc.getExecuteYqlMethod(), settings, request) + .thenApply(OperationBinder.bindAsync( + transport, ScriptingProtos.ExecuteYqlResponse::getOperation, ScriptingProtos.ExecuteYqlResult.class) + ); + } + + /** + * Execute script using query + * + * @param request request for execute script + * @param settings grpc settings + * @return future with result of execution + */ + public CompletableFuture> executeScript( YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { - return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request); + + return transport.unaryCall(QueryServiceGrpc.getExecuteScriptMethod(), settings, request) + .thenApply( + OperationBinder.bindAsync(transport, + op -> op + )); } + /** + * Fetch script using query + * + * @param request for execute script + * @param settings grpc settings + * @return future with result of execution + */ public CompletableFuture> fetchScriptResults( YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { return transport diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index 5e26d9d9..cb8b78d0 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.base.Strings; import com.google.protobuf.Duration; import com.google.protobuf.TextFormat; import org.slf4j.Logger; @@ -17,17 +18,20 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.common.transaction.impl.YdbTransactionImpl; import tech.ydb.core.Issue; +import tech.ydb.core.OperationResult; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; +import tech.ydb.core.operation.Operation; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; import tech.ydb.core.utils.UpdatableOptional; -import tech.ydb.proto.OperationProtos; import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.proto.table.YdbTable; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; @@ -186,6 +190,19 @@ private static YdbQuery.ExecMode mapExecMode(QueryExecMode mode) { } } + private static YdbTable.QueryStatsCollection.Mode mapStatsCollectionMode(QueryStatsMode mode) { + switch (mode) { + case NONE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_NONE; + case BASIC: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_BASIC; + case FULL: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_FULL; + case PROFILE: return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_PROFILE; + + case UNSPECIFIED: + default: + return YdbTable.QueryStatsCollection.Mode.STATS_COLLECTION_UNSPECIFIED; + } + } + private static YdbQuery.StatsMode mapStatsMode(QueryStatsMode mode) { switch (mode) { case NONE: return YdbQuery.StatsMode.STATS_MODE_NONE; @@ -249,7 +266,20 @@ void handleTxMeta(String txID) { } @Override - public CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings) { + public CompletableFuture> executeScriptYql(String query, Params params, ExecuteScriptSettings settings) { + ScriptingProtos.ExecuteYqlRequest.Builder requestBuilder = ScriptingProtos.ExecuteYqlRequest.newBuilder() + .setScript(query) + .setCollectStats(mapStatsCollectionMode(settings.getStatsMode())); + + requestBuilder.putAllParameters(params.toPb()); + + GrpcRequestSettings.Builder options = makeOptions(settings); + + return rpc.executeScriptYql(requestBuilder.build(), options.build()).thenApply(OperationResult::new); + } + + @Override + public CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings) { YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() .setExecMode(mapExecMode(settings.getExecMode())) .setStatsMode(mapStatsMode(settings.getStatsMode())) @@ -276,45 +306,25 @@ public CompletableFuture> executeScript(String } @Override - public QueryStream fetchScriptResults(String query, TxMode tx, Params params, FetchScriptSettings settings) { - YdbQuery.FetchScriptResultsRequest.Builder request = YdbQuery.FetchScriptResultsRequest.newBuilder() - .setOperationId(settings.getOperationId()) - .setFetchToken(settings.getFetchToken()) - .setRowsLimit(10) - .setResultSetIndex(0L); + public CompletableFuture> fetchScriptResults(String query, Params params, FetchScriptSettings settings) { + YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); + + if(!Strings.isNullOrEmpty(settings.getFetchToken())) { + requestBuilder.setFetchToken(settings.getFetchToken()); + } + + if(settings.getRowsLimit() > 0) { + requestBuilder.setRowsLimit(settings.getRowsLimit()); + } + + requestBuilder.setOperationId(settings.getOperationId()); + + if(settings.getSetResultSetIndex() >= 0) { + requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); + } GrpcRequestSettings.Builder options = makeOptions(settings); - CompletableFuture> response - = rpc.fetchScriptResults(request.build(), options.build()); - - return null; - /* response.thenApply(result -> result.getValue() ) - - Result resp = response.get(); - YdbQuery.FetchScriptResultsResponse rs = resp.getValue(); - - rs.getResultSet(); - rs.get*/ - /* - rs := response.GetResultSet() - columns := rs.GetColumns() - columnNames := make([]string, len(columns)) - columnTypes := make([]types.Type, len(columns)) - for i := range columns { - columnNames[i] = columns[i].GetName() - columnTypes[i] = types.TypeFromYDB(columns[i].GetType()) - } - rows := make([]query.Row, len(rs.GetRows())) - for i, r := range rs.GetRows() { - rows[i] = NewRow(columns, r) - } - - return &options.FetchScriptResult{ - ResultSetIndex: response.GetResultSetIndex(), - ResultSet: MaterializedResultSet(int(response.GetResultSetIndex()), columnNames, columnTypes, rows), - NextToken: response.GetNextFetchToken(), - }, nil - */ + return rpc.fetchScriptResults(requestBuilder.build(), options.build()); } public CompletableFuture> delete(DeleteSessionSettings settings) { diff --git a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java index 9e9c23bf..e32a2648 100644 --- a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java @@ -1,12 +1,11 @@ package tech.ydb.query.settings; -import tech.ydb.core.grpc.GrpcFlowControl; import tech.ydb.core.settings.BaseRequestSettings; import java.time.Duration; /** - + * Script settings to pass to execute script */ public class ExecuteScriptSettings extends BaseRequestSettings { private final QueryExecMode execMode; @@ -51,7 +50,7 @@ public static class Builder extends BaseBuilder { private QueryExecMode execMode = QueryExecMode.EXECUTE; private QueryStatsMode statsMode = QueryStatsMode.NONE; private String resourcePool = null; - private Duration ttl =null; + private Duration ttl = null; public Builder withExecMode(QueryExecMode mode) { this.execMode = mode; diff --git a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java index 110e5203..092ab00b 100644 --- a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java @@ -1,21 +1,22 @@ package tech.ydb.query.settings; -import java.time.Duration; - import tech.ydb.core.settings.BaseRequestSettings; /** - * operationId_ = ""; - * fetchToken_ = ""; + * Script fetch settings to pass to fetch script */ public class FetchScriptSettings extends BaseRequestSettings { private final String operationId; private final String fetchToken; + private final int rowsLimit; + private final int setResultSetIndex; private FetchScriptSettings(Builder builder) { super(builder); this.operationId = builder.operationId; this.fetchToken = builder.fetchToken; + this.rowsLimit = builder.rowsLimit; + this.setResultSetIndex = builder.setResultSetIndex; } public String getOperationId() { @@ -26,20 +27,48 @@ public String getFetchToken() { return fetchToken; } + public int getRowsLimit() { + return rowsLimit; + } + + public int getSetResultSetIndex() { + return setResultSetIndex; + } + public static Builder newBuilder() { return new Builder(); } public static class Builder extends BaseBuilder { + public int getRowsLimit() { + return rowsLimit; + } + + public Builder setRowsLimit(int rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + public int getSetResultSetIndex() { + return setResultSetIndex; + } + + public Builder setSetResultSetIndex(int setResultSetIndex) { + this.setResultSetIndex = setResultSetIndex; + return this; + } + + public int rowsLimit = -1; + public int setResultSetIndex = 0; private String operationId = ""; private String fetchToken = ""; - public Builder withEOperationId(String operationId ) { + public Builder withEOperationId(String operationId) { this.operationId = operationId; return this; } - public Builder withFetchToken(String fetchToken ) { + public Builder withFetchToken(String fetchToken) { this.fetchToken = fetchToken; return this; } diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index cbecca71..1f69c378 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -10,6 +10,7 @@ import java.util.function.Function; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.annotation.ParametersAreNonnullByDefault; import com.google.common.base.Preconditions; @@ -19,6 +20,7 @@ import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.operation.Operation; import tech.ydb.core.utils.FutureTools; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; @@ -70,6 +72,13 @@ public CompletableFuture supplyStatus(Function> supplyOperation(Function>> fn) { + RetryableOperationTask task = new RetryableOperationTask(fn); + task.requestSession(); + return task.getFuture(); + } + private boolean canRetry(StatusCode code) { return code.isRetryable(idempotent) || (retryNotFound && code == StatusCode.NOT_FOUND); } @@ -285,6 +294,82 @@ Status toFailedResult(Result sessionResult) { } } + /** + * RETRYABLE OPERATION TASK + */ + private final class RetryableOperationTask extends BaseRetryableTask> { + RetryableOperationTask(Function>> fn) { + super(fn); + } + + @Override + StatusCode toStatusCode(Operation result) { + if(result.getValue() == null) { + return StatusCode.SUCCESS; + } + return result.getValue().getCode(); + } + + @Override + Operation toFailedResult(Result sessionResult) { + return new FailedOperationTask<>(null, sessionResult.getStatus()); + } + + /** + * Failed Operation task which will be return in case of fail + * @param - type of the operation result + */ + class FailedOperationTask implements Operation { + + final Status status; + final R value; + + FailedOperationTask(@Nullable R value, Status status) { + this.status = status; + this.value = value; + } + + @Nullable + @Override + public String getId() { + return ""; + } + + @Override + public boolean isReady() { + return false; + } + + @Nullable + @Override + public R getValue() { + return value; + } + + @Override + public CompletableFuture cancel() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture forget() { + return CompletableFuture.completedFuture(status); + } + + @Override + public CompletableFuture> fetch() { + return CompletableFuture.completedFuture(Result.fail(status)); + } + + @Override + public Operation transform(Function mapper) { + return new FailedOperationTask<>(mapper.apply(value), status); + } + + + } + } + /** * BUILDER */ diff --git a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java index d7d8ec76..d6b22600 100644 --- a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java +++ b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java @@ -1,30 +1,29 @@ package tech.ydb.query; -import java.time.Instant; -import java.time.LocalDate; -import java.time.Month; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.FixMethodOrder; import org.junit.Test; -import org.junit.runners.MethodSorters; import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.core.Status; -import tech.ydb.proto.OperationProtos; -import tech.ydb.query.result.QueryInfo; +import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; +import tech.ydb.proto.query.YdbQuery; +import tech.ydb.proto.scripting.ScriptingProtos; import tech.ydb.query.settings.ExecuteScriptSettings; -import tech.ydb.query.settings.QueryExecMode; +import tech.ydb.query.settings.FetchScriptSettings; import tech.ydb.query.tools.QueryReader; import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.query.Params; import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.result.impl.ProtoValueReaders; import tech.ydb.table.values.ListType; import tech.ydb.table.values.ListValue; import tech.ydb.table.values.PrimitiveType; @@ -32,17 +31,54 @@ import tech.ydb.table.values.StructType; import tech.ydb.test.junit4.GrpcTransportRule; + /** - * @author Alexandr Gorshenin + * */ -@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class ScriptExampleTest { + @ClassRule public final static GrpcTransportRule ydbRule = new GrpcTransportRule(); private static QueryClient client; private static SessionRetryContext retryCtx; + // Create type for struct of series + StructType seriesType = StructType.of( + "series_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "release_date", PrimitiveType.Date, + "series_info", PrimitiveType.Text + ); + // Create and fill list of series + ListValue seriesData = ListType.of(seriesType).newValue( + TestExampleData.SERIES.stream().map(series -> seriesType.newValue( + "series_id", PrimitiveValue.newUint64(series.seriesID()), + "title", PrimitiveValue.newText(series.title()), + "series_info", PrimitiveValue.newText(series.seriesInfo()), + "release_date", PrimitiveValue.newDate(series.releaseDate()) + )).collect(Collectors.toList()) + ); + + // Create type for struct of season + StructType seasonType = StructType.of( + "series_id", PrimitiveType.Uint64, + "season_id", PrimitiveType.Uint64, + "title", PrimitiveType.Text, + "first_aired", PrimitiveType.Date, + "last_aired", PrimitiveType.Date + ); + // Create and fill list of seasons + ListValue seasonsData = ListType.of(seasonType).newValue( + TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( + "series_id", PrimitiveValue.newUint64(season.seriesID()), + "season_id", PrimitiveValue.newUint64(season.seasonID()), + "title", PrimitiveValue.newText(season.title()), + "first_aired", PrimitiveValue.newDate(season.firstAired()), + "last_aired", PrimitiveValue.newDate(season.lastAired()) + )).collect(Collectors.toList()) + ); + @BeforeClass public static void init() { client = QueryClient.newClient(ydbRule) @@ -55,9 +91,9 @@ public static void init() { @AfterClass public static void clean() { - Result t1 = retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) .join(); - Result t2 = retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) + retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) .join(); client.close(); @@ -68,28 +104,26 @@ public static void clean() { */ @Test public void createScript() { - - Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" - + "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")" + Result result0 = retryCtx.supplyResult(session -> session.executeScriptYql( + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")" ) ).join(); - Assert.assertTrue(operationResult.isSuccess()); + Assert.assertTrue(result0.isSuccess()); String query = "SELECT series_id, title, release_date " @@ -104,29 +138,54 @@ public void createScript() { // Check that table exists and contains no data Assert.assertFalse(rs.next()); + } - String query1 - = "SELECT series_id, season_id, title " - + "FROM seasons WHERE series_id = 1"; + /** + * Simple test that check script without parameters execute with errors if script has errors in text + */ + @Test + public void createScriptShouldFail() { + Result operationResult = retryCtx.supplyResult(session -> session.executeScriptYql( + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "ZCREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")" + ) + ).join(); + + Assert.assertFalse(operationResult.isSuccess()); + + String query + = "SELECT series_id, title, release_date " + + "FROM series WHERE series_id = 1"; // Executes data query with specified transaction control settings. - QueryReader result1 = retryCtx.supplyResult( + Result result = retryCtx.supplyResult( session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) - ).join().getValue(); + ).join(); - ResultSetReader rs1 = result.getResultSet(0); // Check that table exists and contains no data - Assert.assertFalse(rs1.next()); + Assert.assertFalse(result.isSuccess()); } - /** - * Simple test that check script without parameters execute without errors + * Test create and then insert data using {@link ScriptingProtos} proto */ @Test - public void createScriptShouldFail() { - Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + public void createInsertYqlScript() { + Result task = retryCtx.supplyResult(session -> session.executeScriptYql("" + "CREATE TABLE series (" + " series_id UInt64," + " title Text," @@ -135,7 +194,7 @@ public void createScriptShouldFail() { + " PRIMARY KEY(series_id)" + ");" + "" - + "ZCREATE TABLE seasons (" + + "CREATE TABLE seasons (" + " series_id UInt64," + " season_id UInt64," + " title Text," @@ -146,75 +205,53 @@ public void createScriptShouldFail() { ) ).join(); - Assert.assertTrue(operationResult.isSuccess()); - String query - = "SELECT series_id, title, release_date " - + "FROM series WHERE series_id = 1"; + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + // .withExecMode(QueryExecMode.EXECUTE) + .build(); - // Executes data query with specified transaction control settings. - Result result = retryCtx.supplyResult( - session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) + Result task1 = retryCtx.supplyResult(session -> session.executeScriptYql("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) ).join(); - - // Check that table exists and contains no data - Assert.assertFalse(result.isSuccess()); - - String query1 - = "SELECT series_id, season_id, title " + String query + = "SELECT series_id " + "FROM seasons WHERE series_id = 1"; - Result result1 = retryCtx.supplyResult( + // Executes data query with specified transaction control settings. + Result result = retryCtx.supplyResult( session -> QueryReader.readFrom(session.createQuery(query, TxMode.SERIALIZABLE_RW)) ).join(); - Assert.assertFalse(result1.isSuccess()); + ResultSetReader rs = result.getValue().getResultSet(0); + + Assert.assertTrue(rs.next()); + Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); } + /** + * Run 2 scripts one after another. + * Ch + */ @Test - public void createInsertScript() { - // Create type for struct of series - StructType seriesType = StructType.of( - "series_id", PrimitiveType.Uint64, - "title", PrimitiveType.Text, - "release_date", PrimitiveType.Date, - "series_info", PrimitiveType.Text - ); - // Create and fill list of series - ListValue seriesData = ListType.of(seriesType).newValue( - TestExampleData.SERIES.stream().map(series -> seriesType.newValue( - "series_id", PrimitiveValue.newUint64(series.seriesID()), - "title", PrimitiveValue.newText(series.title()), - "series_info", PrimitiveValue.newText(series.seriesInfo()), - "release_date", PrimitiveValue.newDate(series.releaseDate()) - )).collect(Collectors.toList()) - ); - - // Create type for struct of season - StructType seasonType = StructType.of( - "series_id", PrimitiveType.Uint64, - "season_id", PrimitiveType.Uint64, - "title", PrimitiveType.Text, - "first_aired", PrimitiveType.Date, - "last_aired", PrimitiveType.Date - ); - // Create and fill list of seasons - ListValue seasonsData = ListType.of(seasonType).newValue( - TestExampleData.SEASONS.stream().map(season -> seasonType.newValue( - "series_id", PrimitiveValue.newUint64(season.seriesID()), - "season_id", PrimitiveValue.newUint64(season.seasonID()), - "title", PrimitiveValue.newText(season.title()), - "first_aired", PrimitiveValue.newDate(season.firstAired()), - "last_aired", PrimitiveValue.newDate(season.lastAired()) - )).collect(Collectors.toList()) - ); - - ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() - // .withExecMode(QueryExecMode.EXECUTE) - .build(); - - Result operationResult = retryCtx.supplyResult(session -> session.executeScript("" + public void createInsertQueryScript() { + CompletableFuture> s = + retryCtx.supplyOperation(querySession -> querySession.executeScript("" + "CREATE TABLE series (" + " series_id UInt64," + " title Text," @@ -230,31 +267,19 @@ public void createInsertScript() { + " first_aired Date," + " last_aired Date," + " PRIMARY KEY(series_id, season_id)" - + ")" - ) + + ")")); + + retryCtx.supplyStatus( + ss -> + s.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) ).join(); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } -/* - retryCtx.supplyResult(session -> session.createQuery("" - + "DECLARE $values AS List>;" - + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values)", - TxMode.SERIALIZABLE_RW, - Params.of("$values", seasonsData) - ).execute()).join().getStatus().expectSuccess("upsert problem");*/ + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + .build(); + - Result t1 = retryCtx.supplyResult(session -> session.executeScript("" + CompletableFuture> test = retryCtx.supplyOperation(querySession -> querySession.executeScript("" + "DECLARE $values AS List session.executeScript(""+ - "DECLARE $values1 AS List>;" - + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values1);" - /*+ "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", - Params.of("$values1", seasonsData), executeScriptSettings) - ).join().getStatus().expectSuccess("upsert problem"); -*/ + ); - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + retryCtx.supplyStatus( + ss -> + test.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) + ).join(); String query = "SELECT series_id " @@ -308,12 +317,88 @@ public void createInsertScript() { Assert.assertTrue(rs.next()); Assert.assertEquals(1, rs.getColumn("series_id").getUint64()); - // Assert.assertEquals("IT Crowd", rs.getColumn("title").getText()); - // Assert.assertEquals(LocalDate.of(2006, Month.FEBRUARY, 3), rs.getColumn("release_date").getDate()); + } + /** + * Check fetch result + * + * @throws ExecutionException + * @throws InterruptedException + */ + @Test + public void fetchScript() throws ExecutionException, InterruptedException { + Result result0 = retryCtx.supplyResult(session -> session.executeScriptYql("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")" + ) + ).join(); + + ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + // .withExecMode(QueryExecMode.EXECUTE) + .build(); + + CompletableFuture> s = + retryCtx.supplyOperation(querySession -> querySession.executeScript("" + + "DECLARE $values AS List>;" + + "DECLARE $values1 AS List>;" + + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT series_id FROM seasons where series_id = 1 order by series_id;", + Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings)); + + retryCtx.supplyStatus( + ss -> + s.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) + ).join(); + + + FetchScriptSettings fetchScriptSettings = FetchScriptSettings.newBuilder() + .setRowsLimit(2) + .setSetResultSetIndex(0) + .withEOperationId(s.get().getId()) + .withFetchToken("") + .build(); + + + Result test = retryCtx.supplyResult( + session -> session.fetchScriptResults("" + + "SELECT series_id FROM seasons;", + Params.empty(), fetchScriptSettings) + ).join(); + tech.ydb.proto.ValueProtos.ResultSet resultSet = test.getValue().getResultSet(); - operationResult.getValue(); + Assert.assertEquals(2, resultSet.getRowsCount()); + ResultSetReader reader = ProtoValueReaders.forResultSet(resultSet); + reader.next(); + Assert.assertEquals(1, reader.getColumn(0).getUint64()); + reader.next(); + Assert.assertEquals(1, reader.getColumn(0).getUint64()); } } From 1c48a51b9f4337e90422a56bca65ced6631a3e28 Mon Sep 17 00:00:00 2001 From: Evgeniy Kuvardin Date: Fri, 10 Oct 2025 11:49:35 +0300 Subject: [PATCH 4/4] Add comments --- .../java/tech/ydb/core/OperationResult.java | 53 ---- .../java/tech/ydb/query/QuerySession.java | 91 ++++-- .../tech/ydb/query/impl/QueryServiceRpc.java | 38 ++- .../java/tech/ydb/query/impl/SessionImpl.java | 28 +- .../ydb/query/result/OperationResult.java | 84 ++++++ .../query/settings/ExecuteScriptSettings.java | 84 +++++- .../query/settings/FetchScriptSettings.java | 77 +++-- .../ydb/query/tools/SessionRetryContext.java | 2 +- .../tech/ydb/query/ScriptExampleTest.java | 276 +++++++++--------- 9 files changed, 453 insertions(+), 280 deletions(-) delete mode 100644 core/src/main/java/tech/ydb/core/OperationResult.java create mode 100644 query/src/main/java/tech/ydb/query/result/OperationResult.java diff --git a/core/src/main/java/tech/ydb/core/OperationResult.java b/core/src/main/java/tech/ydb/core/OperationResult.java deleted file mode 100644 index ea146171..00000000 --- a/core/src/main/java/tech/ydb/core/OperationResult.java +++ /dev/null @@ -1,53 +0,0 @@ -package tech.ydb.core; - -import tech.ydb.core.operation.Operation; - -import javax.annotation.Nonnull; - -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; - -public class OperationResult implements Result{ - - Operation> operation; - Result result; - - public OperationResult(Operation> resultOperation) { - this.operation = resultOperation; - this.result = operation.getValue(); - } - - public Operation> getOperation() { - return operation; - } - - @Nonnull - @Override - public Status getStatus() { - return result.getStatus(); - } - - @Nonnull - @Override - public T getValue() throws UnexpectedResultException { - return result.getValue(); - } - - @Nonnull - @Override - public Result map(@Nonnull Function mapper) { - return result.map(mapper); - } - - @Nonnull - @Override - public CompletableFuture> mapResultFuture(@Nonnull Function>> mapper) { - return result.mapResultFuture(mapper); - } - - @Nonnull - @Override - public CompletableFuture mapStatusFuture(@Nonnull Function> mapper) { - return result.mapStatusFuture(mapper); - } -} diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index 6104f5e0..d492e33a 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -8,8 +8,10 @@ import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.scripting.ScriptingProtos; +import tech.ydb.query.result.OperationResult; import tech.ydb.query.settings.BeginTransactionSettings; import tech.ydb.query.settings.ExecuteQuerySettings; import tech.ydb.query.settings.ExecuteScriptSettings; @@ -74,38 +76,52 @@ public interface QuerySession extends AutoCloseable { */ QueryStream createQuery(String query, TxMode tx, Params params, ExecuteQuerySettings settings); - /** - * Execute yql script - * @param query text of query - * @param params query parameters - * @param settings additional settings of query execution - * @return future with result of the script execution + * Executes a YQL script via the scripting service and returns its result as a completed future. + * + *

This method sends a YQL script for execution and collects the full result set in a single response. + * It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns + * an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.

+ * + * @param query the YQL script text to execute + * @param params input parameters for the script + * @param settings execution settings such as statistics collection or tracing + * @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult} */ - CompletableFuture> executeScriptYql(String query, Params params, ExecuteScriptSettings settings); + CompletableFuture> executeScriptYql(String query, + Params params, + ExecuteScriptSettings settings); /** - * Create {@link QueryStream} for executing query with specified {@link TxMode}. The query can contain DML, DDL and - * DCL statements. Supported mix of different statement types depends on the chosen transaction type. + * Submits a YQL script for asynchronous execution and returns a handle to the operation. + * Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb * - * @param query text of query - * @param params query parameters - * @param settings additional settings of query execution - * @return a ready to execute instance of {@link QueryStream} + *

This method executes the given script asynchronously and immediately returns + * a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched + * via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.

+ * + * @param query the YQL script text to execute + * @param params input parameters to pass to the script + * @param settings script execution options such as TTL, statistics mode, or resource pool + * @return a future resolving to an {@link Operation} representing the submitted script execution */ CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings); /** - * Fetch result from script which has already passed to YDB - * Before use wait for CompletableFuture with operation + * Fetches partial or complete results from a previously executed YQL script. * - * @param query text of query - * @param params query parameters - * @param settings additional settings of query execution - * @return a ready to execute instance of {@link QueryStream} + *

This method retrieves result sets produced by an asynchronous script execution. + * It supports incremental fetching using tokens, row limits, and result set index selection.

+ * + * @param query optional query text for context (not used by the server but may help debugging) + * @param params parameters used during script execution (typically empty) + * @param settings settings that define which operation to fetch results from, including fetch token, row limit, and index + * @return a future resolving to a {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} */ - CompletableFuture> fetchScriptResults(String query, Params params, FetchScriptSettings settings); + CompletableFuture> fetchScriptResults(String query, + Params params, + FetchScriptSettings settings); @Override void close(); @@ -147,23 +163,44 @@ default CompletableFuture> beginTransaction(TxMode txMo } /** - * Execute Yql script with different type of operation + * Executes a YQL script via the scripting service and returns its result as a completed future. * - * @param query text of query - * @return a future join on it to wait for script finished + *

This method sends a YQL script for execution and collects the full result set in a single response. + * It uses {@link ScriptingProtos.ExecuteYqlRequest} under the hood and returns + * an {@link OperationResult} wrapped in {@link Result} to provide status and issues details.

+ * + * @param query the YQL script text to execute + * @return a future that resolves to a {@link Result} containing {@link ScriptingProtos.ExecuteYqlResult} */ default CompletableFuture> executeScriptYql(String query) { return executeScriptYql(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); } /** - * Execute Yql script with different type of operation - * Take a not join on a future is not granted that script is executed instead join guarantee that script pass to YDB + * Submits a YQL script for asynchronous execution and returns a handle to the operation. + * Take a not that join return future will not guarantee that script is finished. It's guarantee that script is passed to ydb * - * @param query text of query - * @return future join on it to wait for script pass to YDB but not get result + *

This method executes the given script asynchronously and immediately returns + * a {@link CompletableFuture} for an {@link Operation}, which can be later monitored or fetched + * via {@link #waitForScript(CompletableFuture)} or {@link #fetchScriptResults(String, Params, FetchScriptSettings)}.

+ * + * @param query the YQL script text to execute + * @return a future resolving to an {@link Operation} representing the submitted script execution */ default CompletableFuture> executeScript(String query) { return executeScript(query, Params.empty(), ExecuteScriptSettings.newBuilder().build()); } + + /** + * Waits for a previously submitted script operation to complete. + * + *

This method polls or fetches the state of the running operation via {@link OperationTray#fetchOperation} + * until the operation completes successfully or fails. It is typically used after calling + * {@link #executeScript(String, Params, ExecuteScriptSettings)}.

+ * + * @param scriptFuture a {@link CompletableFuture} returned by {@link #executeScript(String, Params, ExecuteScriptSettings)} + * @return a future resolving to the final {@link Status} of the script execution + */ + CompletableFuture waitForScript(CompletableFuture> scriptFuture); + } diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index b636cbc0..ee1318ef 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -18,6 +18,12 @@ import tech.ydb.proto.scripting.v1.ScriptingServiceGrpc; /** + * Low-level RPC client for YDB Query and Scripting services. + *

+ * Provides direct gRPC bindings for session management, query execution, + * transaction control, and script execution APIs. + *

+ * Used internally by higher-level query session and client abstractions. * * @author Aleksandr Gorshenin */ @@ -111,27 +117,30 @@ public GrpcReadStream executeQuery( } /** - * Run execute script using Yql + * Executes a YQL script via the scripting service. * - * @param request request for execute script - * @param settings grpc settings - * @return future with result of execution + * @param request the {@link ScriptingProtos.ExecuteYqlRequest} containing the script definition + * @param settings gRPC request settings + * @return a future resolving to an {@link Operation} with {@link ScriptingProtos.ExecuteYqlResult} */ public CompletableFuture>> executeScriptYql( ScriptingProtos.ExecuteYqlRequest request, GrpcRequestSettings settings) { return transport.unaryCall(ScriptingServiceGrpc.getExecuteYqlMethod(), settings, request) .thenApply(OperationBinder.bindAsync( - transport, ScriptingProtos.ExecuteYqlResponse::getOperation, ScriptingProtos.ExecuteYqlResult.class) + transport, + ScriptingProtos.ExecuteYqlResponse::getOperation, + ScriptingProtos.ExecuteYqlResult.class) ); } /** - * Execute script using query + * Executes a YQL script using the Query service API. * - * @param request request for execute script - * @param settings grpc settings - * @return future with result of execution + * + * @param request the {@link YdbQuery.ExecuteScriptRequest} containing the script + * @param settings gRPC request settings + * @return a future resolving to an {@link Operation} representing the script execution */ public CompletableFuture> executeScript( YdbQuery.ExecuteScriptRequest request, GrpcRequestSettings settings) { @@ -144,11 +153,14 @@ public CompletableFuture> executeScript( } /** - * Fetch script using query + * Fetches the results of a previously executed script. + * + *

This method retrieves the next portion of script execution results, + * supporting pagination and partial fetch using tokens.

* - * @param request for execute script - * @param settings grpc settings - * @return future with result of execution + * @param request the {@link YdbQuery.FetchScriptResultsRequest} specifying the fetch parameters + * @param settings gRPC request settings + * @return a future resolving to {@link Result} containing {@link YdbQuery.FetchScriptResultsResponse} */ public CompletableFuture> fetchScriptResults( YdbQuery.FetchScriptResultsRequest request, GrpcRequestSettings settings) { diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index cb8b78d0..14862ff2 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -18,13 +18,13 @@ import tech.ydb.common.transaction.TxMode; import tech.ydb.common.transaction.impl.YdbTransactionImpl; import tech.ydb.core.Issue; -import tech.ydb.core.OperationResult; import tech.ydb.core.Result; import tech.ydb.core.Status; import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; import tech.ydb.core.operation.Operation; +import tech.ydb.core.operation.OperationTray; import tech.ydb.core.operation.StatusExtractor; import tech.ydb.core.settings.BaseRequestSettings; import tech.ydb.core.utils.URITools; @@ -35,6 +35,7 @@ import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; import tech.ydb.query.QueryTransaction; +import tech.ydb.query.result.OperationResult; import tech.ydb.query.result.QueryInfo; import tech.ydb.query.result.QueryStats; import tech.ydb.query.settings.AttachSessionSettings; @@ -266,7 +267,10 @@ void handleTxMeta(String txID) { } @Override - public CompletableFuture> executeScriptYql(String query, Params params, ExecuteScriptSettings settings) { + public CompletableFuture> executeScriptYql( + String query, + Params params, + ExecuteScriptSettings settings) { ScriptingProtos.ExecuteYqlRequest.Builder requestBuilder = ScriptingProtos.ExecuteYqlRequest.newBuilder() .setScript(query) .setCollectStats(mapStatsCollectionMode(settings.getStatsMode())); @@ -279,7 +283,9 @@ public CompletableFuture> executeScript } @Override - public CompletableFuture> executeScript(String query, Params params, ExecuteScriptSettings settings) { + public CompletableFuture> executeScript(String query, + Params params, + ExecuteScriptSettings settings) { YdbQuery.ExecuteScriptRequest.Builder request = YdbQuery.ExecuteScriptRequest.newBuilder() .setExecMode(mapExecMode(settings.getExecMode())) .setStatsMode(mapStatsMode(settings.getStatsMode())) @@ -289,7 +295,7 @@ public CompletableFuture> executeScript(String query, Params p .build()); java.time.Duration ttl = settings.getTtl(); - if(ttl != null) { + if (ttl != null) { request.setResultsTtl(Duration.newBuilder().setNanos(settings.getTtl().getNano())); } @@ -306,20 +312,26 @@ public CompletableFuture> executeScript(String query, Params p } @Override - public CompletableFuture> fetchScriptResults(String query, Params params, FetchScriptSettings settings) { + public CompletableFuture waitForScript(CompletableFuture> scriptFuture) { + return scriptFuture.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)); + } + + @Override + public CompletableFuture> + fetchScriptResults(String query, Params params, FetchScriptSettings settings) { YdbQuery.FetchScriptResultsRequest.Builder requestBuilder = YdbQuery.FetchScriptResultsRequest.newBuilder(); - if(!Strings.isNullOrEmpty(settings.getFetchToken())) { + if (!Strings.isNullOrEmpty(settings.getFetchToken())) { requestBuilder.setFetchToken(settings.getFetchToken()); } - if(settings.getRowsLimit() > 0) { + if (settings.getRowsLimit() > 0) { requestBuilder.setRowsLimit(settings.getRowsLimit()); } requestBuilder.setOperationId(settings.getOperationId()); - if(settings.getSetResultSetIndex() >= 0) { + if (settings.getSetResultSetIndex() >= 0) { requestBuilder.setResultSetIndex(settings.getSetResultSetIndex()); } diff --git a/query/src/main/java/tech/ydb/query/result/OperationResult.java b/query/src/main/java/tech/ydb/query/result/OperationResult.java new file mode 100644 index 00000000..2a07c33d --- /dev/null +++ b/query/src/main/java/tech/ydb/query/result/OperationResult.java @@ -0,0 +1,84 @@ +package tech.ydb.query.result; + +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import javax.annotation.Nonnull; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.core.operation.Operation; +import tech.ydb.query.QuerySession; +import tech.ydb.query.settings.ExecuteScriptSettings; +import tech.ydb.table.query.Params; + +/** + * Represents the result of an executed YQL script operation. + *

+ * This class wraps a {@link Operation} that contains a {@link Result} object + * and provides convenient access to both the operation metadata and the + * actual execution result. + *

+ * + *

Typically used as the return type for + * {@link QuerySession#executeScriptYql(String, Params, ExecuteScriptSettings)} + * and similar asynchronous script execution APIs.

+ * + * @param the type of value contained in the result + * + *

Author: Evgeny Kuvardin + */ +public class OperationResult implements Result { + + private final Operation> operation; + private final Result result; + + public OperationResult(Operation> resultOperation) { + this.operation = resultOperation; + this.result = operation.getValue(); + } + + /** + * Returns the underlying {@link Operation} associated with this result. + *

+ * The operation object contains metadata such as operation ID, execution status, + * and timing information. + *

+ * + * @return the wrapped {@link Operation} object + */ + public Operation> getOperation() { + return operation; + } + + @Nonnull + @Override + public Status getStatus() { + return result.getStatus(); + } + + @Nonnull + @Override + public T getValue() throws UnexpectedResultException { + return result.getValue(); + } + + @Nonnull + @Override + public Result map(@Nonnull Function mapper) { + return result.map(mapper); + } + + @Nonnull + @Override + public CompletableFuture> mapResultFuture(@Nonnull Function>> mapper) { + return result.mapResultFuture(mapper); + } + + @Nonnull + @Override + public CompletableFuture mapStatusFuture(@Nonnull Function> mapper) { + return result.mapStatusFuture(mapper); + } +} diff --git a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java index e32a2648..214dc994 100644 --- a/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/settings/ExecuteScriptSettings.java @@ -1,11 +1,15 @@ package tech.ydb.query.settings; -import tech.ydb.core.settings.BaseRequestSettings; - import java.time.Duration; +import tech.ydb.core.settings.BaseRequestSettings; + /** - * Script settings to pass to execute script + * Settings for configuring script execution requests. + *

+ * Used by {@code QuerySession.executeScript(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin */ public class ExecuteScriptSettings extends BaseRequestSettings { private final QueryExecMode execMode; @@ -21,60 +25,120 @@ private ExecuteScriptSettings(Builder builder) { this.resourcePool = builder.resourcePool; } + /** + * Returns the execution mode for the script. + * + *

Defines how the script should be processed, e.g. executed, explained, validated, or parsed.

+ * + * @return the {@link QueryExecMode} used for execution + */ public QueryExecMode getExecMode() { return this.execMode; } + /** + * Returns the time-to-live (TTL) duration for the script results. + * + *

Specifies how long results of the executed script will be kept available + * before automatic cleanup on the server.

+ * + * @return the TTL value, or {@code null} if not set + */ public Duration getTtl() { return ttl; } + /** + * Returns the statistics collection mode for script execution. + * + *

Determines how detailed execution statistics should be gathered + * (none, basic, full, or profiling level).

+ * + * @return the {@link QueryStatsMode} used for statistics collection + */ public QueryStatsMode getStatsMode() { return this.statsMode; } /** - * Get resource pool for query execution - * @return resource pool name + * Returns the name of the resource pool assigned to the script execution. + * + *

Resource pools define isolated resource groups for workload management. + * If not specified, the default pool is used.

+ * + * @return the resource pool name, or {@code null} if not set */ public String getResourcePool() { return this.resourcePool; } + /** + * Creates a new {@link Builder} instance for constructing {@link ExecuteScriptSettings}. + * + * @return a new builder + */ public static Builder newBuilder() { return new Builder(); } + /** + * Builder for creating immutable {@link ExecuteScriptSettings} instances. + *

+ * Provides fluent configuration for script execution settings + */ public static class Builder extends BaseBuilder { private QueryExecMode execMode = QueryExecMode.EXECUTE; private QueryStatsMode statsMode = QueryStatsMode.NONE; private String resourcePool = null; private Duration ttl = null; + /** + * Sets the execution mode for the script. + * + * @param mode the desired execution mode + * @return this builder instance for chaining + * @see QueryExecMode + */ public Builder withExecMode(QueryExecMode mode) { this.execMode = mode; return this; } + /** + * Sets the statistics collection mode for the script execution. + * + * @param mode the desired statistics mode + * @return this builder instance for chaining + * @see QueryStatsMode + */ public Builder withStatsMode(QueryStatsMode mode) { this.statsMode = mode; return this; } + /** + * Sets the time-to-live (TTL) duration for script results. + * + *

After this duration expires, stored script results may be deleted + * from the server automatically.

+ * + * @param value the TTL duration + * @return this builder instance for chaining + */ public Builder withTtl(Duration value) { this.ttl = value; return this; } /** - * Set resource pool which query try to use. - * If no pool specify or poolId is empty or poolId equals "default" - * the unremovable resource pool "default" will be used + * Specifies the resource pool to use for query execution. + *

+ * If no pool is specified, or the ID is empty, or equal to {@code "default"}, + * the unremovable resource pool "default" will be used. * * @param poolId resource pool identifier - * - * @return builder + * @return this builder instance for chaining */ public Builder withResourcePool(String poolId) { this.resourcePool = poolId; diff --git a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java index 092ab00b..cd33de38 100644 --- a/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java +++ b/query/src/main/java/tech/ydb/query/settings/FetchScriptSettings.java @@ -3,7 +3,13 @@ import tech.ydb.core.settings.BaseRequestSettings; /** - * Script fetch settings to pass to fetch script + * Settings for configuring the fetch phase of a previously executed YQL script. + *

+ * These settings define which operation results to fetch, pagination options, + * row limits, and which result set index to retrieve. + * Used with {@code QuerySession.fetchScriptResults(...)} and similar APIs. + * + *

Author: Evgeny Kuvardin */ public class FetchScriptSettings extends BaseRequestSettings { private final String operationId; @@ -19,18 +25,49 @@ private FetchScriptSettings(Builder builder) { this.setResultSetIndex = builder.setResultSetIndex; } + /** + * Returns the identifier of the operation whose results should be fetched. + * + *

This ID corresponds to the operation returned by + * {@code QuerySession.executeScript(...)} or a similar asynchronous call.

+ * + * @return the operation ID string + */ public String getOperationId() { return operationId; } + /** + * Returns the fetch token used to continue fetching paginated results. + * + *

When a previous fetch request indicates more data is available, + * this token can be used to retrieve the next portion of results.

+ * + * @return the fetch token, or an empty string if not set + */ public String getFetchToken() { return fetchToken; } + /** + * Returns the maximum number of rows to retrieve in this fetch request. + * + *

If not set , the server will use its default limit.

+ * + * @return the maximum number of rows to fetch + */ public int getRowsLimit() { return rowsLimit; } + /** + * Returns the index of the result set to fetch from the executed script. + * + *

When the executed script produces multiple result sets, + * this value specifies which one to retrieve (starting from 0).

+ * + * @return the result set index + */ public int getSetResultSetIndex() { return setResultSetIndex; } @@ -40,29 +77,17 @@ public static Builder newBuilder() { } public static class Builder extends BaseBuilder { - public int getRowsLimit() { - return rowsLimit; - } - - public Builder setRowsLimit(int rowsLimit) { - this.rowsLimit = rowsLimit; - return this; - } - - public int getSetResultSetIndex() { - return setResultSetIndex; - } - public Builder setSetResultSetIndex(int setResultSetIndex) { - this.setResultSetIndex = setResultSetIndex; - return this; - } - - public int rowsLimit = -1; - public int setResultSetIndex = 0; + private int rowsLimit = 0; + private int setResultSetIndex = 0; private String operationId = ""; private String fetchToken = ""; + @Override + public FetchScriptSettings build() { + return new FetchScriptSettings(this); + } + public Builder withEOperationId(String operationId) { this.operationId = operationId; return this; @@ -73,9 +98,15 @@ public Builder withFetchToken(String fetchToken) { return this; } - @Override - public FetchScriptSettings build() { - return new FetchScriptSettings(this); + public Builder withRowsLimit(int rowsLimit) { + this.rowsLimit = rowsLimit; + return this; + } + + public Builder withSetResultSetIndex(int setResultSetIndex) { + this.setResultSetIndex = setResultSetIndex; + return this; } + } } diff --git a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java index 1f69c378..34ec9d4b 100644 --- a/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java +++ b/query/src/main/java/tech/ydb/query/tools/SessionRetryContext.java @@ -304,7 +304,7 @@ private final class RetryableOperationTask extends BaseRetryableTask result) { - if(result.getValue() == null) { + if (result.getValue() == null) { return StatusCode.SUCCESS; } return result.getValue().getCode(); diff --git a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java index d6b22600..db4d6882 100644 --- a/query/src/test/java/tech/ydb/query/ScriptExampleTest.java +++ b/query/src/test/java/tech/ydb/query/ScriptExampleTest.java @@ -1,9 +1,11 @@ package tech.ydb.query; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -15,10 +17,12 @@ import tech.ydb.core.Status; import tech.ydb.core.operation.Operation; import tech.ydb.core.operation.OperationTray; +import tech.ydb.proto.ValueProtos; import tech.ydb.proto.query.YdbQuery; import tech.ydb.proto.scripting.ScriptingProtos; import tech.ydb.query.settings.ExecuteScriptSettings; import tech.ydb.query.settings.FetchScriptSettings; +import tech.ydb.query.settings.QueryExecMode; import tech.ydb.query.tools.QueryReader; import tech.ydb.query.tools.SessionRetryContext; import tech.ydb.table.query.Params; @@ -33,7 +37,18 @@ /** + * Integration tests that validate the execution of YQL scripts + * using the YDB Query API and scripting features. * + *

Tests cover: + *

    + *
  • Script execution with and without parameters
  • + *
  • Error handling in scripts
  • + *
  • Sequential script execution
  • + *
  • Fetching results from executed scripts
  • + *
+ * + *

Author: Evgeny Kuvardin */ public class ScriptExampleTest { @@ -89,41 +104,23 @@ public static void init() { Assert.assertNotNull(client.getScheduler()); } - @AfterClass - public static void clean() { + @After + public void clean() { retryCtx.supplyResult(session -> session.createQuery("DROP TABLE series;", TxMode.NONE).execute()) .join(); retryCtx.supplyResult(session -> session.createQuery("DROP TABLE seasons;", TxMode.NONE).execute()) .join(); + } + @AfterClass + public static void cleanAll() { client.close(); } - /** - * Simple test that check script without parameters execute without errors - */ @Test public void createScript() { - Result result0 = retryCtx.supplyResult(session -> session.executeScriptYql( - "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")" - ) - ).join(); - - Assert.assertTrue(result0.isSuccess()); + Status status = runCreateSuccessScript(); + Assert.assertTrue(status.isSuccess()); String query = "SELECT series_id, title, release_date " @@ -141,30 +138,30 @@ public void createScript() { } /** - * Simple test that check script without parameters execute with errors if script has errors in text + * Ensures that script execution fails when it contains syntax errors. + *

+ * Attempts to execute a malformed YQL script and verifies that the result + * indicates failure. */ @Test public void createScriptShouldFail() { - Result operationResult = retryCtx.supplyResult(session -> session.executeScriptYql( - "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "ZCREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")" - ) - ).join(); - - Assert.assertFalse(operationResult.isSuccess()); + Status statusOperation = runCreateScript("CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "ZCREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + + Assert.assertFalse(statusOperation.isSuccess()); String query = "SELECT series_id, title, release_date " @@ -181,36 +178,20 @@ public void createScriptShouldFail() { } /** - * Test create and then insert data using {@link ScriptingProtos} proto + * Verifies creation and data insertion using the {@link ScriptingProtos.ExecuteYqlResult} proto interface. + *

+ * Creates the necessary tables, inserts test data via declared parameters, + * and validates that the data was successfully persisted. */ @Test public void createInsertYqlScript() { - Result task = retryCtx.supplyResult(session -> session.executeScriptYql("" - + "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")" - ) - ).join(); - + runCreateSuccessScript(); ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() // .withExecMode(QueryExecMode.EXECUTE) .build(); - Result task1 = retryCtx.supplyResult(session -> session.executeScriptYql("" + retryCtx.supplyResult(session -> session.executeScriptYql("" + "DECLARE $values AS List + * Creates tables, then inserts data in a separate script execution, and + * verifies data persistence. */ @Test public void createInsertQueryScript() { - CompletableFuture> s = - retryCtx.supplyOperation(querySession -> querySession.executeScript("" - + "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")")); - - retryCtx.supplyStatus( - ss -> - s.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) - ).join(); - + runCreateSuccessScript(); ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() + .withExecMode(QueryExecMode.EXECUTE) + .withTtl(Duration.ofSeconds(10)) .build(); - CompletableFuture> test = retryCtx.supplyOperation(querySession -> querySession.executeScript("" + "DECLARE $values AS List>;" - + "DECLARE $values1 AS List>;" + + "DECLARE $values1 AS List>;" + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);", Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings) @@ -320,38 +281,27 @@ public void createInsertQueryScript() { } /** - * Check fetch result + * Tests fetching results from an executed script using {@link FetchScriptSettings}. + * + *

Scenario: + *

    + *
  1. Create tables
  2. + *
  3. Insert sample data via parameterized script
  4. + *
  5. Fetch the result set from the executed operation
  6. + *
* - * @throws ExecutionException - * @throws InterruptedException + * @throws ExecutionException if the script future fails + * @throws InterruptedException if the fetch operation is interrupted */ @Test public void fetchScript() throws ExecutionException, InterruptedException { - Result result0 = retryCtx.supplyResult(session -> session.executeScriptYql("" - + "CREATE TABLE series (" - + " series_id UInt64," - + " title Text," - + " series_info Text," - + " release_date Date," - + " PRIMARY KEY(series_id)" - + ");" - + "" - + "CREATE TABLE seasons (" - + " series_id UInt64," - + " season_id UInt64," - + " title Text," - + " first_aired Date," - + " last_aired Date," - + " PRIMARY KEY(series_id, season_id)" - + ")" - ) - ).join(); + runCreateSuccessScript(); ExecuteScriptSettings executeScriptSettings = tech.ydb.query.settings.ExecuteScriptSettings.newBuilder() - // .withExecMode(QueryExecMode.EXECUTE) + .withExecMode(QueryExecMode.EXECUTE) .build(); - CompletableFuture> s = + CompletableFuture> updateScript = retryCtx.supplyOperation(querySession -> querySession.executeScript("" + "DECLARE $values AS List>;" - + "DECLARE $values1 AS List>;" + + "DECLARE $values1 AS List>;" + "UPSERT INTO seasons SELECT * FROM AS_TABLE($values);" - + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + - "SELECT series_id FROM seasons where series_id = 1 order by series_id;", + + "UPSERT INTO series SELECT * FROM AS_TABLE($values1);" + + "SELECT season_id FROM seasons where series_id = 1 order by series_id;", Params.of("$values", seasonsData, "$values1", seriesData), executeScriptSettings)); retryCtx.supplyStatus( ss -> - s.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) + updateScript.thenCompose(operation -> OperationTray.fetchOperation(operation, 1)) ).join(); - FetchScriptSettings fetchScriptSettings = FetchScriptSettings.newBuilder() - .setRowsLimit(2) - .setSetResultSetIndex(0) - .withEOperationId(s.get().getId()) + FetchScriptSettings fetchScriptSettings1 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(updateScript.get().getId()) .withFetchToken("") .build(); + YdbQuery.FetchScriptResultsResponse rs = checkFetch(fetchScriptSettings1, 1); + + FetchScriptSettings fetchScriptSettings2 = FetchScriptSettings.newBuilder() + .withRowsLimit(1) + .withSetResultSetIndex(0) + .withEOperationId(updateScript.get().getId()) + .withFetchToken(rs.getNextFetchToken()) + .build(); + + checkFetch(fetchScriptSettings2, 2); + } + private YdbQuery.FetchScriptResultsResponse checkFetch(FetchScriptSettings fetchScriptSettings, int value) { Result test = retryCtx.supplyResult( session -> session.fetchScriptResults("" - + "SELECT series_id FROM seasons;", + + "SELECT season_id FROM seasons;", Params.empty(), fetchScriptSettings) ).join(); - tech.ydb.proto.ValueProtos.ResultSet resultSet = test.getValue().getResultSet(); + ValueProtos.ResultSet resultSet = test.getValue().getResultSet(); + Assert.assertEquals(1, resultSet.getRowsCount()); - - Assert.assertEquals(2, resultSet.getRowsCount()); ResultSetReader reader = ProtoValueReaders.forResultSet(resultSet); reader.next(); - Assert.assertEquals(1, reader.getColumn(0).getUint64()); - reader.next(); - Assert.assertEquals(1, reader.getColumn(0).getUint64()); + Assert.assertEquals(value, reader.getColumn(0).getUint64()); + return test.getValue(); + } + + private Status runCreateSuccessScript() { + return runCreateScript("" + + "CREATE TABLE series (" + + " series_id UInt64," + + " title Text," + + " series_info Text," + + " release_date Date," + + " PRIMARY KEY(series_id)" + + ");" + + "" + + "CREATE TABLE seasons (" + + " series_id UInt64," + + " season_id UInt64," + + " title Text," + + " first_aired Date," + + " last_aired Date," + + " PRIMARY KEY(series_id, season_id)" + + ")"); + } + + private Status runCreateScript(String query) { + return retryCtx.supplyStatus( + querySession -> querySession.waitForScript( + retryCtx.supplyOperation(ss -> querySession.executeScript(query)))).join(); } }