Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@
* @author Aleksandr Gorshenin
*/
public class QueryServiceExecutor extends BaseYdbExecutor {
private final Duration sessionTimeout;
private final QueryClient queryClient;
protected final Duration sessionTimeout;
protected final QueryClient queryClient;
private final boolean useStreamResultSet;

private int transactionLevel;
private boolean isReadOnly;
private boolean isAutoCommit;
private TxMode txMode;
protected int transactionLevel;
protected boolean isReadOnly;
protected boolean isAutoCommit;
protected TxMode txMode;

private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
private volatile boolean isClosed;
protected final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
protected volatile boolean isClosed;

public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
super(ctx);
Expand Down
137 changes: 137 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutorExt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package tech.ydb.jdbc.context;

import java.sql.SQLException;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.YdbQuery;
import tech.ydb.query.QueryTransaction;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.PrimitiveValue;

/**
*
* @author mzinal
*/
public class QueryServiceExecutorExt extends QueryServiceExecutor {

private final String processUndeterminedTable;
private boolean writing;

public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
super(ctx, transactionLevel, autoCommit);
this.processUndeterminedTable = ctx.getOperationProperties().getProcessUndeterminedTable();
this.writing = false;
}

private Status upsertAndCommit(QueryTransaction localTx) {
String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; "
+ "UPSERT INTO `" + processUndeterminedTable
+ "` (trans_hash, trans_id, trans_tv) "
+ "VALUES ($trans_hash, $trans_id, CurrentUtcTimestamp());";
Params params = Params.of(
"$trans_id", PrimitiveValue.newText(localTx.getId()),
"$trans_hash", PrimitiveValue.newInt32(localTx.getId().hashCode())
);
return localTx.createQueryWithCommit(sql, params)
.execute()
.join()
.getStatus();
}

private boolean checkTransaction(YdbContext ctx, String transId,
YdbValidator validator, YdbTracer tracer) throws SQLException {
String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; "
+ "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable
+ "` WHERE trans_hash=$trans_hash AND trans_id=$trans_id;";
Params params = Params.of(
"$trans_id", PrimitiveValue.newText(transId),
"$trans_hash", PrimitiveValue.newInt32(transId.hashCode())
);
Result<DataQueryResult> result = ctx.getRetryCtx().supplyResult(
session -> session.executeDataQuery(sql, TxControl.onlineRo(), params))
.join();
if (!result.getStatus().isSuccess()) {
// Failed to obtain the transaction status, have to return the error
validator.validate("CommitVal TxId: " + transId, tracer, result.getStatus());
}
DataQueryResult dqr = result.getValue();
if (dqr.getResultSetCount() == 1) {
if (dqr.getResultSet(0).getRowCount() == 1) {
return true;
}
}
return false;
}

private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLException {
ensureOpened();

QueryTransaction localTx = tx.get();
if (localTx == null || !localTx.isActive()) {
return;
}

YdbTracer tracer = ctx.getTracer();
tracer.trace("--> commitExt");
tracer.query(null);

try {
validator.clearWarnings();
Status status = upsertAndCommit(localTx);
if (StatusCode.UNDETERMINED.equals(status.getCode())) {
if (checkTransaction(ctx, localTx.getId(), validator, tracer)) {
status = Status.SUCCESS;
} else {
status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues());
}
}
validator.validate("CommitExt TxId: " + localTx.getId(), tracer, status);
} finally {
if (tx.compareAndSet(localTx, null)) {
localTx.getSession().close();
}
tracer.close();
}
}

@Override
public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
try {
if (isInsideTransaction() && writing) {
commitWithCheck(ctx, validator);
} else {
super.commit(ctx, validator);
}
} finally {
writing = false;
}
}

@Override
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
try {
super.rollback(ctx, validator);
} finally {
writing = false;
}
}

@Override
public YdbQueryResult executeDataQuery(
YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache
) throws SQLException {
YdbQueryResult yqr = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache);
if (query.isWriting() && !isAutoCommit) {
writing = true;
}
return yqr;
}

}
59 changes: 55 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.cache.CacheBuilder;

import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.core.grpc.GrpcTransportBuilder;
Expand Down Expand Up @@ -198,9 +199,16 @@ public boolean isTxTracerEnabled() {

public YdbExecutor createExecutor() throws SQLException {
if (config.isUseQueryService()) {
return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
if (operationProps.getProcessUndetermined()) {
return new QueryServiceExecutorExt(
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
} else {
return new QueryServiceExecutor(
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
}
} else {
return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
return new TableServiceExecutor(
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
}
}

Expand Down Expand Up @@ -278,6 +286,7 @@ public void deregister() {
}

public static YdbContext createContext(YdbConfig config) throws SQLException {
GrpcTransport grpcTransport = null;
try {
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());

Expand All @@ -301,7 +310,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
});
});

GrpcTransport grpcTransport = builder.build();
grpcTransport = builder.build();

PooledTableClient.Builder tableClient = PooledTableClient.newClient(
GrpcTableRpc.useTransport(grpcTransport)
Expand All @@ -310,9 +319,25 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {

boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient);

return new YdbContext(config, operationProps, queryProps, grpcTransport,
YdbContext yc = new YdbContext(config, operationProps, queryProps, grpcTransport,
tableClient.build(), queryClient.build(), autoResize);
if (operationProps.getProcessUndetermined()) {
if (config.isUseQueryService()) {
yc.ensureTransactionTableExists();
} else {
LOGGER.log(Level.WARNING, "UNDETERMINED processing is disabled, "
+ "because it is only supported for QueryService execution mode.");
}
}
return yc;
} catch (RuntimeException ex) {
if (grpcTransport != null) {
try {
grpcTransport.close();
} catch (Exception exClose) {
LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose);
}
}
StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage());
Throwable cause = ex.getCause();
while (cause != null) {
Expand All @@ -323,6 +348,32 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
}
}

public void ensureTransactionTableExists() throws SQLException {
String tableName = operationProps.getProcessUndeterminedTable();
if (tableName.isEmpty()) {
return;
}
LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName);
String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName
+ "` (trans_hash Int32 NOT NULL, trans_id Text NOT NULL, trans_tv Timestamp,"
+ " PRIMARY KEY (trans_hash, trans_id)) WITH ("
+ "TTL=Interval('PT60M') ON trans_tv,"
/*
+ "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100,"
+ "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150,"
*/
+ "AUTO_PARTITIONING_BY_LOAD=ENABLED,"
+ "AUTO_PARTITIONING_BY_SIZE=ENABLED,"
+ "AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
+ ");";
Status status = retryCtx.supplyStatus(
session -> session.executeSchemeQuery(sqlCreate))
.join();
new YdbValidator().validate(
"Create table " + tableName,
getTracer(), status);
}

public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
Duration operation = operationProps.getDeadlineTimeout();
if (!operation.isZero() && !operation.isNegative()) {
Expand Down
8 changes: 6 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ private <T> T joinFuture(Supplier<CompletableFuture<T>> supplier) {
}
}

public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
Status status = joinFuture(fn);
public void validate(String msg, YdbTracer tracer, Status status) throws SQLException {
addStatusIssues(status);

tracer.trace("<-- " + status.toString());
Expand All @@ -79,6 +78,11 @@ public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Sta
}
}

public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
Status status = joinFuture(fn);
validate(msg, tracer, status);
}

public <R> R call(String msg, YdbTracer tracer, Supplier<CompletableFuture<Result<R>>> fn) throws SQLException {
try {
Result<R> result = joinFuture(fn);
Expand Down
17 changes: 13 additions & 4 deletions jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package tech.ydb.jdbc.query;



import java.sql.SQLException;
import java.util.List;

Expand All @@ -20,8 +18,10 @@ public class YdbQuery {

private final QueryType type;
private final boolean isPlainYQL;
private final boolean writing;

YdbQuery(String originQuery, String preparedYQL, List<QueryStatement> stats, YqlBatcher batcher, QueryType type) {
YdbQuery(String originQuery, String preparedYQL, List<QueryStatement> stats,
YqlBatcher batcher, QueryType type, boolean writing) {
this.originQuery = originQuery;
this.preparedYQL = preparedYQL;
this.statements = stats;
Expand All @@ -33,12 +33,17 @@ public class YdbQuery {
hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters();
}
this.isPlainYQL = !hasJdbcParameters;
this.writing = writing;
}

public QueryType getType() {
return type;
}

public boolean isWriting() {
return writing;
}

public YqlBatcher getYqlBatcher() {
return batcher.isValidBatch() ? batcher : null;
}
Expand All @@ -62,6 +67,7 @@ public List<QueryStatement> getStatements() {
public static YdbQuery parseQuery(String query, YdbQueryProperties opts, YdbTypes types) throws SQLException {
YdbQueryParser parser = new YdbQueryParser(types, query, opts);
String preparedYQL = parser.parseSQL();
boolean writing = false;

QueryType type = null;
YqlBatcher batcher = parser.getYqlBatcher();
Expand All @@ -85,7 +91,10 @@ public static YdbQuery parseQuery(String query, YdbQueryProperties opts, YdbType
if (type == null) {
type = parser.detectQueryType();
}
if (QueryType.DATA_QUERY.equals(type)) {
writing = parser.detectWriting();
}

return new YdbQuery(query, preparedYQL, statements, batcher, type);
return new YdbQuery(query, preparedYQL, statements, batcher, type, writing);
}
}
13 changes: 12 additions & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package tech.ydb.jdbc.query;


import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.ArrayList;
Expand Down Expand Up @@ -68,6 +67,18 @@ public QueryType detectQueryType() throws SQLException {
return type != null ? type : QueryType.DATA_QUERY;
}

public boolean detectWriting() {
for (QueryStatement st: statements) {
switch (st.getCmd()) {
case INSERT_UPSERT:
case UPDATE_REPLACE_DELETE:
return true;
default:
}
}
return false;
}

@SuppressWarnings("MethodLength")
public String parseSQL() throws SQLException {
int fragmentStart = 0;
Expand Down
2 changes: 2 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException {
YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties),
YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties),

YdbOperationProperties.PROCESS_UNDETERMINED_TABLE.toInfo(properties),
YdbOperationProperties.PROCESS_UNDETERMINED.toInfo(properties),
YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties),
YdbOperationProperties.JOIN_DURATION.toInfo(properties),
YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties),
Expand Down
Loading