Skip to content
37 changes: 21 additions & 16 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import tech.ydb.jdbc.impl.YdbStaticResultSet;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.query.QueryClient;
import tech.ydb.query.QuerySession;
import tech.ydb.query.QueryStream;
Expand Down Expand Up @@ -54,15 +55,16 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
private volatile boolean isClosed;

public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
public QueryServiceExecutor(YdbContext ctx) throws SQLException {
super(ctx);
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
YdbOperationProperties options = ctx.getOperationProperties();
this.sessionTimeout = options.getSessionTimeout();
this.queryClient = ctx.getQueryClient();
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
this.useStreamResultSet = options.getUseStreamResultSets();

this.transactionLevel = transactionLevel;
this.transactionLevel = options.getTransactionLevel();
this.isAutoCommit = options.isAutoCommit();
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
this.isAutoCommit = autoCommit;
this.txMode = txMode(transactionLevel, isReadOnly);
this.isClosed = false;
}
Expand Down Expand Up @@ -179,22 +181,26 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
return;
}

YdbTracer tracer = ctx.getTracer();
tracer.trace("--> commit");
tracer.query(null);

CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
try {
validator.clearWarnings();
validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings));
commitImpl(ctx, validator, localTx);
} finally {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
tracer.close();
ctx.getTracer().close();
}
}

protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException {
YdbTracer tracer = ctx.getTracer();
tracer.trace("--> commit");
tracer.query(null);

CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
validator.clearWarnings();
validator.call("Commit TxId: " + tx.getId(), tracer, () -> tx.commit(settings));
}

@Override
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();
Expand Down Expand Up @@ -223,9 +229,8 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
}

@Override
public YdbQueryResult executeDataQuery(
YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache
) throws SQLException {
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
long timeout, boolean keepInCache) throws SQLException {
ensureOpened();

YdbValidator validator = statement.getValidator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import tech.ydb.jdbc.impl.YdbStaticResultSet;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.jdbc.settings.YdbOperationProperties;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.ExplainDataQueryResult;
Expand All @@ -35,10 +36,11 @@ public class TableServiceExecutor extends BaseYdbExecutor {
private final boolean failOnTruncatedResult;
private volatile TxState tx;

public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
public TableServiceExecutor(YdbContext ctx) throws SQLException {
super(ctx);
this.tx = createTx(transactionLevel, autoCommit);
this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult();
YdbOperationProperties options = ctx.getOperationProperties();
this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit());
this.failOnTruncatedResult = options.isFailOnTruncatedResult();
}

@Override
Expand Down
167 changes: 167 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package tech.ydb.jdbc.context;

import java.sql.SQLException;

import com.google.common.hash.Hashing;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.exception.YdbConditionallyRetryableException;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.table.Session;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.PrimitiveValue;

/**
*
* @author mzinal
*/
public class TableTxExecutor extends QueryServiceExecutor {
private static final String CREATE_SQL = ""
+ "CREATE TABLE IF NOT EXISTS `%s` ("
+ " hash Text NOT NULL,"
+ " tx_id Text NOT NULL,"
+ " committed_at Timestamp,"
+ " PRIMARY KEY (hash, tx_id)"
+ ") WITH ("
+ " TTL=Interval('PT60M') ON committed_at,"
+ " AUTO_PARTITIONING_BY_LOAD=ENABLED,"
+ " AUTO_PARTITIONING_BY_SIZE=ENABLED,"
+ " AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
+ ");";

private static final String COMMIT_SQL = ""
+ "DECLARE $hash AS Text; "
+ "DECLARE $tx AS Text; "
+ "UPSERT INTO `%s` (hash, tx_id, committed_at) VALUES ($hash, $tx, CurrentUtcTimestamp());";

private static final String VALIDATE_SQL = ""
+ "DECLARE $hash AS Text; "
+ "DECLARE $tx AS Text; "
+ "SELECT hash, tx_id FROM `%s` WHERE hash=$hash AND tx_id=$tx;";

private final String commitQuery;
private final String validateQuery;
private final String txTablePath;
private boolean isWriteTx;

public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
super(ctx);
this.txTablePath = tablePath;
this.commitQuery = String.format(COMMIT_SQL, tablePath);
this.validateQuery = String.format(VALIDATE_SQL, tablePath);
this.isWriteTx = false;
}

@Override
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
isWriteTx = false;
super.rollback(ctx, validator);
}

@Override
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
long timeout, boolean keepInCache) throws SQLException {
YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache);
isWriteTx = isInsideTransaction() && (isWriteTx || query.isWriting());

return result;
}

@Override
protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException {
boolean storeTx = isWriteTx;
isWriteTx = false;
if (!storeTx) {
super.commitImpl(ctx, validator, tx);
return;
}

String hash = Hashing.sha256().hashBytes(tx.getId().getBytes()).toString();
Params params = Params.of(
"$hash", PrimitiveValue.newText(hash),
"$tx", PrimitiveValue.newText(tx.getId())
);

YdbTracer tracer = ctx.getTracer();
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build();
try {
QueryStream query = tx.createQuery(commitQuery, true, params, settings);
validator.clearWarnings();
validator.call("CommitAndStore TxId: " + tx.getId(), tracer, () -> {
tracer.trace("--> commit-and-store-tx " + hash);
tracer.query(commitQuery);
return query.execute();
});
} catch (YdbConditionallyRetryableException ex) {

try (Session session = createNewTableSession(validator)) {
tracer.trace("--> validate tx");
tracer.query(validateQuery);
Result<DataQueryResult> res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
.join();
if (res.isSuccess()) {
DataQueryResult dqr = res.getValue();
if (dqr.getResultSetCount() == 1) {
if (dqr.getResultSet(0).getRowCount() == 1) {
// Transaction was committed successfully
return;
} else {
// Transaction wann't commit
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
throw ExceptionFactory.createException("Transaction wasn't committed",
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
);
}
}
}
}

throw ex;
}
}

public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException {
// validate table name
Result<TableDescription> res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
if (res.isSuccess()) {
return res.getValue();
}

if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe", res.getStatus()));
}

// Try to create a table
String query = String.format(CREATE_SQL, tablePath);
Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join();
if (!status.isSuccess()) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot create table", status));
}

Result<TableDescription> res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
if (!res2.isSuccess()) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
}

return res2.getValue();
}
}
36 changes: 26 additions & 10 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class YdbContext implements AutoCloseable {

private final YdbConfig config;

private final YdbOperationProperties operationProps;
private final YdbOperationProperties operationOptions;
private final YdbQueryProperties queryOptions;
private final YdbTypes types;

Expand Down Expand Up @@ -102,7 +102,7 @@ private YdbContext(
) {
this.config = config;

this.operationProps = operationProperties;
this.operationOptions = operationProperties;
this.queryOptions = queryProperties;
this.autoResizeSessionPool = autoResize;

Expand Down Expand Up @@ -195,9 +195,17 @@ public String getUsername() {

public YdbExecutor createExecutor() throws SQLException {
if (config.isUseQueryService()) {
return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
String txValidationTable = operationOptions.getTxValidationTable();
if (txValidationTable != null && !txValidationTable.isEmpty()) {
String tablePath = joined(prefixPath, txValidationTable);
if (tableDescribeCache.getIfPresent(tablePath) == null) {
tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath));
}
return new TableTxExecutor(this, tablePath);
}
return new QueryServiceExecutor(this);
} else {
return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
return new TableServiceExecutor(this);
}
}

Expand All @@ -206,7 +214,7 @@ public int getConnectionsCount() {
}

public YdbOperationProperties getOperationProperties() {
return operationProps;
return operationOptions;
}

@Override
Expand Down Expand Up @@ -277,6 +285,7 @@ public void deregister() {
}

public static YdbContext createContext(YdbConfig config) throws SQLException {
GrpcTransport grpcTransport = null;
try {
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());

Expand All @@ -300,7 +309,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
});
});

GrpcTransport grpcTransport = builder.build();
grpcTransport = builder.build();

PooledTableClient.Builder tableClient = PooledTableClient.newClient(
GrpcTableRpc.useTransport(grpcTransport)
Expand All @@ -309,9 +318,16 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {

boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient);

return new YdbContext(config, operationProps, queryProps, grpcTransport,
tableClient.build(), queryClient.build(), autoResize);
return new YdbContext(config, operationProps, queryProps, grpcTransport, tableClient.build(),
queryClient.build(), autoResize);
} catch (RuntimeException ex) {
if (grpcTransport != null) {
try {
grpcTransport.close();
} catch (Exception exClose) {
LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose);
}
}
StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage());
Throwable cause = ex.getCause();
while (cause != null) {
Expand All @@ -323,7 +339,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
}

public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
Duration operation = operationProps.getDeadlineTimeout();
Duration operation = operationOptions.getDeadlineTimeout();
if (!operation.isZero() && !operation.isNegative()) {
settings.setOperationTimeout(operation);
settings.setTimeout(operation.plusSeconds(1));
Expand All @@ -332,7 +348,7 @@ public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
}

public <T extends BaseRequestSettings.BaseBuilder<T>> T withRequestTimeout(T builder) {
Duration operation = operationProps.getDeadlineTimeout();
Duration operation = operationOptions.getDeadlineTimeout();
if (operation.isNegative() || operation.isZero()) {
return builder;
}
Expand Down
7 changes: 4 additions & 3 deletions jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
public enum QueryCmd {
UNKNOWN,
SELECT,
CREATE_ALTER_DROP,
INSERT_UPSERT,
UPDATE_REPLACE_DELETE
/** CREATE, DROP, ALTER, GRANT, REVOKE */
DDL,
/** INSERT, UPSERT, UPDATE, REPLACE, DELETE */
DML
}
4 changes: 2 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void setHasGenerated(boolean hasGenerated) {
}

public boolean hasUpdateCount() {
return (command == QueryCmd.INSERT_UPSERT || command == QueryCmd.UPDATE_REPLACE_DELETE) && !hasReturinng;
return command == QueryCmd.DML && !hasReturinng;
}

public boolean hasUpdateWithGenerated() {
Expand All @@ -64,6 +64,6 @@ public boolean hasResults() {
}

public boolean isDDL() {
return command == QueryCmd.CREATE_ALTER_DROP;
return command == QueryCmd.DDL;
}
}
Loading