Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
package tech.ydb.jdbc.context;

import java.sql.SQLException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Result;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.table.Session;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.query.Params;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.settings.ExecuteSchemeQuerySettings;
import tech.ydb.table.values.ListValue;

Expand All @@ -19,13 +30,27 @@
*/
public abstract class BaseYdbExecutor implements YdbExecutor {
private final SessionRetryContext retryCtx;
private final Duration sessionTimeout;
private final TableClient tableClient;
private final AtomicReference<YdbQueryResult> currResult;

public BaseYdbExecutor(YdbContext ctx) {
this.retryCtx = ctx.getRetryCtx();
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.tableClient = ctx.getTableClient();
this.currResult = new AtomicReference<>();
}

protected Session createNewTableSession(YdbValidator validator) throws SQLException {
try {
Result<Session> session = tableClient.createSession(sessionTimeout).join();
validator.addStatusIssues(session.getStatus());
return session.getValue();
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
}
}

protected void closeCurrentResult() throws SQLException {
YdbQueryResult rs = currResult.get();
if (rs != null) {
Expand Down Expand Up @@ -79,4 +104,48 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,

return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
}

@Override
public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params)
throws SQLException {
ensureOpened();

YdbContext ctx = statement.getConnection().getCtx();
YdbValidator validator = statement.getValidator();
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
ExecuteScanQuerySettings settings = ExecuteScanQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();

final Session session = createNewTableSession(validator);

String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);

stream.start((rsr) -> {
future.complete(Result.success(result));
result.onStreamResultSet(0, rsr);
}).whenComplete((st, th) -> {
session.close();

if (th != null) {
result.onStreamFinished(th);
future.completeExceptionally(th);
}
if (st != null) {
validator.addStatusIssues(st);
result.onStreamFinished(st);
future.complete(st.isSuccess() ? Result.success(result) : Result.fail(st));
}
});

return future;
});

return updateCurrentResult(lazy);
}

}
139 changes: 89 additions & 50 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Issue;
Expand Down Expand Up @@ -43,24 +45,26 @@
public class QueryServiceExecutor extends BaseYdbExecutor {
private final Duration sessionTimeout;
private final QueryClient queryClient;
private final boolean useStreamResultSet;

private int transactionLevel;
private boolean isReadOnly;
private boolean isAutoCommit;
private TxMode txMode;

private QueryTransaction tx;
private boolean isClosed;
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
private volatile boolean isClosed;

public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
super(ctx);
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
this.queryClient = ctx.getQueryClient();
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();

this.transactionLevel = transactionLevel;
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
this.isAutoCommit = autoCommit;
this.txMode = txMode(transactionLevel, isReadOnly);
this.tx = null;
this.isClosed = false;
}

Expand All @@ -78,14 +82,10 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
@Override
public void close() throws SQLException {
closeCurrentResult();
cleanTx();
isClosed = true;
}

private void cleanTx() {
if (tx != null) {
tx.getSession().close();
tx = null;
QueryTransaction old = tx.getAndSet(null);
if (old != null) {
old.getSession().close();
}
}

Expand All @@ -97,7 +97,8 @@ public void setTransactionLevel(int level) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX);
}

Expand All @@ -114,7 +115,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.READONLY_INSIDE_TRANSACTION);
}

Expand All @@ -130,7 +132,8 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
return;
}

if (tx != null && tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx != null && localTx.isActive()) {
throw new SQLFeatureNotSupportedException(YdbConst.CHANGE_ISOLATION_INSIDE_TX);
}

Expand All @@ -146,13 +149,15 @@ public boolean isClosed() throws SQLException {
@Override
public String txID() throws SQLException {
closeCurrentResult();
return tx != null ? tx.getId() : null;
QueryTransaction localTx = tx.get();
return localTx != null ? localTx.getId() : null;
}

@Override
public boolean isInsideTransaction() throws SQLException {
ensureOpened();
return tx != null && tx.isActive();
QueryTransaction localTx = tx.get();
return localTx != null && localTx.isActive();
}

@Override
Expand All @@ -177,24 +182,28 @@ public int transactionLevel() throws SQLException {
public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();

if (tx == null || !tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx == null || !localTx.isActive()) {
return;
}

CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
try {
validator.clearWarnings();
validator.call("Commit TxId: " + tx.getId(), () -> tx.commit(settings));
validator.call("Commit TxId: " + localTx.getId(), () -> localTx.commit(settings));
} finally {
cleanTx();
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}

@Override
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();

if (tx == null || !tx.isActive()) {
QueryTransaction localTx = tx.get();
if (localTx == null || !localTx.isActive()) {
return;
}

Expand All @@ -203,9 +212,11 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException

try {
validator.clearWarnings();
validator.execute("Rollback TxId: " + tx.getId(), () -> tx.rollback(settings));
validator.execute("Rollback TxId: " + localTx.getId(), () -> localTx.rollback(settings));
} finally {
cleanTx();
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}

Expand All @@ -223,13 +234,63 @@ public YdbQueryResult executeDataQuery(
}
final ExecuteQuerySettings settings = builder.build();

if (tx == null) {
tx = createNewQuerySession(validator).createNewTransaction(txMode);
QueryTransaction nextTx = tx.get();
while (nextTx == null) {
nextTx = createNewQuerySession(validator).createNewTransaction(txMode);
if (!tx.compareAndSet(null, nextTx)) {
nextTx.getSession().close();
nextTx = tx.get();
}
}

final QueryTransaction localTx = nextTx;

if (useStreamResultSet) {
String msg = "STREAM_QUERY >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
final CompletableFuture<Result<StreamQueryResult>> future = new CompletableFuture<>();
final QueryStream stream = localTx.createQuery(yql, isAutoCommit, params, settings);
final StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);


stream.execute(new QueryStream.PartsHandler() {
@Override
public void onIssues(Issue[] issues) {
validator.addStatusIssues(Arrays.asList(issues));
}

@Override
public void onNextPart(QueryResultPart part) {
result.onStreamResultSet((int) part.getResultSetIndex(), part.getResultSetReader());
future.complete(Result.success(result));
}
}).whenComplete((res, th) -> {
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}

if (th != null) {
future.completeExceptionally(th);
result.onStreamFinished(th);
}
if (res != null) {
validator.addStatusIssues(res.getStatus());
future.complete(res.isSuccess() ? Result.success(result) : Result.fail(res.getStatus()));
result.onStreamFinished(res.getStatus());
}
});

return future;
});

return updateCurrentResult(lazy);
}

try {
QueryReader result = validator.call(QueryType.DATA_QUERY + " >>\n" + yql,
() -> QueryReader.readFrom(tx.createQuery(yql, isAutoCommit, params, settings))
() -> QueryReader.readFrom(localTx.createQuery(yql, isAutoCommit, params, settings))
);
validator.addStatusIssues(result.getIssueList());

Expand All @@ -239,36 +300,14 @@ public YdbQueryResult executeDataQuery(
}
return updateCurrentResult(new StaticQueryResult(query, readers));
} finally {
if (!tx.isActive()) {
cleanTx();
if (!localTx.isActive()) {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
}
}
}

@Override
public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, String yql, Params params)
throws SQLException {
ensureOpened();

YdbContext ctx = statement.getConnection().getCtx();
YdbValidator validator = statement.getValidator();

Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
ExecuteQuerySettings settings = ExecuteQuerySettings.newBuilder()
.withRequestTimeout(scanQueryTimeout)
.build();

final QuerySession session = createNewQuerySession(validator);
String msg = "STREAM_QUERY >>\n" + yql;
StreamQueryResult lazy = validator.call(msg, () -> {
QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings);
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
return result.execute(stream, session::close);
});

return updateCurrentResult(lazy);
}

@Override
public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query) throws SQLException {
ensureOpened();
Expand Down
Loading