Skip to content

Commit 9333397

Browse files
committed
initial implementation for UNDETERMINED processing
1 parent 2222ad7 commit 9333397

File tree

10 files changed

+257
-19
lines changed

10 files changed

+257
-19
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@
4242
* @author Aleksandr Gorshenin
4343
*/
4444
public class QueryServiceExecutor extends BaseYdbExecutor {
45-
private final Duration sessionTimeout;
46-
private final QueryClient queryClient;
45+
protected final Duration sessionTimeout;
46+
protected final QueryClient queryClient;
4747
private final boolean useStreamResultSet;
4848

49-
private int transactionLevel;
50-
private boolean isReadOnly;
51-
private boolean isAutoCommit;
52-
private TxMode txMode;
49+
protected int transactionLevel;
50+
protected boolean isReadOnly;
51+
protected boolean isAutoCommit;
52+
protected TxMode txMode;
5353

54-
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
55-
private volatile boolean isClosed;
54+
protected final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
55+
protected volatile boolean isClosed;
5656

5757
public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
5858
super(ctx);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.sql.SQLException;
4+
5+
import tech.ydb.core.Result;
6+
import tech.ydb.core.Status;
7+
import tech.ydb.core.StatusCode;
8+
import tech.ydb.jdbc.YdbStatement;
9+
import tech.ydb.jdbc.YdbTracer;
10+
import tech.ydb.jdbc.impl.YdbQueryResult;
11+
import tech.ydb.jdbc.query.YdbQuery;
12+
import tech.ydb.query.QueryTransaction;
13+
import tech.ydb.table.query.DataQueryResult;
14+
import tech.ydb.table.query.Params;
15+
import tech.ydb.table.transaction.TxControl;
16+
import tech.ydb.table.values.PrimitiveValue;
17+
18+
/**
19+
*
20+
* @author mzinal
21+
*/
22+
public class QueryServiceExecutorExt extends QueryServiceExecutor {
23+
24+
private final String processUndeterminedTable;
25+
private boolean writing;
26+
27+
public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
28+
super(ctx, transactionLevel, autoCommit);
29+
this.processUndeterminedTable = ctx.getOperationProperties().getProcessUndeterminedTable();
30+
this.writing = false;
31+
}
32+
33+
private Status upsertAndCommit(QueryTransaction localTx) {
34+
String sql = "DECLARE $trans_id AS Text; "
35+
+ "UPSERT INTO `" + processUndeterminedTable
36+
+ "` (trans_id, trans_tv) VALUES ($trans_id, CurrentUtcTimestamp());";
37+
Params params = Params.of("$trans_id", PrimitiveValue.newText(localTx.getId()));
38+
return localTx.createQueryWithCommit(sql, params)
39+
.execute()
40+
.join()
41+
.getStatus();
42+
}
43+
44+
private boolean checkTransaction(YdbContext ctx, String transId,
45+
YdbValidator validator, YdbTracer tracer) throws SQLException {
46+
String sql = "DECLARE $trans_id AS Text; "
47+
+ "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable
48+
+ "` WHERE trans_id=$trans_id;";
49+
Params params = Params.of("$trans_id", PrimitiveValue.newText(transId));
50+
Result<DataQueryResult> result = ctx.getRetryCtx().supplyResult(
51+
session -> session.executeDataQuery(sql, TxControl.onlineRo(), params))
52+
.join();
53+
if (!result.getStatus().isSuccess()) {
54+
// Failed to obtain the transaction status, have to return the error
55+
validator.validate("CommitVal TxId: " + transId, tracer, result.getStatus());
56+
}
57+
DataQueryResult dqr = result.getValue();
58+
if (dqr.getResultSetCount() == 1) {
59+
if (dqr.getResultSet(0).getRowCount() == 1) {
60+
return true;
61+
}
62+
}
63+
return false;
64+
}
65+
66+
private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLException {
67+
ensureOpened();
68+
69+
QueryTransaction localTx = tx.get();
70+
if (localTx == null || !localTx.isActive()) {
71+
return;
72+
}
73+
74+
YdbTracer tracer = ctx.getTracer();
75+
tracer.trace("--> commitExt");
76+
tracer.query(null);
77+
78+
try {
79+
validator.clearWarnings();
80+
Status status = upsertAndCommit(localTx);
81+
if (StatusCode.UNDETERMINED.equals(status.getCode())) {
82+
if (!checkTransaction(ctx, localTx.getId(), validator, tracer)) {
83+
status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues());
84+
}
85+
}
86+
validator.validate("CommitExt TxId: " + localTx.getId(), tracer, status);
87+
} finally {
88+
if (tx.compareAndSet(localTx, null)) {
89+
localTx.getSession().close();
90+
}
91+
tracer.close();
92+
}
93+
}
94+
95+
@Override
96+
public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
97+
try {
98+
if (isInsideTransaction() && writing) {
99+
commitWithCheck(ctx, validator);
100+
} else {
101+
super.commit(ctx, validator);
102+
}
103+
} finally {
104+
writing = false;
105+
}
106+
}
107+
108+
@Override
109+
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
110+
try {
111+
super.rollback(ctx, validator);
112+
} finally {
113+
writing = false;
114+
}
115+
}
116+
117+
@Override
118+
public YdbQueryResult executeDataQuery(
119+
YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache
120+
) throws SQLException {
121+
YdbQueryResult yqr = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache);
122+
if (query.isWriting() && !isAutoCommit) {
123+
writing = true;
124+
}
125+
return yqr;
126+
}
127+
128+
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.common.cache.CacheBuilder;
1919

2020
import tech.ydb.core.Result;
21+
import tech.ydb.core.Status;
2122
import tech.ydb.core.UnexpectedResultException;
2223
import tech.ydb.core.grpc.GrpcTransport;
2324
import tech.ydb.core.grpc.GrpcTransportBuilder;
@@ -190,9 +191,16 @@ public boolean isTxTracerEnabled() {
190191

191192
public YdbExecutor createExecutor() throws SQLException {
192193
if (config.isUseQueryService()) {
193-
return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
194+
if (operationProps.getProcessUndetermined()) {
195+
return new QueryServiceExecutorExt(
196+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
197+
} else {
198+
return new QueryServiceExecutor(
199+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
200+
}
194201
} else {
195-
return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
202+
return new TableServiceExecutor(
203+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
196204
}
197205
}
198206

@@ -270,6 +278,7 @@ public void deregister() {
270278
}
271279

272280
public static YdbContext createContext(YdbConfig config) throws SQLException {
281+
GrpcTransport grpcTransport = null;
273282
try {
274283
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());
275284

@@ -293,7 +302,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
293302
});
294303
});
295304

296-
GrpcTransport grpcTransport = builder.build();
305+
grpcTransport = builder.build();
297306

298307
PooledTableClient.Builder tableClient = PooledTableClient.newClient(
299308
GrpcTableRpc.useTransport(grpcTransport)
@@ -302,9 +311,25 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
302311

303312
boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient);
304313

305-
return new YdbContext(config, operationProps, queryProps, grpcTransport,
314+
YdbContext yc = new YdbContext(config, operationProps, queryProps, grpcTransport,
306315
tableClient.build(), queryClient.build(), autoResize);
316+
if (operationProps.getProcessUndetermined()) {
317+
if (config.isUseQueryService()) {
318+
yc.ensureTransactionTableExists();
319+
} else {
320+
LOGGER.log(Level.WARNING, "UNDETERMINED processing is disabled, "
321+
+ "because it is only supported for QueryService execution mode.");
322+
}
323+
}
324+
return yc;
307325
} catch (RuntimeException ex) {
326+
if (grpcTransport != null) {
327+
try {
328+
grpcTransport.close();
329+
} catch (Exception exClose) {
330+
LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose);
331+
}
332+
}
308333
StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage());
309334
Throwable cause = ex.getCause();
310335
while (cause != null) {
@@ -315,6 +340,30 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
315340
}
316341
}
317342

343+
public void ensureTransactionTableExists() throws SQLException {
344+
String tableName = operationProps.getProcessUndeterminedTable();
345+
if (tableName.isEmpty()) {
346+
return;
347+
}
348+
LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName);
349+
String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName
350+
+ "` (trans_id Text NOT NULL, trans_tv Timestamp,"
351+
+ " PRIMARY KEY (trans_id)) WITH ("
352+
+ "TTL=Interval('PT60M') ON trans_tv,"
353+
+ "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100,"
354+
+ "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150,"
355+
+ "AUTO_PARTITIONING_BY_LOAD=ENABLED,"
356+
+ "AUTO_PARTITIONING_BY_SIZE=ENABLED,"
357+
+ "AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
358+
+ ");";
359+
Status status = retryCtx.supplyStatus(
360+
session -> session.executeSchemeQuery(sqlCreate))
361+
.join();
362+
new YdbValidator().validate(
363+
"Create table " + tableName,
364+
getTracer(), status);
365+
}
366+
318367
public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
319368
Duration operation = operationProps.getDeadlineTimeout();
320369
if (!operation.isZero() && !operation.isNegative()) {

jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ public void clearWarnings() {
5454
this.issues.clear();
5555
}
5656

57-
public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
58-
Status status = fn.get().join();
57+
public void validate(String msg, YdbTracer tracer, Status status) throws SQLException {
5958
addStatusIssues(status);
6059

6160
tracer.trace("<-- " + status.toString());
@@ -67,6 +66,10 @@ public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Sta
6766
}
6867
}
6968

69+
public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
70+
validate(msg, tracer, fn.get().join());
71+
}
72+
7073
public <R> R call(String msg, Supplier<CompletableFuture<Result<R>>> fn) throws SQLException {
7174
try {
7275
Result<R> result = fn.get().join();

jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package tech.ydb.jdbc.query;
22

3-
4-
53
import java.sql.SQLException;
64
import java.util.List;
75

@@ -19,8 +17,10 @@ public class YdbQuery {
1917

2018
private final QueryType type;
2119
private final boolean isPlainYQL;
20+
private final boolean writing;
2221

23-
YdbQuery(String originQuery, String preparedYQL, List<QueryStatement> stats, YqlBatcher batcher, QueryType type) {
22+
YdbQuery(String originQuery, String preparedYQL, List<QueryStatement> stats,
23+
YqlBatcher batcher, QueryType type, boolean writing) {
2424
this.originQuery = originQuery;
2525
this.preparedYQL = preparedYQL;
2626
this.statements = stats;
@@ -32,6 +32,7 @@ public class YdbQuery {
3232
hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters();
3333
}
3434
this.isPlainYQL = !hasJdbcParameters;
35+
this.writing = writing;
3536
}
3637

3738
public QueryType getType() {
@@ -58,11 +59,16 @@ public List<QueryStatement> getStatements() {
5859
return statements;
5960
}
6061

62+
public boolean isWriting() {
63+
return writing;
64+
}
65+
6166
public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws SQLException {
6267
YdbQueryParser parser = new YdbQueryParser(
6368
query, opts.isDetectQueryType(), opts.isDetectJdbcParameters(), opts.isReplaceJdbcInByYqlList()
6469
);
6570
String preparedYQL = parser.parseSQL();
71+
boolean writing = false;
6672

6773
QueryType type = null;
6874
YqlBatcher batcher = parser.getYqlBatcher();
@@ -86,7 +92,10 @@ public static YdbQuery parseQuery(String query, YdbQueryProperties opts) throws
8692
if (type == null) {
8793
type = parser.detectQueryType();
8894
}
95+
if (type == QueryType.DATA_QUERY) {
96+
writing = parser.detectWriting();
97+
}
8998

90-
return new YdbQuery(query, preparedYQL, statements, batcher, type);
99+
return new YdbQuery(query, preparedYQL, statements, batcher, type, writing);
91100
}
92101
}

jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package tech.ydb.jdbc.query;
22

3-
43
import java.sql.SQLException;
54
import java.sql.SQLFeatureNotSupportedException;
65
import java.util.ArrayList;
@@ -62,6 +61,18 @@ public QueryType detectQueryType() throws SQLException {
6261
return type != null ? type : QueryType.DATA_QUERY;
6362
}
6463

64+
public boolean detectWriting() {
65+
for (QueryStatement st: statements) {
66+
switch (st.getCmd()) {
67+
case INSERT_UPSERT:
68+
case UPDATE_REPLACE_DELETE:
69+
return true;
70+
default:
71+
}
72+
}
73+
return false;
74+
}
75+
6576
@SuppressWarnings("MethodLength")
6677
public String parseSQL() throws SQLException {
6778
int fragmentStart = 0;

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException {
209209
YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties),
210210
YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties),
211211

212+
YdbOperationProperties.PROCESS_UNDETERMINED_TABLE.toInfo(properties),
213+
YdbOperationProperties.PROCESS_UNDETERMINED.toInfo(properties),
212214
YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties),
213215
YdbOperationProperties.JOIN_DURATION.toInfo(properties),
214216
YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties),

0 commit comments

Comments
 (0)