Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,27 @@
* @author Aleksandr Gorshenin
*/
public abstract class BaseYdbExecutor implements YdbExecutor {
private final SessionRetryContext retryCtx;
private final Duration sessionTimeout;
private final TableClient tableClient;
private final SessionRetryContext retryCtx;
private final SessionRetryContext idempotentRetryCtx;
private final boolean useStreamResultSet;

private final AtomicReference<YdbQueryResult> currResult;
protected final String prefixPragma;
protected final YdbTypes types;

public BaseYdbExecutor(YdbContext ctx) {
this.retryCtx = ctx.getRetryCtx();
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
this.tableClient = ctx.getTableClient();
this.retryCtx = SessionRetryContext.create(tableClient)
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
.build();
this.idempotentRetryCtx = SessionRetryContext.create(tableClient)
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
.idempotent(true)
.build();
this.prefixPragma = ctx.getPrefixPragma();
this.types = ctx.getTypes();
this.currResult = new AtomicReference<>();
Expand Down Expand Up @@ -117,7 +124,7 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
tracer.query(yql);

validator.execute(QueryType.BULK_QUERY + " >>\n" + yql, tracer,
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
() -> idempotentRetryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
);

if (!isInsideTransaction()) {
Expand Down
5 changes: 4 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/context/SchemeExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ public class SchemeExecutor {

public SchemeExecutor(YdbContext ctx) {
this.schemeClient = ctx.getSchemeClient();
this.retryCtx = ctx.getRetryCtx();
this.retryCtx = SessionRetryContext.create(ctx.getTableClient())
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
.idempotent(true)
.build();
}

public CompletableFuture<Result<ListDirectoryResult>> listDirectory(String path) {
Expand Down
120 changes: 75 additions & 45 deletions jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package tech.ydb.jdbc.context;

import java.sql.SQLException;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.google.common.cache.Cache;
import com.google.common.hash.Hashing;

import tech.ydb.core.Result;
Expand All @@ -17,7 +21,7 @@
import tech.ydb.query.QueryStream;
import tech.ydb.query.QueryTransaction;
import tech.ydb.query.settings.ExecuteQuerySettings;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
Expand All @@ -29,6 +33,9 @@
* @author mzinal
*/
public class TableTxExecutor extends QueryServiceExecutor {
private static final Logger LOGGER = Logger.getLogger(TableTxExecutor.class.getName());
private static final ReentrantLock VALIDATE_LOCK = new ReentrantLock();

private static final String CREATE_SQL = ""
+ "CREATE TABLE IF NOT EXISTS `%s` ("
+ " hash Text NOT NULL,"
Expand All @@ -55,6 +62,7 @@ public class TableTxExecutor extends QueryServiceExecutor {
private final String commitQuery;
private final String validateQuery;
private final String txTablePath;
private final SessionRetryContext validateRetryCtx;
private boolean isWriteTx;

public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
Expand All @@ -63,6 +71,11 @@ public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
this.commitQuery = String.format(COMMIT_SQL, tablePath);
this.validateQuery = String.format(VALIDATE_SQL, tablePath);
this.isWriteTx = false;
this.validateRetryCtx = SessionRetryContext.create(ctx.getTableClient())
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
.idempotent(true)
.build();

}

@Override
Expand Down Expand Up @@ -106,25 +119,21 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti
return query.execute();
});
} catch (YdbConditionallyRetryableException ex) {

try (Session session = createNewTableSession(validator)) {
tracer.trace("--> validate tx");
tracer.query(validateQuery);
Result<DataQueryResult> res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
.join();
if (res.isSuccess()) {
DataQueryResult dqr = res.getValue();
if (dqr.getResultSetCount() == 1) {
if (dqr.getResultSet(0).getRowCount() == 1) {
// Transaction was committed successfully
return;
} else {
// Transaction wann't commit
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
throw ExceptionFactory.createException("Transaction wasn't committed",
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
);
}
Result<DataQueryResult> res = validateRetryCtx.supplyResult(
session -> session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
).join();
if (res.isSuccess()) {
DataQueryResult dqr = res.getValue();
if (dqr.getResultSetCount() == 1) {
if (dqr.getResultSet(0).getRowCount() == 1) {
// Transaction was committed successfully
return;
} else {
// Transaction wann't commit
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
throw ExceptionFactory.createException("Transaction wasn't committed",
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
);
}
}
}
Expand All @@ -133,35 +142,56 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti
}
}

public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException {
// validate table name
Result<TableDescription> res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
if (res.isSuccess()) {
return res.getValue();
public static void validate(YdbContext ctx, String tablePath, Cache<String, TableDescription> cache)
throws SQLException {
if (cache.getIfPresent(tablePath) != null) {
return;
}

if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe", res.getStatus()));
}
VALIDATE_LOCK.lock();
try {
LOGGER.log(Level.INFO, "Validate TxTableExecutor {0}", tablePath);
if (cache.getIfPresent(tablePath) != null) {
return;
}

// Try to create a table
String query = String.format(CREATE_SQL, tablePath);
Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join();
if (!status.isSuccess()) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot create table", status));
}
SessionRetryContext retryCtx = SessionRetryContext.create(ctx.getTableClient())
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
.idempotent(true)
.build();

// validate table name
Result<TableDescription> res = retryCtx.supplyResult(s -> s.describeTable(tablePath)).join();
LOGGER.log(Level.INFO, "Describe TxTableExecutor {0} -> {1}", new Object[] {tablePath, res.getStatus()});
if (res.isSuccess()) {
cache.put(tablePath, res.getValue());
return;
}

Result<TableDescription> res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
if (!res2.isSuccess()) {
throw ExceptionFactory.createException(
"Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
}
if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe", res.getStatus()));
}

// Try to create a table
String query = String.format(CREATE_SQL, tablePath);
Status status = retryCtx.supplyStatus(session -> session.executeSchemeQuery(query)).join();
LOGGER.log(Level.INFO, "Create TxTableExecutor {0} -> {1}", new Object[] {tablePath, status});
if (!status.isSuccess()) {
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot create table", status));
}

Result<TableDescription> res2 = retryCtx.supplyResult(s -> s.describeTable(tablePath)).join();
LOGGER.log(Level.INFO, "Validate TxTableExecutor {0} -> {1}", new Object[] {tablePath, res2.getStatus()});
if (!res2.isSuccess()) {
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
}

return res2.getValue();
cache.put(tablePath, res2.getValue());
} finally {
VALIDATE_LOCK.unlock();
}
}
}
8 changes: 1 addition & 7 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ public QueryClient getQueryClient() {
return queryClient;
}

public SessionRetryContext getRetryCtx() {
return retryCtx;
}

public String getUrl() {
return config.getUrl();
}
Expand All @@ -205,9 +201,7 @@ public YdbExecutor createExecutor() throws SQLException {
String txValidationTable = operationOptions.getTxValidationTable();
if (txValidationTable != null && !txValidationTable.isEmpty()) {
String tablePath = joined(prefixPath, txValidationTable);
if (tableDescribeCache.getIfPresent(tablePath) == null) {
tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath));
}
TableTxExecutor.validate(this, tablePath, tableDescribeCache);
return new TableTxExecutor(this, tablePath);
}
return new QueryServiceExecutor(this);
Expand Down
31 changes: 31 additions & 0 deletions jdbc/src/test/java/tech/ydb/jdbc/YdbDriverTxValidateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
Expand All @@ -14,6 +17,7 @@
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.context.YdbContext;
import tech.ydb.jdbc.impl.YdbTracerImpl;
import tech.ydb.jdbc.impl.helper.ExceptionAssert;
import tech.ydb.jdbc.impl.helper.JdbcConnectionExtention;
Expand Down Expand Up @@ -94,6 +98,33 @@ public void basicUsageTest() throws SQLException {
}
}

@Test
public void testContextCacheConncurrent() throws SQLException {
String url = jdbcURL.withArg("withTxValidationTable", "tx2_store").build();
List<CompletableFuture<YdbConnection>> list = new ArrayList<>();

for (int idx = 0; idx < 20; idx++) {
list.add(CompletableFuture.supplyAsync(() -> {
try {
Connection connection = DriverManager.getConnection(url);
return connection.unwrap(YdbConnection.class);
} catch (SQLException ex) {
throw new RuntimeException("Cannot connect", ex);
}
}));
}

YdbContext first = list.get(0).join().getCtx();

for (CompletableFuture<YdbConnection> future: list) {
Assertions.assertEquals(first, future.join().getCtx());
}

for (CompletableFuture<YdbConnection> future: list) {
future.join().close();
}
}

@Test
public void commitedTxTest() throws SQLException {
String url = jdbcURL.withArg("withTxValidationTable", "tx1_store").build();
Expand Down