Skip to content

Commit da4420e

Browse files
committed
Merge remote-tracking branch 'zinal/undetermined-101'
2 parents 3db87c6 + 10e7ea7 commit da4420e

File tree

10 files changed

+269
-19
lines changed

10 files changed

+269
-19
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,17 @@
4242
* @author Aleksandr Gorshenin
4343
*/
4444
public class QueryServiceExecutor extends BaseYdbExecutor {
45-
private final Duration sessionTimeout;
46-
private final QueryClient queryClient;
45+
protected final Duration sessionTimeout;
46+
protected final QueryClient queryClient;
4747
private final boolean useStreamResultSet;
4848

49-
private int transactionLevel;
50-
private boolean isReadOnly;
51-
private boolean isAutoCommit;
52-
private TxMode txMode;
49+
protected int transactionLevel;
50+
protected boolean isReadOnly;
51+
protected boolean isAutoCommit;
52+
protected TxMode txMode;
5353

54-
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
55-
private volatile boolean isClosed;
54+
protected final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
55+
protected volatile boolean isClosed;
5656

5757
public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
5858
super(ctx);
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.sql.SQLException;
4+
5+
import tech.ydb.core.Result;
6+
import tech.ydb.core.Status;
7+
import tech.ydb.core.StatusCode;
8+
import tech.ydb.jdbc.YdbStatement;
9+
import tech.ydb.jdbc.YdbTracer;
10+
import tech.ydb.jdbc.impl.YdbQueryResult;
11+
import tech.ydb.jdbc.query.YdbQuery;
12+
import tech.ydb.query.QueryTransaction;
13+
import tech.ydb.table.query.DataQueryResult;
14+
import tech.ydb.table.query.Params;
15+
import tech.ydb.table.transaction.TxControl;
16+
import tech.ydb.table.values.PrimitiveValue;
17+
18+
/**
19+
*
20+
* @author mzinal
21+
*/
22+
public class QueryServiceExecutorExt extends QueryServiceExecutor {
23+
24+
private final String processUndeterminedTable;
25+
private boolean writing;
26+
27+
public QueryServiceExecutorExt(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
28+
super(ctx, transactionLevel, autoCommit);
29+
this.processUndeterminedTable = ctx.getOperationProperties().getProcessUndeterminedTable();
30+
this.writing = false;
31+
}
32+
33+
private Status upsertAndCommit(QueryTransaction localTx) {
34+
String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; "
35+
+ "UPSERT INTO `" + processUndeterminedTable
36+
+ "` (trans_hash, trans_id, trans_tv) "
37+
+ "VALUES ($trans_hash, $trans_id, CurrentUtcTimestamp());";
38+
Params params = Params.of(
39+
"$trans_id", PrimitiveValue.newText(localTx.getId()),
40+
"$trans_hash", PrimitiveValue.newInt32(localTx.getId().hashCode())
41+
);
42+
return localTx.createQueryWithCommit(sql, params)
43+
.execute()
44+
.join()
45+
.getStatus();
46+
}
47+
48+
private boolean checkTransaction(YdbContext ctx, String transId,
49+
YdbValidator validator, YdbTracer tracer) throws SQLException {
50+
String sql = "DECLARE $trans_hash AS Int32; DECLARE $trans_id AS Text; "
51+
+ "SELECT trans_id, trans_tv FROM `" + processUndeterminedTable
52+
+ "` WHERE trans_hash=$trans_hash AND trans_id=$trans_id;";
53+
Params params = Params.of(
54+
"$trans_id", PrimitiveValue.newText(transId),
55+
"$trans_hash", PrimitiveValue.newInt32(transId.hashCode())
56+
);
57+
Result<DataQueryResult> result = ctx.getRetryCtx().supplyResult(
58+
session -> session.executeDataQuery(sql, TxControl.onlineRo(), params))
59+
.join();
60+
if (!result.getStatus().isSuccess()) {
61+
// Failed to obtain the transaction status, have to return the error
62+
validator.validate("CommitVal TxId: " + transId, tracer, result.getStatus());
63+
}
64+
DataQueryResult dqr = result.getValue();
65+
if (dqr.getResultSetCount() == 1) {
66+
if (dqr.getResultSet(0).getRowCount() == 1) {
67+
return true;
68+
}
69+
}
70+
return false;
71+
}
72+
73+
private void commitWithCheck(YdbContext ctx, YdbValidator validator) throws SQLException {
74+
ensureOpened();
75+
76+
QueryTransaction localTx = tx.get();
77+
if (localTx == null || !localTx.isActive()) {
78+
return;
79+
}
80+
81+
YdbTracer tracer = ctx.getTracer();
82+
tracer.trace("--> commitExt");
83+
tracer.query(null);
84+
85+
try {
86+
validator.clearWarnings();
87+
Status status = upsertAndCommit(localTx);
88+
if (StatusCode.UNDETERMINED.equals(status.getCode())) {
89+
if (checkTransaction(ctx, localTx.getId(), validator, tracer)) {
90+
status = Status.SUCCESS;
91+
} else {
92+
status = Status.of(StatusCode.ABORTED, status.getCause(), status.getIssues());
93+
}
94+
}
95+
validator.validate("CommitExt TxId: " + localTx.getId(), tracer, status);
96+
} finally {
97+
if (tx.compareAndSet(localTx, null)) {
98+
localTx.getSession().close();
99+
}
100+
tracer.close();
101+
}
102+
}
103+
104+
@Override
105+
public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
106+
try {
107+
if (isInsideTransaction() && writing) {
108+
commitWithCheck(ctx, validator);
109+
} else {
110+
super.commit(ctx, validator);
111+
}
112+
} finally {
113+
writing = false;
114+
}
115+
}
116+
117+
@Override
118+
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
119+
try {
120+
super.rollback(ctx, validator);
121+
} finally {
122+
writing = false;
123+
}
124+
}
125+
126+
@Override
127+
public YdbQueryResult executeDataQuery(
128+
YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache
129+
) throws SQLException {
130+
YdbQueryResult yqr = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache);
131+
if (query.isWriting() && !isAutoCommit) {
132+
writing = true;
133+
}
134+
return yqr;
135+
}
136+
137+
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.common.cache.CacheBuilder;
1919

2020
import tech.ydb.core.Result;
21+
import tech.ydb.core.Status;
2122
import tech.ydb.core.UnexpectedResultException;
2223
import tech.ydb.core.grpc.GrpcTransport;
2324
import tech.ydb.core.grpc.GrpcTransportBuilder;
@@ -195,9 +196,16 @@ public String getUsername() {
195196

196197
public YdbExecutor createExecutor() throws SQLException {
197198
if (config.isUseQueryService()) {
198-
return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
199+
if (operationProps.getProcessUndetermined()) {
200+
return new QueryServiceExecutorExt(
201+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
202+
} else {
203+
return new QueryServiceExecutor(
204+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
205+
}
199206
} else {
200-
return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
207+
return new TableServiceExecutor(
208+
this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
201209
}
202210
}
203211

@@ -277,6 +285,7 @@ public void deregister() {
277285
}
278286

279287
public static YdbContext createContext(YdbConfig config) throws SQLException {
288+
GrpcTransport grpcTransport = null;
280289
try {
281290
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());
282291

@@ -300,7 +309,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
300309
});
301310
});
302311

303-
GrpcTransport grpcTransport = builder.build();
312+
grpcTransport = builder.build();
304313

305314
PooledTableClient.Builder tableClient = PooledTableClient.newClient(
306315
GrpcTableRpc.useTransport(grpcTransport)
@@ -309,9 +318,25 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
309318

310319
boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient);
311320

312-
return new YdbContext(config, operationProps, queryProps, grpcTransport,
321+
YdbContext yc = new YdbContext(config, operationProps, queryProps, grpcTransport,
313322
tableClient.build(), queryClient.build(), autoResize);
323+
if (operationProps.getProcessUndetermined()) {
324+
if (config.isUseQueryService()) {
325+
yc.ensureTransactionTableExists();
326+
} else {
327+
LOGGER.log(Level.WARNING, "UNDETERMINED processing is disabled, "
328+
+ "because it is only supported for QueryService execution mode.");
329+
}
330+
}
331+
return yc;
314332
} catch (RuntimeException ex) {
333+
if (grpcTransport != null) {
334+
try {
335+
grpcTransport.close();
336+
} catch (Exception exClose) {
337+
LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose);
338+
}
339+
}
315340
StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage());
316341
Throwable cause = ex.getCause();
317342
while (cause != null) {
@@ -322,6 +347,32 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
322347
}
323348
}
324349

350+
public void ensureTransactionTableExists() throws SQLException {
351+
String tableName = operationProps.getProcessUndeterminedTable();
352+
if (tableName.isEmpty()) {
353+
return;
354+
}
355+
LOGGER.log(Level.FINE, "Using table {} for UNDETERMINED processing", tableName);
356+
String sqlCreate = "CREATE TABLE IF NOT EXISTS `" + tableName
357+
+ "` (trans_hash Int32 NOT NULL, trans_id Text NOT NULL, trans_tv Timestamp,"
358+
+ " PRIMARY KEY (trans_hash, trans_id)) WITH ("
359+
+ "TTL=Interval('PT60M') ON trans_tv,"
360+
/*
361+
+ "AUTO_PARTITIONING_MIN_PARTITIONS_COUNT=100,"
362+
+ "AUTO_PARTITIONING_MAX_PARTITIONS_COUNT=150,"
363+
*/
364+
+ "AUTO_PARTITIONING_BY_LOAD=ENABLED,"
365+
+ "AUTO_PARTITIONING_BY_SIZE=ENABLED,"
366+
+ "AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
367+
+ ");";
368+
Status status = retryCtx.supplyStatus(
369+
session -> session.executeSchemeQuery(sqlCreate))
370+
.join();
371+
new YdbValidator().validate(
372+
"Create table " + tableName,
373+
getTracer(), status);
374+
}
375+
325376
public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
326377
Duration operation = operationProps.getDeadlineTimeout();
327378
if (!operation.isZero() && !operation.isNegative()) {

jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ private <T> T joinFuture(Supplier<CompletableFuture<T>> supplier) {
6666
}
6767
}
6868

69-
public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
70-
Status status = joinFuture(fn);
69+
public void validate(String msg, YdbTracer tracer, Status status) throws SQLException {
7170
addStatusIssues(status);
7271

7372
tracer.trace("<-- " + status.toString());
@@ -79,6 +78,11 @@ public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Sta
7978
}
8079
}
8180

81+
public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Status>> fn) throws SQLException {
82+
Status status = joinFuture(fn);
83+
validate(msg, tracer, status);
84+
}
85+
8286
public <R> R call(String msg, YdbTracer tracer, Supplier<CompletableFuture<Result<R>>> fn) throws SQLException {
8387
try {
8488
Result<R> result = joinFuture(fn);

jdbc/src/main/java/tech/ydb/jdbc/query/YdbQuery.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package tech.ydb.jdbc.query;
22

3-
4-
53
import java.sql.SQLException;
64
import java.util.List;
75

@@ -20,8 +18,10 @@ public class YdbQuery {
2018

2119
private final QueryType type;
2220
private final boolean isPlainYQL;
21+
private final boolean writing;
2322

24-
YdbQuery(QueryKey key, String preparedYQL, List<QueryStatement> stats, YqlBatcher batcher, QueryType type) {
23+
YdbQuery(QueryKey key, String preparedYQL, List<QueryStatement> stats, YqlBatcher batcher, QueryType type,
24+
boolean writing) {
2525
this.key = key;
2626
this.preparedYQL = preparedYQL;
2727
this.statements = stats;
@@ -33,12 +33,17 @@ public class YdbQuery {
3333
hasJdbcParameters = hasJdbcParameters || st.hasJdbcParameters();
3434
}
3535
this.isPlainYQL = !hasJdbcParameters;
36+
this.writing = writing;
3637
}
3738

3839
public QueryType getType() {
3940
return type;
4041
}
4142

43+
public boolean isWriting() {
44+
return writing;
45+
}
46+
4247
public YqlBatcher getYqlBatcher() {
4348
return batcher.isValidBatch() ? batcher : null;
4449
}
@@ -66,6 +71,7 @@ public List<QueryStatement> getStatements() {
6671
public static YdbQuery parseQuery(QueryKey query, YdbQueryProperties opts, YdbTypes types) throws SQLException {
6772
YdbQueryParser parser = new YdbQueryParser(types, query, opts);
6873
String preparedYQL = parser.parseSQL();
74+
boolean writing = false;
6975

7076
QueryType type = null;
7177
YqlBatcher batcher = parser.getYqlBatcher();
@@ -89,7 +95,10 @@ public static YdbQuery parseQuery(QueryKey query, YdbQueryProperties opts, YdbTy
8995
if (type == null) {
9096
type = parser.detectQueryType();
9197
}
98+
if (QueryType.DATA_QUERY.equals(type)) {
99+
writing = parser.detectWriting();
100+
}
92101

93-
return new YdbQuery(query, preparedYQL, statements, batcher, type);
102+
return new YdbQuery(query, preparedYQL, statements, batcher, type, writing);
94103
}
95104
}

jdbc/src/main/java/tech/ydb/jdbc/query/YdbQueryParser.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package tech.ydb.jdbc.query;
22

3-
43
import java.sql.SQLException;
54
import java.sql.SQLFeatureNotSupportedException;
65
import java.util.ArrayList;
@@ -75,6 +74,18 @@ public QueryType detectQueryType() throws SQLException {
7574
return type != null ? type : QueryType.DATA_QUERY;
7675
}
7776

77+
public boolean detectWriting() {
78+
for (QueryStatement st: statements) {
79+
switch (st.getCmd()) {
80+
case INSERT_UPSERT:
81+
case UPDATE_REPLACE_DELETE:
82+
return true;
83+
default:
84+
}
85+
}
86+
return false;
87+
}
88+
7889
@SuppressWarnings("MethodLength")
7990
public String parseSQL() throws SQLException {
8091
int fragmentStart = 0;

jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ public DriverPropertyInfo[] toPropertyInfo() throws SQLException {
211211
YdbClientProperties.SESSION_POOL_SIZE_MIN.toInfo(properties),
212212
YdbClientProperties.SESSION_POOL_SIZE_MAX.toInfo(properties),
213213

214+
YdbOperationProperties.PROCESS_UNDETERMINED_TABLE.toInfo(properties),
215+
YdbOperationProperties.PROCESS_UNDETERMINED.toInfo(properties),
214216
YdbOperationProperties.USE_STREAM_RESULT_SETS.toInfo(properties),
215217
YdbOperationProperties.JOIN_DURATION.toInfo(properties),
216218
YdbOperationProperties.QUERY_TIMEOUT.toInfo(properties),

0 commit comments

Comments
 (0)