From 9333397e3d4b7fc6e57a77905b328342079d43ba Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Tue, 27 May 2025 16:19:10 +0300 Subject: [PATCH 1/7] initial implementation for UNDETERMINED processing --- .../jdbc/context/QueryServiceExecutor.java | 16 +-- .../jdbc/context/QueryServiceExecutorExt.java | 128 ++++++++++++++++++ .../tech/ydb/jdbc/context/YdbContext.java | 57 +++++++- .../tech/ydb/jdbc/context/YdbValidator.java | 7 +- .../java/tech/ydb/jdbc/query/YdbQuery.java | 17 ++- .../tech/ydb/jdbc/query/YdbQueryParser.java | 13 +- .../tech/ydb/jdbc/settings/YdbConfig.java | 2 + .../jdbc/settings/YdbOperationProperties.java | 20 +++ .../ydb/jdbc/query/YdbQueryParserTest.java | 12 ++ .../settings/YdbDriverProperitesTest.java | 4 + 10 files changed, 257 insertions(+), 19 deletions(-) create mode 100644 jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index f8fd06ab..30adcb79 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -42,17 +42,17 @@ * @author Aleksandr Gorshenin */ public class QueryServiceExecutor extends BaseYdbExecutor { - private final Duration sessionTimeout; - private final QueryClient queryClient; + protected final Duration sessionTimeout; + protected final QueryClient queryClient; private final boolean useStreamResultSet; - private int transactionLevel; - private boolean isReadOnly; - private boolean isAutoCommit; - private TxMode txMode; + protected int transactionLevel; + protected boolean isReadOnly; + protected boolean isAutoCommit; + protected TxMode txMode; - private final AtomicReference tx = new AtomicReference<>(); - private volatile boolean isClosed; + protected final AtomicReference tx = new AtomicReference<>(); + protected volatile boolean isClosed; public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { super(ctx); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java new file mode 100644 index 00000000..c5df23da --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java @@ -0,0 +1,128 @@ +package tech.ydb.jdbc.context; + +import java.sql.SQLException; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.YdbTracer; +import tech.ydb.jdbc.impl.YdbQueryResult; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.query.QueryTransaction; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.table.values.PrimitiveValue; + +/** + * + * @author mzinal + */ +public class QueryServiceExecutorExt extends QueryServiceExecutor { + + private final String processUndeterminedTable; + private boolean writing; + + public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { + super(ctx, transactionLevel, autoCommit); + this.processUndeterminedTable = ctx.getOperationProperties().getProcessUndeterminedTable(); + this.writing = false; + } + + private Status upsertAndCommit(QueryTransaction localTx) { + String sql = "DECLARE $trans_id AS Text; " + + "UPSERT INTO `" + processUndeterminedTable + + "` (trans_id, trans_tv) VALUES ($trans_id, CurrentUtcTimestamp());"; + Params params = Params.of("$trans_id", PrimitiveValue.newText(localTx.getId())); + return localTx.createQueryWithCommit(sql, params) + .execute() + .join() + .getStatus(); + } + + private boolean checkTransaction(YdbContext ctx, String transId, + YdbValidator validator, YdbTracer tracer) throws SQLException { + String sql = "DECLARE $trans_id AS Text; " + + "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable + + "` WHERE trans_id=$trans_id;"; + Params params = Params.of("$trans_id", PrimitiveValue.newText(transId)); + Result result = ctx.getRetryCtx().supplyResult( + session -> session.executeDataQuery(sql, TxControl.onlineRo(), params)) + .join(); + if (!result.getStatus().isSuccess()) { + // Failed to obtain the transaction status, have to return the error + validator.validate("CommitVal TxId: " + transId, tracer, result.getStatus()); + } + DataQueryResult dqr = result.getValue(); + if (dqr.getResultSetCount() == 1) { + if (dqr.getResultSet(0).getRowCount() == 1) { + return true; + } + } + return false; + } + + private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLException { + ensureOpened(); + + QueryTransaction localTx = tx.get(); + if (localTx == null || !localTx.isActive()) { + return; + } + + YdbTracer tracer = ctx.getTracer(); + tracer.trace("--> commitExt"); + tracer.query(null); + + try { + validator.clearWarnings(); + Status status = upsertAndCommit(localTx); + if (StatusCode.UNDETERMINED.equals(status.getCode())) { + if (!checkTransaction(ctx, localTx.getId(), validator, tracer)) { + status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues()); + } + } + validator.validate("CommitExt TxId: " + localTx.getId(), tracer, status); + } finally { + if (tx.compareAndSet(localTx, null)) { + localTx.getSession().close(); + } + tracer.close(); + } + } + + @Override + public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { + try { + if (isInsideTransaction() && writing) { + commitWithCheck(ctx, validator); + } else { + super.commit(ctx, validator); + } + } finally { + writing = false; + } + } + + @Override + public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { + try { + super.rollback(ctx, validator); + } finally { + writing = false; + } + } + + @Override + public YdbQueryResult executeDataQuery( + YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache + ) throws SQLException { + YdbQueryResult yqr = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache); + if (query.isWriting() && !isAutoCommit) { + writing = true; + } + return yqr; + } + +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index fad0c0d3..74f5ee74 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -18,6 +18,7 @@ import com.google.common.cache.CacheBuilder; import tech.ydb.core.Result; +import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.GrpcTransportBuilder; @@ -190,9 +191,16 @@ public boolean isTxTracerEnabled() { public YdbExecutor createExecutor() throws SQLException { if (config.isUseQueryService()) { - return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + if (operationProps.getProcessUndetermined()) { + return new QueryServiceExecutorExt( + this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + } else { + return new QueryServiceExecutor( + this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + } } else { - return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + return new TableServiceExecutor( + this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); } } @@ -270,6 +278,7 @@ public void deregister() { } public static YdbContext createContext(YdbConfig config) throws SQLException { + GrpcTransport grpcTransport = null; try { LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString()); @@ -293,7 +302,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { }); }); - GrpcTransport grpcTransport = builder.build(); + grpcTransport = builder.build(); PooledTableClient.Builder tableClient = PooledTableClient.newClient( GrpcTableRpc.useTransport(grpcTransport) @@ -302,9 +311,25 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient); - return new YdbContext(config, operationProps, queryProps, grpcTransport, + YdbContext yc = new YdbContext(config, operationProps, queryProps, grpcTransport, tableClient.build(), queryClient.build(), autoResize); + if (operationProps.getProcessUndetermined()) { + if (config.isUseQueryService()) { + yc.ensureTransactionTableExists(); + } else { + LOGGER.log(Level.WARNING, "UNDETERMINED processing is disabled, " + + "because it is only supported for QueryService execution mode."); + } + } + return yc; } catch (RuntimeException ex) { + if (grpcTransport != null) { + try { + grpcTransport.close(); + } catch (Exception exClose) { + LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose); + } + } StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage()); Throwable cause = ex.getCause(); while (cause != null) { @@ -315,6 +340,30 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { } } + public void ensureTransactionTableExists() throws SQLException { + String tableName = operationProps.getProcessUndeterminedTable(); + if (tableName.isEmpty()) { + return; + } + LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName); + String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName + + "` (trans_id Text NOT NULL, trans_tv Timestamp," + + " PRIMARY KEY (trans_id)) WITH (" + + "TTL=Interval('PT60M') ON trans_tv," + + "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100," + + "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150," + + "AUTO_PARTITIONING_BY_LOAD=ENABLED," + + "AUTO_PARTITIONING_BY_SIZE=ENABLED," + + "AUTO_PARTITIONING_PARTITION_SIZE_MB=100" + + ");"; + Status status = retryCtx.supplyStatus( + session -> session.executeSchemeQuery(sqlCreate)) + .join(); + new YdbValidator().validate( + "Create table " + tableName, + getTracer(), status); + } + public > T withDefaultTimeout(T settings) { Duration operation = operationProps.getDeadlineTimeout(); if (!operation.isZero() && !operation.isNegative()) { diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java index 512854c8..3f5ad3d2 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java @@ -54,8 +54,7 @@ public void clearWarnings() { this.issues.clear(); } - public void execute(String msg, YdbTracer tracer, Supplier> fn) throws SQLException { - Status status = fn.get().join(); + public void validate(String msg, YdbTracer tracer, Status status) throws SQLException { addStatusIssues(status); tracer.trace("<-- " + status.toString()); @@ -67,6 +66,10 @@ public void execute(String msg, YdbTracer tracer, Supplier> fn) throws SQLException { + validate(msg, tracer, fn.get().join()); + } + public R call(String msg, Supplier>> fn) throws SQLException { try { Result result = fn.get().join(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java index 35384f62..8225f5e3 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java @@ -1,7 +1,5 @@ package tech.ydb.jdbc.query; - - import java.sql.SQLException; import java.util.List; @@ -19,8 +17,10 @@ public class YdbQuery { private final QueryType type; private final boolean isPlainYQL; + private final boolean writing; - YdbQuery(String originQuery, String preparedYQL, List stats, YqlBatcher batcher, QueryType type) { + YdbQuery(String originQuery, String preparedYQL, List stats, + YqlBatcher batcher, QueryType type, boolean writing) { this.originQuery = originQuery; this.preparedYQL = preparedYQL; this.statements = stats; @@ -32,6 +32,7 @@ public class YdbQuery { hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters(); } this.isPlainYQL = !hasJdbcParameters; + this.writing = writing; } public QueryType getType() { @@ -58,11 +59,16 @@ public List getStatements() { return statements; } + public boolean isWriting() { + return writing; + } + public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws SQLException { YdbQueryParser parser = new YdbQueryParser( query, opts.isDetectQueryType(), opts.isDetectJdbcParameters(), opts.isReplaceJdbcInByYqlList() ); String preparedYQL = parser.parseSQL(); + boolean writing = false; QueryType type = null; YqlBatcher batcher = parser.getYqlBatcher(); @@ -86,7 +92,10 @@ public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws if (type == null) { type = parser.detectQueryType(); } + if (type == QueryType.DATA_QUERY) { + writing = parser.detectWriting(); + } - return new YdbQuery(query, preparedYQL, statements, batcher, type); + return new YdbQuery(query, preparedYQL, statements, batcher, type, writing); } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java index 8cde00c7..080e34b7 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java @@ -1,6 +1,5 @@ package tech.ydb.jdbc.query; - import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; @@ -62,6 +61,18 @@ public QueryType detectQueryType() throws SQLException { return type != null ? type : QueryType.DATA_QUERY; } + public boolean detectWriting() { + for (QueryStatement st: statements) { + switch (st.getCmd()) { + case INSERT_UPSERT: + case UPDATE_REPLACE_DELETE: + return true; + default: + } + } + return false; + } + @SuppressWarnings("MethodLength") public String parseSQL() throws SQLException { int fragmentStart = 0; diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java index ee7ae8fd..5c7b6403 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java @@ -209,6 +209,8 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException { YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties), YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties), + YdbOperationProperties.PROCESS_UNDETERMINED_TABLE.toInfo(properties), + YdbOperationProperties.PROCESS_UNDETERMINED.toInfo(properties), YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties), YdbOperationProperties.JOIN_DURATION.toInfo(properties), YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties), diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 74c7bd6b..75fac9d0 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -57,6 +57,14 @@ public class YdbOperationProperties { "Use stream implementation of ResultSet", false ); + static final YdbProperty PROCESS_UNDETERMINED = YdbProperty.bool("processUndetermined", + "Enable automatic processing of UNDETERMINED errors", false + ); + + static final YdbProperty PROCESS_UNDETERMINED_TABLE = YdbProperty.string("processUndeterminedTable", + "Name of working table for automatic processing of UNDETERMINED errors", "ydb_transactions" + ); + private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? private final YdbValue joinDuration; @@ -73,6 +81,8 @@ public class YdbOperationProperties { private final YdbValue bulkQueryTxMode; private final YdbValue useStreamResultSets; + private final YdbValue processUndetermined; + private final YdbValue processUndeterminedTable; public YdbOperationProperties(YdbConfig config) throws SQLException { Properties props = config.getProperties(); @@ -91,6 +101,8 @@ public YdbOperationProperties(YdbConfig config) throws SQLException { this.bulkQueryTxMode = BULK_QUERY_TX_MODE.readValue(props); this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props); + this.processUndetermined = PROCESS_UNDETERMINED.readValue(props); + this.processUndeterminedTable = PROCESS_UNDETERMINED_TABLE.readValue(props); } public Duration getJoinDuration() { @@ -141,6 +153,14 @@ public boolean getUseStreamResultSets() { return useStreamResultSets.getValue(); } + public boolean getProcessUndetermined() { + return processUndetermined.getValue(); + } + + public String getProcessUndeterminedTable() { + return processUndeterminedTable.getValue(); + } + public int getMaxRows() { return MAX_ROWS; } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java index c0cf5333..d83a3faa 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java @@ -118,6 +118,7 @@ public void yqlUpsertTest() throws SQLException { Assertions.assertEquals(QueryType.UNKNOWN, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryType.UNKNOWN, parser.getStatements().get(1).getType()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(2).getType()); + Assertions.assertEquals(true, parser.detectWriting()); } @Test @@ -135,6 +136,7 @@ public void yqlSelectWithKeyTest() throws SQLException { Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(1).getType()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.detectQueryType()); + Assertions.assertEquals(false, parser.detectWriting()); } @Test @@ -348,6 +350,7 @@ public void batchedInsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -374,6 +377,7 @@ public void batchedInsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -400,6 +404,7 @@ public void batchedUpsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -426,6 +431,7 @@ public void batchedUpsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -452,6 +458,7 @@ public void batchedReplaceTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -478,6 +485,7 @@ public void batchedReplaceOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -504,6 +512,7 @@ public void batchedUpdateTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -530,6 +539,7 @@ public void batchedUpdateOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -556,6 +566,7 @@ public void batchedDeleteTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -581,6 +592,7 @@ public void batchedDeleteOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(true, parser.detectWriting()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); diff --git a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java index 8de9f267..c3d687be 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java @@ -321,6 +321,8 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter new DriverPropertyInfo("sessionMaxIdleTime", ""), new DriverPropertyInfo("sessionPoolSizeMin", ""), new DriverPropertyInfo("sessionPoolSizeMax", ""), + new DriverPropertyInfo("processUndeterminedTable", "ydb_transactions"), + new DriverPropertyInfo("processUndetermined", "false"), new DriverPropertyInfo("useStreamResultSets", "false"), new DriverPropertyInfo("joinDuration", "5m"), new DriverPropertyInfo("queryTimeout", "0s"), @@ -365,6 +367,8 @@ static DriverPropertyInfo[] customizedPropertyInfo() { new DriverPropertyInfo("sessionMaxIdleTime", "5m"), new DriverPropertyInfo("sessionPoolSizeMin", "3"), new DriverPropertyInfo("sessionPoolSizeMax", "4"), + new DriverPropertyInfo("processUndeterminedTable", "ydb_transactions"), + new DriverPropertyInfo("processUndetermined", "false"), new DriverPropertyInfo("useStreamResultSets", "true"), new DriverPropertyInfo("joinDuration", "6m"), new DriverPropertyInfo("queryTimeout", "2m"), From bf22278688f88a98143d605246d6b13343e805fe Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Tue, 27 May 2025 16:47:21 +0300 Subject: [PATCH 2/7] proper checking logic for UNDETERMINED status --- .../java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java index c5df23da..3b988c59 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java @@ -79,7 +79,9 @@ private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLE validator.clearWarnings(); Status status = upsertAndCommit(localTx); if (StatusCode.UNDETERMINED.equals(status.getCode())) { - if (!checkTransaction(ctx, localTx.getId(), validator, tracer)) { + if (checkTransaction(ctx, localTx.getId(), validator, tracer)) { + status = Status.SUCCESS; + } else { status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues()); } } From 10e7ea75c8100a2758d2cb01312ec5add9743d11 Mon Sep 17 00:00:00 2001 From: Max Zinal Date: Tue, 27 May 2025 18:15:44 +0300 Subject: [PATCH 3/7] use hash for better transaction table partitioning --- .../jdbc/context/QueryServiceExecutorExt.java | 19 +++++++++++++------ .../tech/ydb/jdbc/context/YdbContext.java | 6 ++++-- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java index 3b988c59..8e4fb8e5 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java @@ -31,10 +31,14 @@ public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean aut } private Status upsertAndCommit(QueryTransaction localTx) { - String sql = "DECLARE $trans_id AS Text; " + String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; " + "UPSERT INTO `" + processUndeterminedTable - + "` (trans_id, trans_tv) VALUES ($trans_id, CurrentUtcTimestamp());"; - Params params = Params.of("$trans_id", PrimitiveValue.newText(localTx.getId())); + + "` (trans_hash, trans_id, trans_tv) " + + "VALUES ($trans_hash, $trans_id, CurrentUtcTimestamp());"; + Params params = Params.of( + "$trans_id", PrimitiveValue.newText(localTx.getId()), + "$trans_hash", PrimitiveValue.newInt32(localTx.getId().hashCode()) + ); return localTx.createQueryWithCommit(sql, params) .execute() .join() @@ -43,10 +47,13 @@ private Status upsertAndCommit(QueryTransaction localTx) { private boolean checkTransaction(YdbContext ctx, String transId, YdbValidator validator, YdbTracer tracer) throws SQLException { - String sql = "DECLARE $trans_id AS Text; " + String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; " + "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable - + "` WHERE trans_id=$trans_id;"; - Params params = Params.of("$trans_id", PrimitiveValue.newText(transId)); + + "` WHERE trans_hash=$trans_hash AND trans_id=$trans_id;"; + Params params = Params.of( + "$trans_id", PrimitiveValue.newText(transId), + "$trans_hash", PrimitiveValue.newInt32(transId.hashCode()) + ); Result result = ctx.getRetryCtx().supplyResult( session -> session.executeDataQuery(sql, TxControl.onlineRo(), params)) .join(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 92bbe697..3a660600 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -355,11 +355,13 @@ public void ensureTransactionTableExists() throws SQLException { } LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName); String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName - + "` (trans_id Text NOT NULL, trans_tv Timestamp," - + " PRIMARY KEY (trans_id)) WITH (" + + "` (trans_hash Int32 NOT NULL, trans_id Text NOT NULL, trans_tv Timestamp," + + " PRIMARY KEY (trans_hash, trans_id)) WITH (" + "TTL=Interval('PT60M') ON trans_tv," + /* + "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100," + "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150," + */ + "AUTO_PARTITIONING_BY_LOAD=ENABLED," + "AUTO_PARTITIONING_BY_SIZE=ENABLED," + "AUTO_PARTITIONING_PARTITION_SIZE_MB=100" From c18b626c7c057d48e603e3ff58b103002c8a1438 Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Wed, 4 Jun 2025 10:36:39 +0100 Subject: [PATCH 4/7] Replace the same two statement types by one --- .../java/tech/ydb/jdbc/query/QueryCmd.java | 7 +-- .../tech/ydb/jdbc/query/QueryStatement.java | 4 +- .../java/tech/ydb/jdbc/query/YdbQuery.java | 16 +++---- .../tech/ydb/jdbc/query/YdbQueryParser.java | 26 +++-------- .../ydb/jdbc/query/YdbQueryParserTest.java | 46 +++++++------------ 5 files changed, 37 insertions(+), 62 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java index 3b3d552a..6033f799 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java @@ -7,7 +7,8 @@ public enum QueryCmd { UNKNOWN, SELECT, - CREATE_ALTER_DROP, - INSERT_UPSERT, - UPDATE_REPLACE_DELETE + /** CREATE, DROP, ALTER, GRANT, REVOKE */ + DDL, + /** INSERT, UPSERT, UPDATE, REPLACE, DELETE */ + DML } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java index e985653b..a8f7d28e 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java @@ -52,7 +52,7 @@ public void setHasGenerated(boolean hasGenerated) { } public boolean hasUpdateCount() { - return (command == QueryCmd.INSERT_UPSERT || command == QueryCmd.UPDATE_REPLACE_DELETE) && !hasReturinng; + return command == QueryCmd.DML && !hasReturinng; } public boolean hasUpdateWithGenerated() { @@ -64,6 +64,6 @@ public boolean hasResults() { } public boolean isDDL() { - return command == QueryCmd.CREATE_ALTER_DROP; + return command == QueryCmd.DDL; } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java index 61139559..9da5d794 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java @@ -18,10 +18,9 @@ public class YdbQuery { private final QueryType type; private final boolean isPlainYQL; - private final boolean writing; + private final boolean isWriting; - YdbQuery(QueryKey key, String preparedYQL, List stats, YqlBatcher batcher, QueryType type, - boolean writing) { + YdbQuery(QueryKey key, String preparedYQL, List stats, YqlBatcher batcher, QueryType type) { this.key = key; this.preparedYQL = preparedYQL; this.statements = stats; @@ -29,11 +28,13 @@ public class YdbQuery { this.batcher = batcher; boolean hasJdbcParameters = false; + boolean hasDML = false; for (QueryStatement st: statements) { hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters(); + hasDML = hasDML || (st.getCmd() == QueryCmd.DML); } this.isPlainYQL = !hasJdbcParameters; - this.writing = writing; + this.isWriting = (type == QueryType.DATA_QUERY) && hasDML; } public QueryType getType() { @@ -41,7 +42,7 @@ public QueryType getType() { } public boolean isWriting() { - return writing; + return isWriting; } public YqlBatcher getYqlBatcher() { @@ -95,10 +96,7 @@ public static YdbQuery parseQuery(QueryKey query, YdbQueryProperties opts, YdbTy if (type == null) { type = parser.detectQueryType(); } - if (QueryType.DATA_QUERY.equals(type)) { - writing = parser.detectWriting(); - } - return new YdbQuery(query, preparedYQL, statements, batcher, type, writing); + return new YdbQuery(query, preparedYQL, statements, batcher, type); } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java index a9bef926..9c9c511b 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java @@ -74,18 +74,6 @@ public QueryType detectQueryType() throws SQLException { return type != null ? type : QueryType.DATA_QUERY; } - public boolean detectWriting() { - for (QueryStatement st: statements) { - switch (st.getCmd()) { - case INSERT_UPSERT: - case UPDATE_REPLACE_DELETE: - return true; - default: - } - } - return false; - } - @SuppressWarnings("MethodLength") public String parseSQL() throws SQLException { int fragmentStart = 0; @@ -250,25 +238,25 @@ public String parseSQL() throws SQLException { // starts with INSERT, UPSERT if (parseInsertKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.INSERT_UPSERT); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readInsert(); } if (parseUpsertKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.INSERT_UPSERT); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readUpsert(); } // starts with UPDATE, REPLACE, DELETE if (parseUpdateKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readUpdate(); } if (parseDeleteKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readDelete(); } if (parseReplaceKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readReplace(); } @@ -279,7 +267,7 @@ public String parseSQL() throws SQLException { || parseGrantKeyword(chars, keywordStart, keywordLength) || parseRevokeKeyword(chars, keywordStart, keywordLength) ) { - statement = new QueryStatement(type, QueryType.SCHEME_QUERY, QueryCmd.CREATE_ALTER_DROP); + statement = new QueryStatement(type, QueryType.SCHEME_QUERY, QueryCmd.DDL); batcher.readIdentifier(chars, keywordStart, keywordLength); } @@ -343,7 +331,7 @@ private void addReturning(StringBuilder parsed, QueryStatement st) throws SQLExc if (st == null || returning == null || st.hasResults()) { return; } - if (st.getCmd() != QueryCmd.INSERT_UPSERT && st.getCmd() != QueryCmd.UPDATE_REPLACE_DELETE) { + if (st.getCmd() != QueryCmd.DML) { return; } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java index ae0807bb..53bfffc8 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java @@ -48,10 +48,10 @@ public void emptyQueryTest(String query, String prepared) throws SQLException { @ParameterizedTest(name = "[{index}] {0} is explain query") @CsvSource(value = { "'Explain select * from table', ' select * from table', SELECT", - "'exPlain\nupsert to', '\nupsert to', INSERT_UPSERT", + "'exPlain\nupsert to', '\nupsert to', DML", "' Explain select * from table', ' select * from table', SELECT", - "'\texPlain\nupsert to', '\t\nupsert to', INSERT_UPSERT", - "'EXPLAIN/*comment*/UPSERT INTO', '/*comment*/UPSERT INTO', INSERT_UPSERT", + "'\texPlain\nupsert to', '\t\nupsert to', DML", + "'EXPLAIN/*comment*/UPSERT INTO', '/*comment*/UPSERT INTO', DML", }) public void explainQueryTest(String query, String prepared, String cmd) throws SQLException { YdbQueryParser parser = new YdbQueryParser(types, query, props); @@ -83,7 +83,7 @@ public void schemeQueryTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); QueryStatement statement = parser.getStatements().get(0); Assertions.assertEquals(QueryType.SCHEME_QUERY, statement.getType()); - Assertions.assertEquals(QueryCmd.CREATE_ALTER_DROP, statement.getCmd()); + Assertions.assertEquals(QueryCmd.DDL, statement.getCmd()); } @ParameterizedTest(name = "[{index}] {0} is data query") @@ -132,7 +132,6 @@ public void yqlUpsertTest() throws SQLException { Assertions.assertEquals(QueryType.DECLARE, parser.getStatements().get(0).getType()); Assertions.assertEquals(QueryType.DECLARE, parser.getStatements().get(1).getType()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(2).getType()); - Assertions.assertEquals(true, parser.detectWriting()); } @Test @@ -150,7 +149,6 @@ public void yqlSelectWithKeyTest() throws SQLException { Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(1).getType()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.detectQueryType()); - Assertions.assertEquals(false, parser.detectWriting()); } @Test @@ -417,8 +415,7 @@ public void batchedInsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -444,8 +441,7 @@ public void batchedInsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -471,8 +467,7 @@ public void batchedUpsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -498,8 +493,7 @@ public void batchedUpsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -525,8 +519,7 @@ public void batchedReplaceTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -552,8 +545,7 @@ public void batchedReplaceOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -579,8 +571,7 @@ public void batchedUpdateTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -606,8 +597,7 @@ public void batchedUpdateOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -633,7 +623,7 @@ public void batchedUpdateOneColumnWithTableName(String query) throws SQLExceptio Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -659,8 +649,7 @@ public void batchedDeleteTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -685,8 +674,7 @@ public void batchedDeleteOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); - Assertions.assertEquals(true, parser.detectWriting()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -747,7 +735,7 @@ public void validBulkInsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.BULK_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -771,7 +759,7 @@ public void validBulkUpsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.BULK_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); From ce05e076782d7d74c3141e2ab3e7baf6b1ef30eb Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 5 Jun 2025 10:29:02 +0100 Subject: [PATCH 5/7] Update implementation of custom query executor --- .../jdbc/context/QueryServiceExecutor.java | 53 +++--- .../jdbc/context/QueryServiceExecutorExt.java | 137 --------------- .../jdbc/context/TableServiceExecutor.java | 8 +- .../ydb/jdbc/context/TableTxExecutor.java | 166 ++++++++++++++++++ .../tech/ydb/jdbc/context/YdbContext.java | 66 ++----- .../tech/ydb/jdbc/context/YdbValidator.java | 8 +- 6 files changed, 217 insertions(+), 221 deletions(-) delete mode 100644 jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java create mode 100644 jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 3c995cc4..ab320145 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -23,6 +23,7 @@ import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; @@ -42,27 +43,28 @@ * @author Aleksandr Gorshenin */ public class QueryServiceExecutor extends BaseYdbExecutor { - protected final Duration sessionTimeout; - protected final QueryClient queryClient; + private final Duration sessionTimeout; + private final QueryClient queryClient; private final boolean useStreamResultSet; - protected int transactionLevel; - protected boolean isReadOnly; - protected boolean isAutoCommit; - protected TxMode txMode; + private int transactionLevel; + private boolean isReadOnly; + private boolean isAutoCommit; + private TxMode txMode; - protected final AtomicReference tx = new AtomicReference<>(); - protected volatile boolean isClosed; + private final AtomicReference tx = new AtomicReference<>(); + private volatile boolean isClosed; - public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { + public QueryServiceExecutor(YdbContext ctx) throws SQLException { super(ctx); - this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); + YdbOperationProperties options = ctx.getOperationProperties(); + this.sessionTimeout = options.getSessionTimeout(); this.queryClient = ctx.getQueryClient(); - this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets(); + this.useStreamResultSet = options.getUseStreamResultSets(); - this.transactionLevel = transactionLevel; + this.transactionLevel = options.getTransactionLevel(); + this.isAutoCommit = options.isAutoCommit(); this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; - this.isAutoCommit = autoCommit; this.txMode = txMode(transactionLevel, isReadOnly); this.isClosed = false; } @@ -179,22 +181,26 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { return; } - YdbTracer tracer = ctx.getTracer(); - tracer.trace("--> commit"); - tracer.query(null); - - CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); try { - validator.clearWarnings(); - validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings)); + commitImpl(ctx, validator, localTx); } finally { if (tx.compareAndSet(localTx, null)) { localTx.getSession().close(); } - tracer.close(); + ctx.getTracer().close(); } } + protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException { + YdbTracer tracer = ctx.getTracer(); + tracer.trace("--> commit"); + tracer.query(null); + + CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); + validator.clearWarnings(); + validator.call("Commit TxId: " + tx.getId(), tracer, () -> tx.commit(settings)); + } + @Override public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); @@ -223,9 +229,8 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } @Override - public YdbQueryResult executeDataQuery( - YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache - ) throws SQLException { + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, + long timeout, boolean keepInCache) throws SQLException { ensureOpened(); YdbValidator validator = statement.getValidator(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java deleted file mode 100644 index 8e4fb8e5..00000000 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java +++ /dev/null @@ -1,137 +0,0 @@ -package tech.ydb.jdbc.context; - -import java.sql.SQLException; - -import tech.ydb.core.Result; -import tech.ydb.core.Status; -import tech.ydb.core.StatusCode; -import tech.ydb.jdbc.YdbStatement; -import tech.ydb.jdbc.YdbTracer; -import tech.ydb.jdbc.impl.YdbQueryResult; -import tech.ydb.jdbc.query.YdbQuery; -import tech.ydb.query.QueryTransaction; -import tech.ydb.table.query.DataQueryResult; -import tech.ydb.table.query.Params; -import tech.ydb.table.transaction.TxControl; -import tech.ydb.table.values.PrimitiveValue; - -/** - * - * @author mzinal - */ -public class QueryServiceExecutorExt extends QueryServiceExecutor { - - private final String processUndeterminedTable; - private boolean writing; - - public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { - super(ctx, transactionLevel, autoCommit); - this.processUndeterminedTable = ctx.getOperationProperties().getProcessUndeterminedTable(); - this.writing = false; - } - - private Status upsertAndCommit(QueryTransaction localTx) { - String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; " - + "UPSERT INTO `" + processUndeterminedTable - + "` (trans_hash, trans_id, trans_tv) " - + "VALUES ($trans_hash, $trans_id, CurrentUtcTimestamp());"; - Params params = Params.of( - "$trans_id", PrimitiveValue.newText(localTx.getId()), - "$trans_hash", PrimitiveValue.newInt32(localTx.getId().hashCode()) - ); - return localTx.createQueryWithCommit(sql, params) - .execute() - .join() - .getStatus(); - } - - private boolean checkTransaction(YdbContext ctx, String transId, - YdbValidator validator, YdbTracer tracer) throws SQLException { - String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; " - + "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable - + "` WHERE trans_hash=$trans_hash AND trans_id=$trans_id;"; - Params params = Params.of( - "$trans_id", PrimitiveValue.newText(transId), - "$trans_hash", PrimitiveValue.newInt32(transId.hashCode()) - ); - Result result = ctx.getRetryCtx().supplyResult( - session -> session.executeDataQuery(sql, TxControl.onlineRo(), params)) - .join(); - if (!result.getStatus().isSuccess()) { - // Failed to obtain the transaction status, have to return the error - validator.validate("CommitVal TxId: " + transId, tracer, result.getStatus()); - } - DataQueryResult dqr = result.getValue(); - if (dqr.getResultSetCount() == 1) { - if (dqr.getResultSet(0).getRowCount() == 1) { - return true; - } - } - return false; - } - - private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLException { - ensureOpened(); - - QueryTransaction localTx = tx.get(); - if (localTx == null || !localTx.isActive()) { - return; - } - - YdbTracer tracer = ctx.getTracer(); - tracer.trace("--> commitExt"); - tracer.query(null); - - try { - validator.clearWarnings(); - Status status = upsertAndCommit(localTx); - if (StatusCode.UNDETERMINED.equals(status.getCode())) { - if (checkTransaction(ctx, localTx.getId(), validator, tracer)) { - status = Status.SUCCESS; - } else { - status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues()); - } - } - validator.validate("CommitExt TxId: " + localTx.getId(), tracer, status); - } finally { - if (tx.compareAndSet(localTx, null)) { - localTx.getSession().close(); - } - tracer.close(); - } - } - - @Override - public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { - try { - if (isInsideTransaction() && writing) { - commitWithCheck(ctx, validator); - } else { - super.commit(ctx, validator); - } - } finally { - writing = false; - } - } - - @Override - public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { - try { - super.rollback(ctx, validator); - } finally { - writing = false; - } - } - - @Override - public YdbQueryResult executeDataQuery( - YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache - ) throws SQLException { - YdbQueryResult yqr = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache); - if (query.isWriting() && !isAutoCommit) { - writing = true; - } - return yqr; - } - -} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index 7fcb506b..02240fb5 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -15,6 +15,7 @@ import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; @@ -35,10 +36,11 @@ public class TableServiceExecutor extends BaseYdbExecutor { private final boolean failOnTruncatedResult; private volatile TxState tx; - public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { + public TableServiceExecutor(YdbContext ctx) throws SQLException { super(ctx); - this.tx = createTx(transactionLevel, autoCommit); - this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult(); + YdbOperationProperties options = ctx.getOperationProperties(); + this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit()); + this.failOnTruncatedResult = options.isFailOnTruncatedResult(); } @Override diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java new file mode 100644 index 00000000..482138bc --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java @@ -0,0 +1,166 @@ +package tech.ydb.jdbc.context; + +import java.sql.SQLException; + +import com.google.common.hash.Hashing; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.YdbTracer; +import tech.ydb.jdbc.exception.ExceptionFactory; +import tech.ydb.jdbc.exception.YdbConditionallyRetryableException; +import tech.ydb.jdbc.impl.YdbQueryResult; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.query.QueryStream; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.table.Session; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.table.values.PrimitiveValue; + +/** + * + * @author mzinal + */ +public class TableTxExecutor extends QueryServiceExecutor { + private static final String CREATE_SQL = "" + + "CREATE TABLE IF NOT EXISTS `%s` (" + + " hash Text NOT NULL," + + " tx_id Text NOT NULL," + + " committed_at Timestamp," + + " PRIMARY KEY (hash, tx_id)" + + ") WITH (" + + " TTL=Interval('PT60M') ON committed_at," + + " AUTO_PARTITIONING_BY_LOAD=ENABLED," + + " AUTO_PARTITIONING_BY_SIZE=ENABLED," + + " AUTO_PARTITIONING_PARTITION_SIZE_MB=100" + + ");"; + + private static final String COMMIT_SQL = "" + + "DECLARE $hash AS Text; " + + "DECLARE $tx AS Text; " + + "UPSERT INTO `%s` (hash, tx_id, committed_at) VALUES ($hash, $tx, CurrentUtcTimestamp());"; + + private static final String VALIDATE_SQL = "" + + "DECLARE $hash AS Text; " + + "DECLARE $tx AS Text; " + + "SELECT hash, tx_id FROM `%s` WHERE hash=$hash AND tx_id=$tx;"; + + private final String commitQuery; + private final String validateQuery; + private final String txTablePath; + private boolean isWriteTx; + + public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException { + super(ctx); + this.txTablePath = tablePath; + this.commitQuery = String.format(COMMIT_SQL, tablePath); + this.validateQuery = String.format(VALIDATE_SQL, tablePath); + this.isWriteTx = false; + } + + @Override + public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { + isWriteTx = false; + super.rollback(ctx, validator); + } + + @Override + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, + long timeout, boolean keepInCache) throws SQLException { + YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache); + isWriteTx = isInsideTransaction() && (isWriteTx || query.isWriting()); + + return result; + } + + @Override + protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException { + boolean storeTx = isWriteTx; + isWriteTx = false; + if (!storeTx) { + super.commitImpl(ctx, validator, tx); + return; + } + + YdbTracer tracer = ctx.getTracer(); + String hash = Hashing.sha256().hashBytes(tx.getId().getBytes()).toString(); + tracer.trace("--> commit-and-store-tx " + hash); + tracer.query(commitQuery); + + Params params = Params.of( + "$hash", PrimitiveValue.newText(hash), + "$tx", PrimitiveValue.newText(tx.getId()) + ); + + ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build(); + try { + QueryStream query = tx.createQuery(commitQuery, true, params, settings); + validator.clearWarnings(); + validator.call("CommitAndStore TxId: " + tx.getId(), tracer, query::execute); + } catch (YdbConditionallyRetryableException ex) { + + try (Session session = createNewTableSession(validator)) { + tracer.trace("--> validate tx"); + tracer.query(validateQuery); + Result res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params) + .join(); + if (res.isSuccess()) { + DataQueryResult dqr = res.getValue(); + if (dqr.getResultSetCount() == 1) { + if (dqr.getResultSet(0).getRowCount() == 1) { + // Transaction was committed successfully + return; + } else { + // Transaction wann't commit + Status status = Status.of(StatusCode.ABORTED).withCause(ex); + throw ExceptionFactory.createException("Transaction wasn't committed", + new UnexpectedResultException("Transaction not found in " + txTablePath, status) + ); + } + } + } + } + + throw ex; + } + } + + public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException { + // validate table name + Result res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join(); + if (res.isSuccess()) { + return res.getValue(); + } + + if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot describe", res.getStatus())); + } + + // Try to create a table + String query = String.format(CREATE_SQL, tablePath); + Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join(); + if (!status.isSuccess()) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot create table", status)); + } + + Result res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join(); + if (!res2.isSuccess()) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot describe after creating", res2.getStatus())); + } + + return res2.getValue(); + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 889ddedd..4fd1997d 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -18,7 +18,6 @@ import com.google.common.cache.CacheBuilder; import tech.ydb.core.Result; -import tech.ydb.core.Status; import tech.ydb.core.UnexpectedResultException; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.core.grpc.GrpcTransportBuilder; @@ -72,7 +71,7 @@ public class YdbContext implements AutoCloseable { private final YdbConfig config; - private final YdbOperationProperties operationProps; + private final YdbOperationProperties operationOptions; private final YdbQueryProperties queryOptions; private final YdbTypes types; @@ -103,7 +102,7 @@ private YdbContext( ) { this.config = config; - this.operationProps = operationProperties; + this.operationOptions = operationProperties; this.queryOptions = queryProperties; this.autoResizeSessionPool = autoResize; @@ -196,16 +195,16 @@ public String getUsername() { public YdbExecutor createExecutor() throws SQLException { if (config.isUseQueryService()) { - if (operationProps.getProcessUndetermined()) { - return new QueryServiceExecutorExt( - this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); - } else { - return new QueryServiceExecutor( - this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + if (operationOptions.getProcessUndetermined()) { + String tablePath = joined(prefixPath, operationOptions.getProcessUndeterminedTable()); + if (tableDescribeCache.getIfPresent(tablePath) == null) { + tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath)); + } + return new TableTxExecutor(this, tablePath); } + return new QueryServiceExecutor(this); } else { - return new TableServiceExecutor( - this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + return new TableServiceExecutor(this); } } @@ -214,7 +213,7 @@ public int getConnectionsCount() { } public YdbOperationProperties getOperationProperties() { - return operationProps; + return operationOptions; } @Override @@ -318,17 +317,8 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient); - YdbContext yc = new YdbContext(config, operationProps, queryProps, grpcTransport, - tableClient.build(), queryClient.build(), autoResize); - if (operationProps.getProcessUndetermined()) { - if (config.isUseQueryService()) { - yc.ensureTransactionTableExists(); - } else { - LOGGER.log(Level.WARNING, "UNDETERMINED processing is disabled, " - + "because it is only supported for QueryService execution mode."); - } - } - return yc; + return new YdbContext(config, operationProps, queryProps, grpcTransport, tableClient.build(), + queryClient.build(), autoResize); } catch (RuntimeException ex) { if (grpcTransport != null) { try { @@ -347,34 +337,8 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { } } - public void ensureTransactionTableExists() throws SQLException { - String tableName = operationProps.getProcessUndeterminedTable(); - if (tableName.isEmpty()) { - return; - } - LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName); - String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName - + "` (trans_hash Int32 NOT NULL, trans_id Text NOT NULL, trans_tv Timestamp," - + " PRIMARY KEY (trans_hash, trans_id)) WITH (" - + "TTL=Interval('PT60M') ON trans_tv," - /* - + "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100," - + "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150," - */ - + "AUTO_PARTITIONING_BY_LOAD=ENABLED," - + "AUTO_PARTITIONING_BY_SIZE=ENABLED," - + "AUTO_PARTITIONING_PARTITION_SIZE_MB=100" - + ");"; - Status status = retryCtx.supplyStatus( - session -> session.executeSchemeQuery(sqlCreate)) - .join(); - new YdbValidator().validate( - "Create table " + tableName, - getTracer(), status); - } - public > T withDefaultTimeout(T settings) { - Duration operation = operationProps.getDeadlineTimeout(); + Duration operation = operationOptions.getDeadlineTimeout(); if (!operation.isZero() && !operation.isNegative()) { settings.setOperationTimeout(operation); settings.setTimeout(operation.plusSeconds(1)); @@ -383,7 +347,7 @@ public > T withDefaultTimeout(T settings) { } public > T withRequestTimeout(T builder) { - Duration operation = operationProps.getDeadlineTimeout(); + Duration operation = operationOptions.getDeadlineTimeout(); if (operation.isNegative() || operation.isZero()) { return builder; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java index 7c282ee5..cc350def 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java @@ -66,7 +66,8 @@ private T joinFuture(Supplier> supplier) { } } - public void validate(String msg, YdbTracer tracer, Status status) throws SQLException { + public void execute(String msg, YdbTracer tracer, Supplier> fn) throws SQLException { + Status status = joinFuture(fn); addStatusIssues(status); tracer.trace("<-- " + status.toString()); @@ -78,11 +79,6 @@ public void validate(String msg, YdbTracer tracer, Status status) throws SQLExce } } - public void execute(String msg, YdbTracer tracer, Supplier> fn) throws SQLException { - Status status = joinFuture(fn); - validate(msg, tracer, status); - } - public R call(String msg, YdbTracer tracer, Supplier>> fn) throws SQLException { try { Result result = joinFuture(fn); From b977127427040db5ce73429a71ae8ff2e32e1c1d Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 5 Jun 2025 11:01:20 +0100 Subject: [PATCH 6/7] Replace two options by one --- .../tech/ydb/jdbc/context/YdbContext.java | 5 ++-- .../tech/ydb/jdbc/settings/YdbConfig.java | 2 -- .../jdbc/settings/YdbOperationProperties.java | 27 ++++++------------- .../settings/YdbDriverProperitesTest.java | 4 --- 4 files changed, 11 insertions(+), 27 deletions(-) diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 4fd1997d..e5b94d1d 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -195,8 +195,9 @@ public String getUsername() { public YdbExecutor createExecutor() throws SQLException { if (config.isUseQueryService()) { - if (operationOptions.getProcessUndetermined()) { - String tablePath = joined(prefixPath, operationOptions.getProcessUndeterminedTable()); + String txValidationTable = operationOptions.getTxValidationTable(); + if (txValidationTable != null && !txValidationTable.isEmpty()) { + String tablePath = joined(prefixPath, txValidationTable); if (tableDescribeCache.getIfPresent(tablePath) == null) { tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath)); } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java index 0107492b..6d18f782 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java @@ -211,8 +211,6 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException { YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties), YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties), - YdbOperationProperties.PROCESS_UNDETERMINED_TABLE.toInfo(properties), - YdbOperationProperties.PROCESS_UNDETERMINED.toInfo(properties), YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties), YdbOperationProperties.JOIN_DURATION.toInfo(properties), YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties), diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 2e011762..f53b458f 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -61,13 +61,8 @@ public class YdbOperationProperties { "Use new data types Date32/Datetime64/Timestamp64 by default", false ); - static final YdbProperty PROCESS_UNDETERMINED = YdbProperty.bool("processUndetermined", - "Enable automatic processing of UNDETERMINED errors", false - ); - - static final YdbProperty PROCESS_UNDETERMINED_TABLE = YdbProperty.string("processUndeterminedTable", - "Name of working table for automatic processing of UNDETERMINED errors", "ydb_transactions" - ); + static final YdbProperty TX_VALIDATION_TABLE = YdbProperty.string("withTxValidationTable", + "Name of working table to store transactions to avoid UNDETERMINED errors"); private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? @@ -86,8 +81,7 @@ public class YdbOperationProperties { private final YdbValue useStreamResultSets; private final YdbValue forceNewDatetypes; - private final YdbValue processUndetermined; - private final YdbValue processUndeterminedTable; + private final YdbValue txValidationTable; public YdbOperationProperties(YdbConfig config) throws SQLException { Properties props = config.getProperties(); @@ -107,8 +101,7 @@ public YdbOperationProperties(YdbConfig config) throws SQLException { this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props); this.forceNewDatetypes = FORCE_NEW_DATETYPES.readValue(props); - this.processUndetermined = PROCESS_UNDETERMINED.readValue(props); - this.processUndeterminedTable = PROCESS_UNDETERMINED_TABLE.readValue(props); + this.txValidationTable = TX_VALIDATION_TABLE.readValue(props); } public Duration getJoinDuration() { @@ -163,15 +156,11 @@ public boolean getForceNewDatetypes() { return forceNewDatetypes.getValue(); } - public boolean getProcessUndetermined() { - return processUndetermined.getValue(); - } - - public String getProcessUndeterminedTable() { - return processUndeterminedTable.getValue(); - } - public int getMaxRows() { return MAX_ROWS; } + + public String getTxValidationTable() { + return txValidationTable.getValue(); + } } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java index 0666c3dd..9626d9f3 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/settings/YdbDriverProperitesTest.java @@ -323,8 +323,6 @@ static DriverPropertyInfo[] defaultPropertyInfo(@Nullable String localDatacenter new DriverPropertyInfo("sessionMaxIdleTime", ""), new DriverPropertyInfo("sessionPoolSizeMin", ""), new DriverPropertyInfo("sessionPoolSizeMax", ""), - new DriverPropertyInfo("processUndeterminedTable", "ydb_transactions"), - new DriverPropertyInfo("processUndetermined", "false"), new DriverPropertyInfo("useStreamResultSets", "false"), new DriverPropertyInfo("joinDuration", "5m"), new DriverPropertyInfo("queryTimeout", "0s"), @@ -373,8 +371,6 @@ static DriverPropertyInfo[] customizedPropertyInfo() { new DriverPropertyInfo("sessionMaxIdleTime", "5m"), new DriverPropertyInfo("sessionPoolSizeMin", "3"), new DriverPropertyInfo("sessionPoolSizeMax", "4"), - new DriverPropertyInfo("processUndeterminedTable", "ydb_transactions"), - new DriverPropertyInfo("processUndetermined", "false"), new DriverPropertyInfo("useStreamResultSets", "true"), new DriverPropertyInfo("joinDuration", "6m"), new DriverPropertyInfo("queryTimeout", "2m"), From 808b1fcdd75439561fa03255ee5ff8c3f50eca9c Mon Sep 17 00:00:00 2001 From: Alexandr Gorshenin Date: Thu, 5 Jun 2025 12:24:56 +0100 Subject: [PATCH 7/7] Added tests for TableTxExecutor --- .../ydb/jdbc/context/TableTxExecutor.java | 11 +- .../ydb/jdbc/YdbDriverTxValidateTest.java | 155 ++++++++++++++++++ 2 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java index 482138bc..4eafd3d9 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java @@ -89,21 +89,22 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti return; } - YdbTracer tracer = ctx.getTracer(); String hash = Hashing.sha256().hashBytes(tx.getId().getBytes()).toString(); - tracer.trace("--> commit-and-store-tx " + hash); - tracer.query(commitQuery); - Params params = Params.of( "$hash", PrimitiveValue.newText(hash), "$tx", PrimitiveValue.newText(tx.getId()) ); + YdbTracer tracer = ctx.getTracer(); ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build(); try { QueryStream query = tx.createQuery(commitQuery, true, params, settings); validator.clearWarnings(); - validator.call("CommitAndStore TxId: " + tx.getId(), tracer, query::execute); + validator.call("CommitAndStore TxId: " + tx.getId(), tracer, () -> { + tracer.trace("--> commit-and-store-tx " + hash); + tracer.query(commitQuery); + return query.execute(); + }); } catch (YdbConditionallyRetryableException ex) { try (Session session = createNewTableSession(validator)) { diff --git a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java new file mode 100644 index 00000000..a0cb56d3 --- /dev/null +++ b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java @@ -0,0 +1,155 @@ +package tech.ydb.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.jdbc.impl.YdbTracerImpl; +import tech.ydb.jdbc.impl.helper.ExceptionAssert; +import tech.ydb.jdbc.impl.helper.JdbcConnectionExtention; +import tech.ydb.jdbc.impl.helper.JdbcUrlHelper; +import tech.ydb.test.junit5.YdbHelperExtension; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbDriverTxValidateTest { + + @RegisterExtension + private static final YdbHelperExtension ydb = new YdbHelperExtension(); + + @RegisterExtension + private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb) + .withArg("usePrefixPath", "tx_validated"); + + private static final JdbcUrlHelper jdbcURL = new JdbcUrlHelper(ydb) + .withArg("enableTxTracer", "true") + .withArg("usePrefixPath", "tx_validated"); + + private void assertTxCount(String tableName, long count) throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery("select count(*) from " + tableName)) { + Assertions.assertTrue(rs.next()); + Assertions.assertEquals(count, rs.getLong(1)); + Assertions.assertFalse(rs.next()); + } + } + } + + @Test + public void basicUsageTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx_store").build(); + try (Connection conn = DriverManager.getConnection(url)) { + // table was created automatically + assertTxCount("tx_store", 0); + + // statements with auto commit won't be validated + try (Statement st = conn.createStatement()) { + Assertions.assertTrue(st.execute("SELECT * FROM tx_store")); + Assertions.assertFalse(st.execute("DELETE FROM tx_store")); + } + + assertTxCount("tx_store", 0); + + conn.setAutoCommit(false); + + // scheme statements and read won't be validated + Assertions.assertFalse(conn.createStatement() + .execute("CREATE TABLE tmp (id Int32, vv UInt64, PRIMARY KEY(id))")); + conn.commit(); + assertTxCount("tx_store", 0); + + // read tx wont' be validated + Assertions.assertTrue(conn.createStatement().execute("SELECT * FROM tmp;")); + conn.commit(); + assertTxCount("tx_store", 0); + + // rollbacked tx wont' be validated + Assertions.assertFalse(conn.createStatement().execute("INSERT INTO tmp(id, vv) VALUES (1, 1);")); + conn.rollback(); + assertTxCount("tx_store", 0); + + Assertions.assertFalse(conn.createStatement().execute("INSERT INTO tmp(id, vv) VALUES (1, 1);")); + conn.commit(); + assertTxCount("tx_store", 1); + } + + try (Connection conn = DriverManager.getConnection(url)) { + conn.createStatement().execute("DROP TABLE tmp"); + // reuse current table + assertTxCount("tx_store", 1); + } finally { + jdbc.connection().createStatement().execute("DROP TABLE tx_store"); + } + } + + @Test + public void commitedTxTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx1_store").build(); + try (Connection conn = DriverManager.getConnection(url)) { + ErrorTxTracer tracer = YdbTracerImpl.use(new ErrorTxTracer()); + // table was created automatically + assertTxCount("tx1_store", 0); + + conn.setAutoCommit(false); + + conn.createStatement().execute("DELETE FROM tx1_store"); + // throw condintionally retryable exception AFTER commit + tracer.throwErrorOn("<-- Status", Status.of(StatusCode.UNDETERMINED)); + conn.commit(); // no error, tx is validated successfully + + assertTxCount("tx1_store", 1); + + conn.createStatement().execute("DELETE FROM tx1_store"); + // throw condintionally retryable exception BEFORE commit + tracer.throwErrorOn("--> commit-and-store-tx", Status.of(StatusCode.UNDETERMINED)); + ExceptionAssert.sqlRecoverable("Transaction wasn't committed", conn::commit); + + Assert.assertNull(tracer.error); + } finally { + jdbc.connection().createStatement().execute("DROP TABLE tx1_store"); + } + } + + @Test + public void invalididTxTableTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx store").build(); + ExceptionAssert.ydbException( + "Cannot initialize TableTxExecutor with tx table " + ydb.database() + "/tx_validated/tx store", + () -> DriverManager.getConnection(url) + ); + } + + private class ErrorTxTracer extends YdbTracerImpl { + private String traceMsg = null; + private Status error = null; + + public void throwErrorOn(String traceMsg, Status error) { + this.traceMsg = traceMsg; + this.error = error; + } + + @Override + public void trace(String message) { + super.trace(message); + System.out.println("TRACE " + message); + if (traceMsg != null && error != null && message.startsWith(traceMsg)) { + Status status = error; + error = null; + traceMsg = null; + throw new UnexpectedResultException("Test error", status); + } + } + } +}