diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java index 90d2ae7..ab32014 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java @@ -23,6 +23,7 @@ import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.query.QueryClient; import tech.ydb.query.QuerySession; import tech.ydb.query.QueryStream; @@ -54,15 +55,16 @@ public class QueryServiceExecutor extends BaseYdbExecutor { private final AtomicReference tx = new AtomicReference<>(); private volatile boolean isClosed; - public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { + public QueryServiceExecutor(YdbContext ctx) throws SQLException { super(ctx); - this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout(); + YdbOperationProperties options = ctx.getOperationProperties(); + this.sessionTimeout = options.getSessionTimeout(); this.queryClient = ctx.getQueryClient(); - this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets(); + this.useStreamResultSet = options.getUseStreamResultSets(); - this.transactionLevel = transactionLevel; + this.transactionLevel = options.getTransactionLevel(); + this.isAutoCommit = options.isAutoCommit(); this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE; - this.isAutoCommit = autoCommit; this.txMode = txMode(transactionLevel, isReadOnly); this.isClosed = false; } @@ -179,22 +181,26 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException { return; } - YdbTracer tracer = ctx.getTracer(); - tracer.trace("--> commit"); - tracer.query(null); - - CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); try { - validator.clearWarnings(); - validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings)); + commitImpl(ctx, validator, localTx); } finally { if (tx.compareAndSet(localTx, null)) { localTx.getSession().close(); } - tracer.close(); + ctx.getTracer().close(); } } + protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException { + YdbTracer tracer = ctx.getTracer(); + tracer.trace("--> commit"); + tracer.query(null); + + CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build(); + validator.clearWarnings(); + validator.call("Commit TxId: " + tx.getId(), tracer, () -> tx.commit(settings)); + } + @Override public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { ensureOpened(); @@ -223,9 +229,8 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException } @Override - public YdbQueryResult executeDataQuery( - YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache - ) throws SQLException { + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, + long timeout, boolean keepInCache) throws SQLException { ensureOpened(); YdbValidator validator = statement.getValidator(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java index 7fcb506..02240fb 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java @@ -15,6 +15,7 @@ import tech.ydb.jdbc.impl.YdbStaticResultSet; import tech.ydb.jdbc.query.QueryType; import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.jdbc.settings.YdbOperationProperties; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.ExplainDataQueryResult; @@ -35,10 +36,11 @@ public class TableServiceExecutor extends BaseYdbExecutor { private final boolean failOnTruncatedResult; private volatile TxState tx; - public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException { + public TableServiceExecutor(YdbContext ctx) throws SQLException { super(ctx); - this.tx = createTx(transactionLevel, autoCommit); - this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult(); + YdbOperationProperties options = ctx.getOperationProperties(); + this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit()); + this.failOnTruncatedResult = options.isFailOnTruncatedResult(); } @Override diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java new file mode 100644 index 0000000..4eafd3d --- /dev/null +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java @@ -0,0 +1,167 @@ +package tech.ydb.jdbc.context; + +import java.sql.SQLException; + +import com.google.common.hash.Hashing; + +import tech.ydb.core.Result; +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.jdbc.YdbStatement; +import tech.ydb.jdbc.YdbTracer; +import tech.ydb.jdbc.exception.ExceptionFactory; +import tech.ydb.jdbc.exception.YdbConditionallyRetryableException; +import tech.ydb.jdbc.impl.YdbQueryResult; +import tech.ydb.jdbc.query.YdbQuery; +import tech.ydb.query.QueryStream; +import tech.ydb.query.QueryTransaction; +import tech.ydb.query.settings.ExecuteQuerySettings; +import tech.ydb.table.Session; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.query.Params; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.table.values.PrimitiveValue; + +/** + * + * @author mzinal + */ +public class TableTxExecutor extends QueryServiceExecutor { + private static final String CREATE_SQL = "" + + "CREATE TABLE IF NOT EXISTS `%s` (" + + " hash Text NOT NULL," + + " tx_id Text NOT NULL," + + " committed_at Timestamp," + + " PRIMARY KEY (hash, tx_id)" + + ") WITH (" + + " TTL=Interval('PT60M') ON committed_at," + + " AUTO_PARTITIONING_BY_LOAD=ENABLED," + + " AUTO_PARTITIONING_BY_SIZE=ENABLED," + + " AUTO_PARTITIONING_PARTITION_SIZE_MB=100" + + ");"; + + private static final String COMMIT_SQL = "" + + "DECLARE $hash AS Text; " + + "DECLARE $tx AS Text; " + + "UPSERT INTO `%s` (hash, tx_id, committed_at) VALUES ($hash, $tx, CurrentUtcTimestamp());"; + + private static final String VALIDATE_SQL = "" + + "DECLARE $hash AS Text; " + + "DECLARE $tx AS Text; " + + "SELECT hash, tx_id FROM `%s` WHERE hash=$hash AND tx_id=$tx;"; + + private final String commitQuery; + private final String validateQuery; + private final String txTablePath; + private boolean isWriteTx; + + public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException { + super(ctx); + this.txTablePath = tablePath; + this.commitQuery = String.format(COMMIT_SQL, tablePath); + this.validateQuery = String.format(VALIDATE_SQL, tablePath); + this.isWriteTx = false; + } + + @Override + public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException { + isWriteTx = false; + super.rollback(ctx, validator); + } + + @Override + public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params, + long timeout, boolean keepInCache) throws SQLException { + YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache); + isWriteTx = isInsideTransaction() && (isWriteTx || query.isWriting()); + + return result; + } + + @Override + protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException { + boolean storeTx = isWriteTx; + isWriteTx = false; + if (!storeTx) { + super.commitImpl(ctx, validator, tx); + return; + } + + String hash = Hashing.sha256().hashBytes(tx.getId().getBytes()).toString(); + Params params = Params.of( + "$hash", PrimitiveValue.newText(hash), + "$tx", PrimitiveValue.newText(tx.getId()) + ); + + YdbTracer tracer = ctx.getTracer(); + ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build(); + try { + QueryStream query = tx.createQuery(commitQuery, true, params, settings); + validator.clearWarnings(); + validator.call("CommitAndStore TxId: " + tx.getId(), tracer, () -> { + tracer.trace("--> commit-and-store-tx " + hash); + tracer.query(commitQuery); + return query.execute(); + }); + } catch (YdbConditionallyRetryableException ex) { + + try (Session session = createNewTableSession(validator)) { + tracer.trace("--> validate tx"); + tracer.query(validateQuery); + Result res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params) + .join(); + if (res.isSuccess()) { + DataQueryResult dqr = res.getValue(); + if (dqr.getResultSetCount() == 1) { + if (dqr.getResultSet(0).getRowCount() == 1) { + // Transaction was committed successfully + return; + } else { + // Transaction wann't commit + Status status = Status.of(StatusCode.ABORTED).withCause(ex); + throw ExceptionFactory.createException("Transaction wasn't committed", + new UnexpectedResultException("Transaction not found in " + txTablePath, status) + ); + } + } + } + } + + throw ex; + } + } + + public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException { + // validate table name + Result res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join(); + if (res.isSuccess()) { + return res.getValue(); + } + + if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot describe", res.getStatus())); + } + + // Try to create a table + String query = String.format(CREATE_SQL, tablePath); + Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join(); + if (!status.isSuccess()) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot create table", status)); + } + + Result res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join(); + if (!res2.isSuccess()) { + throw ExceptionFactory.createException( + "Cannot initialize TableTxExecutor with tx table " + tablePath, + new UnexpectedResultException("Cannot describe after creating", res2.getStatus())); + } + + return res2.getValue(); + } +} diff --git a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java index 7801bab..e5b94d1 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java @@ -71,7 +71,7 @@ public class YdbContext implements AutoCloseable { private final YdbConfig config; - private final YdbOperationProperties operationProps; + private final YdbOperationProperties operationOptions; private final YdbQueryProperties queryOptions; private final YdbTypes types; @@ -102,7 +102,7 @@ private YdbContext( ) { this.config = config; - this.operationProps = operationProperties; + this.operationOptions = operationProperties; this.queryOptions = queryProperties; this.autoResizeSessionPool = autoResize; @@ -195,9 +195,17 @@ public String getUsername() { public YdbExecutor createExecutor() throws SQLException { if (config.isUseQueryService()) { - return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + String txValidationTable = operationOptions.getTxValidationTable(); + if (txValidationTable != null && !txValidationTable.isEmpty()) { + String tablePath = joined(prefixPath, txValidationTable); + if (tableDescribeCache.getIfPresent(tablePath) == null) { + tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath)); + } + return new TableTxExecutor(this, tablePath); + } + return new QueryServiceExecutor(this); } else { - return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit()); + return new TableServiceExecutor(this); } } @@ -206,7 +214,7 @@ public int getConnectionsCount() { } public YdbOperationProperties getOperationProperties() { - return operationProps; + return operationOptions; } @Override @@ -277,6 +285,7 @@ public void deregister() { } public static YdbContext createContext(YdbConfig config) throws SQLException { + GrpcTransport grpcTransport = null; try { LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString()); @@ -300,7 +309,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { }); }); - GrpcTransport grpcTransport = builder.build(); + grpcTransport = builder.build(); PooledTableClient.Builder tableClient = PooledTableClient.newClient( GrpcTableRpc.useTransport(grpcTransport) @@ -309,9 +318,16 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient); - return new YdbContext(config, operationProps, queryProps, grpcTransport, - tableClient.build(), queryClient.build(), autoResize); + return new YdbContext(config, operationProps, queryProps, grpcTransport, tableClient.build(), + queryClient.build(), autoResize); } catch (RuntimeException ex) { + if (grpcTransport != null) { + try { + grpcTransport.close(); + } catch (Exception exClose) { + LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose); + } + } StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage()); Throwable cause = ex.getCause(); while (cause != null) { @@ -323,7 +339,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException { } public > T withDefaultTimeout(T settings) { - Duration operation = operationProps.getDeadlineTimeout(); + Duration operation = operationOptions.getDeadlineTimeout(); if (!operation.isZero() && !operation.isNegative()) { settings.setOperationTimeout(operation); settings.setTimeout(operation.plusSeconds(1)); @@ -332,7 +348,7 @@ public > T withDefaultTimeout(T settings) { } public > T withRequestTimeout(T builder) { - Duration operation = operationProps.getDeadlineTimeout(); + Duration operation = operationOptions.getDeadlineTimeout(); if (operation.isNegative() || operation.isZero()) { return builder; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java index 3b3d552..6033f79 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java @@ -7,7 +7,8 @@ public enum QueryCmd { UNKNOWN, SELECT, - CREATE_ALTER_DROP, - INSERT_UPSERT, - UPDATE_REPLACE_DELETE + /** CREATE, DROP, ALTER, GRANT, REVOKE */ + DDL, + /** INSERT, UPSERT, UPDATE, REPLACE, DELETE */ + DML } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java index e985653..a8f7d28 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java @@ -52,7 +52,7 @@ public void setHasGenerated(boolean hasGenerated) { } public boolean hasUpdateCount() { - return (command == QueryCmd.INSERT_UPSERT || command == QueryCmd.UPDATE_REPLACE_DELETE) && !hasReturinng; + return command == QueryCmd.DML && !hasReturinng; } public boolean hasUpdateWithGenerated() { @@ -64,6 +64,6 @@ public boolean hasResults() { } public boolean isDDL() { - return command == QueryCmd.CREATE_ALTER_DROP; + return command == QueryCmd.DDL; } } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java index ec516ac..9da5d79 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java @@ -1,7 +1,5 @@ package tech.ydb.jdbc.query; - - import java.sql.SQLException; import java.util.List; @@ -20,6 +18,7 @@ public class YdbQuery { private final QueryType type; private final boolean isPlainYQL; + private final boolean isWriting; YdbQuery(QueryKey key, String preparedYQL, List stats, YqlBatcher batcher, QueryType type) { this.key = key; @@ -29,16 +28,23 @@ public class YdbQuery { this.batcher = batcher; boolean hasJdbcParameters = false; + boolean hasDML = false; for (QueryStatement st: statements) { hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters(); + hasDML = hasDML || (st.getCmd() == QueryCmd.DML); } this.isPlainYQL = !hasJdbcParameters; + this.isWriting = (type == QueryType.DATA_QUERY) && hasDML; } public QueryType getType() { return type; } + public boolean isWriting() { + return isWriting; + } + public YqlBatcher getYqlBatcher() { return batcher.isValidBatch() ? batcher : null; } @@ -66,6 +72,7 @@ public List getStatements() { public static YdbQuery parseQuery(QueryKey query, YdbQueryProperties opts, YdbTypes types) throws SQLException { YdbQueryParser parser = new YdbQueryParser(types, query, opts); String preparedYQL = parser.parseSQL(); + boolean writing = false; QueryType type = null; YqlBatcher batcher = parser.getYqlBatcher(); diff --git a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java index a15c0c1..9c9c511 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java @@ -1,6 +1,5 @@ package tech.ydb.jdbc.query; - import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; @@ -239,25 +238,25 @@ public String parseSQL() throws SQLException { // starts with INSERT, UPSERT if (parseInsertKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.INSERT_UPSERT); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readInsert(); } if (parseUpsertKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.INSERT_UPSERT); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readUpsert(); } // starts with UPDATE, REPLACE, DELETE if (parseUpdateKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readUpdate(); } if (parseDeleteKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readDelete(); } if (parseReplaceKeyword(chars, keywordStart, keywordLength)) { - statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.UPDATE_REPLACE_DELETE); + statement = new QueryStatement(type, QueryType.DATA_QUERY, QueryCmd.DML); batcher.readReplace(); } @@ -268,7 +267,7 @@ public String parseSQL() throws SQLException { || parseGrantKeyword(chars, keywordStart, keywordLength) || parseRevokeKeyword(chars, keywordStart, keywordLength) ) { - statement = new QueryStatement(type, QueryType.SCHEME_QUERY, QueryCmd.CREATE_ALTER_DROP); + statement = new QueryStatement(type, QueryType.SCHEME_QUERY, QueryCmd.DDL); batcher.readIdentifier(chars, keywordStart, keywordLength); } @@ -332,7 +331,7 @@ private void addReturning(StringBuilder parsed, QueryStatement st) throws SQLExc if (st == null || returning == null || st.hasResults()) { return; } - if (st.getCmd() != QueryCmd.INSERT_UPSERT && st.getCmd() != QueryCmd.UPDATE_REPLACE_DELETE) { + if (st.getCmd() != QueryCmd.DML) { return; } diff --git a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java index 49079cf..f53b458 100644 --- a/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java +++ b/jdbc/src/main/java/tech/ydb/jdbc/settings/YdbOperationProperties.java @@ -61,6 +61,9 @@ public class YdbOperationProperties { "Use new data types Date32/Datetime64/Timestamp64 by default", false ); + static final YdbProperty TX_VALIDATION_TABLE = YdbProperty.string("withTxValidationTable", + "Name of working table to store transactions to avoid UNDETERMINED errors"); + private static final int MAX_ROWS = 1000; // TODO: how to figure out the max rows of current connection? private final YdbValue joinDuration; @@ -78,6 +81,7 @@ public class YdbOperationProperties { private final YdbValue useStreamResultSets; private final YdbValue forceNewDatetypes; + private final YdbValue txValidationTable; public YdbOperationProperties(YdbConfig config) throws SQLException { Properties props = config.getProperties(); @@ -97,6 +101,7 @@ public YdbOperationProperties(YdbConfig config) throws SQLException { this.useStreamResultSets = USE_STREAM_RESULT_SETS.readValue(props); this.forceNewDatetypes = FORCE_NEW_DATETYPES.readValue(props); + this.txValidationTable = TX_VALIDATION_TABLE.readValue(props); } public Duration getJoinDuration() { @@ -154,4 +159,8 @@ public boolean getForceNewDatetypes() { public int getMaxRows() { return MAX_ROWS; } + + public String getTxValidationTable() { + return txValidationTable.getValue(); + } } diff --git a/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java new file mode 100644 index 0000000..a0cb56d --- /dev/null +++ b/jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java @@ -0,0 +1,155 @@ +package tech.ydb.jdbc; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; +import tech.ydb.core.UnexpectedResultException; +import tech.ydb.jdbc.impl.YdbTracerImpl; +import tech.ydb.jdbc.impl.helper.ExceptionAssert; +import tech.ydb.jdbc.impl.helper.JdbcConnectionExtention; +import tech.ydb.jdbc.impl.helper.JdbcUrlHelper; +import tech.ydb.test.junit5.YdbHelperExtension; + +/** + * + * @author Aleksandr Gorshenin + */ +public class YdbDriverTxValidateTest { + + @RegisterExtension + private static final YdbHelperExtension ydb = new YdbHelperExtension(); + + @RegisterExtension + private static final JdbcConnectionExtention jdbc = new JdbcConnectionExtention(ydb) + .withArg("usePrefixPath", "tx_validated"); + + private static final JdbcUrlHelper jdbcURL = new JdbcUrlHelper(ydb) + .withArg("enableTxTracer", "true") + .withArg("usePrefixPath", "tx_validated"); + + private void assertTxCount(String tableName, long count) throws SQLException { + try (Statement st = jdbc.connection().createStatement()) { + try (ResultSet rs = st.executeQuery("select count(*) from " + tableName)) { + Assertions.assertTrue(rs.next()); + Assertions.assertEquals(count, rs.getLong(1)); + Assertions.assertFalse(rs.next()); + } + } + } + + @Test + public void basicUsageTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx_store").build(); + try (Connection conn = DriverManager.getConnection(url)) { + // table was created automatically + assertTxCount("tx_store", 0); + + // statements with auto commit won't be validated + try (Statement st = conn.createStatement()) { + Assertions.assertTrue(st.execute("SELECT * FROM tx_store")); + Assertions.assertFalse(st.execute("DELETE FROM tx_store")); + } + + assertTxCount("tx_store", 0); + + conn.setAutoCommit(false); + + // scheme statements and read won't be validated + Assertions.assertFalse(conn.createStatement() + .execute("CREATE TABLE tmp (id Int32, vv UInt64, PRIMARY KEY(id))")); + conn.commit(); + assertTxCount("tx_store", 0); + + // read tx wont' be validated + Assertions.assertTrue(conn.createStatement().execute("SELECT * FROM tmp;")); + conn.commit(); + assertTxCount("tx_store", 0); + + // rollbacked tx wont' be validated + Assertions.assertFalse(conn.createStatement().execute("INSERT INTO tmp(id, vv) VALUES (1, 1);")); + conn.rollback(); + assertTxCount("tx_store", 0); + + Assertions.assertFalse(conn.createStatement().execute("INSERT INTO tmp(id, vv) VALUES (1, 1);")); + conn.commit(); + assertTxCount("tx_store", 1); + } + + try (Connection conn = DriverManager.getConnection(url)) { + conn.createStatement().execute("DROP TABLE tmp"); + // reuse current table + assertTxCount("tx_store", 1); + } finally { + jdbc.connection().createStatement().execute("DROP TABLE tx_store"); + } + } + + @Test + public void commitedTxTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx1_store").build(); + try (Connection conn = DriverManager.getConnection(url)) { + ErrorTxTracer tracer = YdbTracerImpl.use(new ErrorTxTracer()); + // table was created automatically + assertTxCount("tx1_store", 0); + + conn.setAutoCommit(false); + + conn.createStatement().execute("DELETE FROM tx1_store"); + // throw condintionally retryable exception AFTER commit + tracer.throwErrorOn("<-- Status", Status.of(StatusCode.UNDETERMINED)); + conn.commit(); // no error, tx is validated successfully + + assertTxCount("tx1_store", 1); + + conn.createStatement().execute("DELETE FROM tx1_store"); + // throw condintionally retryable exception BEFORE commit + tracer.throwErrorOn("--> commit-and-store-tx", Status.of(StatusCode.UNDETERMINED)); + ExceptionAssert.sqlRecoverable("Transaction wasn't committed", conn::commit); + + Assert.assertNull(tracer.error); + } finally { + jdbc.connection().createStatement().execute("DROP TABLE tx1_store"); + } + } + + @Test + public void invalididTxTableTest() throws SQLException { + String url = jdbcURL.withArg("withTxValidationTable", "tx store").build(); + ExceptionAssert.ydbException( + "Cannot initialize TableTxExecutor with tx table " + ydb.database() + "/tx_validated/tx store", + () -> DriverManager.getConnection(url) + ); + } + + private class ErrorTxTracer extends YdbTracerImpl { + private String traceMsg = null; + private Status error = null; + + public void throwErrorOn(String traceMsg, Status error) { + this.traceMsg = traceMsg; + this.error = error; + } + + @Override + public void trace(String message) { + super.trace(message); + System.out.println("TRACE " + message); + if (traceMsg != null && error != null && message.startsWith(traceMsg)) { + Status status = error; + error = null; + traceMsg = null; + throw new UnexpectedResultException("Test error", status); + } + } + } +} diff --git a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java index 92f708f..53bfffc 100644 --- a/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java +++ b/jdbc/src/test/java/tech/ydb/jdbc/query/YdbQueryParserTest.java @@ -48,10 +48,10 @@ public void emptyQueryTest(String query, String prepared) throws SQLException { @ParameterizedTest(name = "[{index}] {0} is explain query") @CsvSource(value = { "'Explain select * from table', ' select * from table', SELECT", - "'exPlain\nupsert to', '\nupsert to', INSERT_UPSERT", + "'exPlain\nupsert to', '\nupsert to', DML", "' Explain select * from table', ' select * from table', SELECT", - "'\texPlain\nupsert to', '\t\nupsert to', INSERT_UPSERT", - "'EXPLAIN/*comment*/UPSERT INTO', '/*comment*/UPSERT INTO', INSERT_UPSERT", + "'\texPlain\nupsert to', '\t\nupsert to', DML", + "'EXPLAIN/*comment*/UPSERT INTO', '/*comment*/UPSERT INTO', DML", }) public void explainQueryTest(String query, String prepared, String cmd) throws SQLException { YdbQueryParser parser = new YdbQueryParser(types, query, props); @@ -83,7 +83,7 @@ public void schemeQueryTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); QueryStatement statement = parser.getStatements().get(0); Assertions.assertEquals(QueryType.SCHEME_QUERY, statement.getType()); - Assertions.assertEquals(QueryCmd.CREATE_ALTER_DROP, statement.getCmd()); + Assertions.assertEquals(QueryCmd.DDL, statement.getCmd()); } @ParameterizedTest(name = "[{index}] {0} is data query") @@ -415,7 +415,7 @@ public void batchedInsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -441,7 +441,7 @@ public void batchedInsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -467,7 +467,7 @@ public void batchedUpsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -493,7 +493,7 @@ public void batchedUpsertOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -519,7 +519,7 @@ public void batchedReplaceTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -545,7 +545,7 @@ public void batchedReplaceOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -571,7 +571,7 @@ public void batchedUpdateTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -597,7 +597,7 @@ public void batchedUpdateOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -623,7 +623,7 @@ public void batchedUpdateOneColumnWithTableName(String query) throws SQLExceptio Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -649,7 +649,7 @@ public void batchedDeleteTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -674,7 +674,7 @@ public void batchedDeleteOneColumnTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.DATA_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.UPDATE_REPLACE_DELETE, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -735,7 +735,7 @@ public void validBulkInsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.BULK_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch()); @@ -759,7 +759,7 @@ public void validBulkUpsertTest(String query) throws SQLException { Assertions.assertEquals(1, parser.getStatements().size()); Assertions.assertEquals(QueryType.BULK_QUERY, parser.getStatements().get(0).getType()); - Assertions.assertEquals(QueryCmd.INSERT_UPSERT, parser.getStatements().get(0).getCmd()); + Assertions.assertEquals(QueryCmd.DML, parser.getStatements().get(0).getCmd()); YqlBatcher batch = parser.getYqlBatcher(); Assertions.assertTrue(batch.isValidBatch());