Skip to content

Commit 03b4324

Browse files
authored
Added query executor with processing of undeterminted errors (#122)
2 parents 3db87c6 + 808b1fc commit 03b4324

File tree

11 files changed

+422
-61
lines changed

11 files changed

+422
-61
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import tech.ydb.jdbc.impl.YdbStaticResultSet;
2424
import tech.ydb.jdbc.query.QueryType;
2525
import tech.ydb.jdbc.query.YdbQuery;
26+
import tech.ydb.jdbc.settings.YdbOperationProperties;
2627
import tech.ydb.query.QueryClient;
2728
import tech.ydb.query.QuerySession;
2829
import tech.ydb.query.QueryStream;
@@ -54,15 +55,16 @@ public class QueryServiceExecutor extends BaseYdbExecutor {
5455
private final AtomicReference<QueryTransaction> tx = new AtomicReference<>();
5556
private volatile boolean isClosed;
5657

57-
public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
58+
public QueryServiceExecutor(YdbContext ctx) throws SQLException {
5859
super(ctx);
59-
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
60+
YdbOperationProperties options = ctx.getOperationProperties();
61+
this.sessionTimeout = options.getSessionTimeout();
6062
this.queryClient = ctx.getQueryClient();
61-
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
63+
this.useStreamResultSet = options.getUseStreamResultSets();
6264

63-
this.transactionLevel = transactionLevel;
65+
this.transactionLevel = options.getTransactionLevel();
66+
this.isAutoCommit = options.isAutoCommit();
6467
this.isReadOnly = transactionLevel != Connection.TRANSACTION_SERIALIZABLE;
65-
this.isAutoCommit = autoCommit;
6668
this.txMode = txMode(transactionLevel, isReadOnly);
6769
this.isClosed = false;
6870
}
@@ -179,22 +181,26 @@ public void commit(YdbContext ctx, YdbValidator validator) throws SQLException {
179181
return;
180182
}
181183

182-
YdbTracer tracer = ctx.getTracer();
183-
tracer.trace("--> commit");
184-
tracer.query(null);
185-
186-
CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
187184
try {
188-
validator.clearWarnings();
189-
validator.call("Commit TxId: " + localTx.getId(), tracer, () -> localTx.commit(settings));
185+
commitImpl(ctx, validator, localTx);
190186
} finally {
191187
if (tx.compareAndSet(localTx, null)) {
192188
localTx.getSession().close();
193189
}
194-
tracer.close();
190+
ctx.getTracer().close();
195191
}
196192
}
197193

194+
protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException {
195+
YdbTracer tracer = ctx.getTracer();
196+
tracer.trace("--> commit");
197+
tracer.query(null);
198+
199+
CommitTransactionSettings settings = ctx.withRequestTimeout(CommitTransactionSettings.newBuilder()).build();
200+
validator.clearWarnings();
201+
validator.call("Commit TxId: " + tx.getId(), tracer, () -> tx.commit(settings));
202+
}
203+
198204
@Override
199205
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
200206
ensureOpened();
@@ -223,9 +229,8 @@ public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException
223229
}
224230

225231
@Override
226-
public YdbQueryResult executeDataQuery(
227-
YdbStatement statement, YdbQuery query, String preparedYql, Params params, long timeout, boolean keepInCache
228-
) throws SQLException {
232+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
233+
long timeout, boolean keepInCache) throws SQLException {
229234
ensureOpened();
230235

231236
YdbValidator validator = statement.getValidator();

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import tech.ydb.jdbc.impl.YdbStaticResultSet;
1616
import tech.ydb.jdbc.query.QueryType;
1717
import tech.ydb.jdbc.query.YdbQuery;
18+
import tech.ydb.jdbc.settings.YdbOperationProperties;
1819
import tech.ydb.table.Session;
1920
import tech.ydb.table.query.DataQueryResult;
2021
import tech.ydb.table.query.ExplainDataQueryResult;
@@ -35,10 +36,11 @@ public class TableServiceExecutor extends BaseYdbExecutor {
3536
private final boolean failOnTruncatedResult;
3637
private volatile TxState tx;
3738

38-
public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCommit) throws SQLException {
39+
public TableServiceExecutor(YdbContext ctx) throws SQLException {
3940
super(ctx);
40-
this.tx = createTx(transactionLevel, autoCommit);
41-
this.failOnTruncatedResult = ctx.getOperationProperties().isFailOnTruncatedResult();
41+
YdbOperationProperties options = ctx.getOperationProperties();
42+
this.tx = createTx(options.getTransactionLevel(), options.isAutoCommit());
43+
this.failOnTruncatedResult = options.isFailOnTruncatedResult();
4244
}
4345

4446
@Override
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package tech.ydb.jdbc.context;
2+
3+
import java.sql.SQLException;
4+
5+
import com.google.common.hash.Hashing;
6+
7+
import tech.ydb.core.Result;
8+
import tech.ydb.core.Status;
9+
import tech.ydb.core.StatusCode;
10+
import tech.ydb.core.UnexpectedResultException;
11+
import tech.ydb.jdbc.YdbStatement;
12+
import tech.ydb.jdbc.YdbTracer;
13+
import tech.ydb.jdbc.exception.ExceptionFactory;
14+
import tech.ydb.jdbc.exception.YdbConditionallyRetryableException;
15+
import tech.ydb.jdbc.impl.YdbQueryResult;
16+
import tech.ydb.jdbc.query.YdbQuery;
17+
import tech.ydb.query.QueryStream;
18+
import tech.ydb.query.QueryTransaction;
19+
import tech.ydb.query.settings.ExecuteQuerySettings;
20+
import tech.ydb.table.Session;
21+
import tech.ydb.table.description.TableDescription;
22+
import tech.ydb.table.query.DataQueryResult;
23+
import tech.ydb.table.query.Params;
24+
import tech.ydb.table.transaction.TxControl;
25+
import tech.ydb.table.values.PrimitiveValue;
26+
27+
/**
28+
*
29+
* @author mzinal
30+
*/
31+
public class TableTxExecutor extends QueryServiceExecutor {
32+
private static final String CREATE_SQL = ""
33+
+ "CREATE TABLE IF NOT EXISTS `%s` ("
34+
+ " hash Text NOT NULL,"
35+
+ " tx_id Text NOT NULL,"
36+
+ " committed_at Timestamp,"
37+
+ " PRIMARY KEY (hash, tx_id)"
38+
+ ") WITH ("
39+
+ " TTL=Interval('PT60M') ON committed_at,"
40+
+ " AUTO_PARTITIONING_BY_LOAD=ENABLED,"
41+
+ " AUTO_PARTITIONING_BY_SIZE=ENABLED,"
42+
+ " AUTO_PARTITIONING_PARTITION_SIZE_MB=100"
43+
+ ");";
44+
45+
private static final String COMMIT_SQL = ""
46+
+ "DECLARE $hash AS Text; "
47+
+ "DECLARE $tx AS Text; "
48+
+ "UPSERT INTO `%s` (hash, tx_id, committed_at) VALUES ($hash, $tx, CurrentUtcTimestamp());";
49+
50+
private static final String VALIDATE_SQL = ""
51+
+ "DECLARE $hash AS Text; "
52+
+ "DECLARE $tx AS Text; "
53+
+ "SELECT hash, tx_id FROM `%s` WHERE hash=$hash AND tx_id=$tx;";
54+
55+
private final String commitQuery;
56+
private final String validateQuery;
57+
private final String txTablePath;
58+
private boolean isWriteTx;
59+
60+
public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
61+
super(ctx);
62+
this.txTablePath = tablePath;
63+
this.commitQuery = String.format(COMMIT_SQL, tablePath);
64+
this.validateQuery = String.format(VALIDATE_SQL, tablePath);
65+
this.isWriteTx = false;
66+
}
67+
68+
@Override
69+
public void rollback(YdbContext ctx, YdbValidator validator) throws SQLException {
70+
isWriteTx = false;
71+
super.rollback(ctx, validator);
72+
}
73+
74+
@Override
75+
public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String preparedYql, Params params,
76+
long timeout, boolean keepInCache) throws SQLException {
77+
YdbQueryResult result = super.executeDataQuery(statement, query, preparedYql, params, timeout, keepInCache);
78+
isWriteTx = isInsideTransaction() && (isWriteTx || query.isWriting());
79+
80+
return result;
81+
}
82+
83+
@Override
84+
protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransaction tx) throws SQLException {
85+
boolean storeTx = isWriteTx;
86+
isWriteTx = false;
87+
if (!storeTx) {
88+
super.commitImpl(ctx, validator, tx);
89+
return;
90+
}
91+
92+
String hash = Hashing.sha256().hashBytes(tx.getId().getBytes()).toString();
93+
Params params = Params.of(
94+
"$hash", PrimitiveValue.newText(hash),
95+
"$tx", PrimitiveValue.newText(tx.getId())
96+
);
97+
98+
YdbTracer tracer = ctx.getTracer();
99+
ExecuteQuerySettings settings = ctx.withRequestTimeout(ExecuteQuerySettings.newBuilder()).build();
100+
try {
101+
QueryStream query = tx.createQuery(commitQuery, true, params, settings);
102+
validator.clearWarnings();
103+
validator.call("CommitAndStore TxId: " + tx.getId(), tracer, () -> {
104+
tracer.trace("--> commit-and-store-tx " + hash);
105+
tracer.query(commitQuery);
106+
return query.execute();
107+
});
108+
} catch (YdbConditionallyRetryableException ex) {
109+
110+
try (Session session = createNewTableSession(validator)) {
111+
tracer.trace("--> validate tx");
112+
tracer.query(validateQuery);
113+
Result<DataQueryResult> res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
114+
.join();
115+
if (res.isSuccess()) {
116+
DataQueryResult dqr = res.getValue();
117+
if (dqr.getResultSetCount() == 1) {
118+
if (dqr.getResultSet(0).getRowCount() == 1) {
119+
// Transaction was committed successfully
120+
return;
121+
} else {
122+
// Transaction wann't commit
123+
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
124+
throw ExceptionFactory.createException("Transaction wasn't committed",
125+
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
126+
);
127+
}
128+
}
129+
}
130+
}
131+
132+
throw ex;
133+
}
134+
}
135+
136+
public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException {
137+
// validate table name
138+
Result<TableDescription> res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
139+
if (res.isSuccess()) {
140+
return res.getValue();
141+
}
142+
143+
if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
144+
throw ExceptionFactory.createException(
145+
"Cannot initialize TableTxExecutor with tx table " + tablePath,
146+
new UnexpectedResultException("Cannot describe", res.getStatus()));
147+
}
148+
149+
// Try to create a table
150+
String query = String.format(CREATE_SQL, tablePath);
151+
Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join();
152+
if (!status.isSuccess()) {
153+
throw ExceptionFactory.createException(
154+
"Cannot initialize TableTxExecutor with tx table " + tablePath,
155+
new UnexpectedResultException("Cannot create table", status));
156+
}
157+
158+
Result<TableDescription> res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
159+
if (!res2.isSuccess()) {
160+
throw ExceptionFactory.createException(
161+
"Cannot initialize TableTxExecutor with tx table " + tablePath,
162+
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
163+
}
164+
165+
return res2.getValue();
166+
}
167+
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class YdbContext implements AutoCloseable {
7171

7272
private final YdbConfig config;
7373

74-
private final YdbOperationProperties operationProps;
74+
private final YdbOperationProperties operationOptions;
7575
private final YdbQueryProperties queryOptions;
7676
private final YdbTypes types;
7777

@@ -102,7 +102,7 @@ private YdbContext(
102102
) {
103103
this.config = config;
104104

105-
this.operationProps = operationProperties;
105+
this.operationOptions = operationProperties;
106106
this.queryOptions = queryProperties;
107107
this.autoResizeSessionPool = autoResize;
108108

@@ -195,9 +195,17 @@ public String getUsername() {
195195

196196
public YdbExecutor createExecutor() throws SQLException {
197197
if (config.isUseQueryService()) {
198-
return new QueryServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
198+
String txValidationTable = operationOptions.getTxValidationTable();
199+
if (txValidationTable != null && !txValidationTable.isEmpty()) {
200+
String tablePath = joined(prefixPath, txValidationTable);
201+
if (tableDescribeCache.getIfPresent(tablePath) == null) {
202+
tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath));
203+
}
204+
return new TableTxExecutor(this, tablePath);
205+
}
206+
return new QueryServiceExecutor(this);
199207
} else {
200-
return new TableServiceExecutor(this, operationProps.getTransactionLevel(), operationProps.isAutoCommit());
208+
return new TableServiceExecutor(this);
201209
}
202210
}
203211

@@ -206,7 +214,7 @@ public int getConnectionsCount() {
206214
}
207215

208216
public YdbOperationProperties getOperationProperties() {
209-
return operationProps;
217+
return operationOptions;
210218
}
211219

212220
@Override
@@ -277,6 +285,7 @@ public void deregister() {
277285
}
278286

279287
public static YdbContext createContext(YdbConfig config) throws SQLException {
288+
GrpcTransport grpcTransport = null;
280289
try {
281290
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());
282291

@@ -300,7 +309,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
300309
});
301310
});
302311

303-
GrpcTransport grpcTransport = builder.build();
312+
grpcTransport = builder.build();
304313

305314
PooledTableClient.Builder tableClient = PooledTableClient.newClient(
306315
GrpcTableRpc.useTransport(grpcTransport)
@@ -309,9 +318,16 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
309318

310319
boolean autoResize = clientProps.applyToTableClient(tableClient, queryClient);
311320

312-
return new YdbContext(config, operationProps, queryProps, grpcTransport,
313-
tableClient.build(), queryClient.build(), autoResize);
321+
return new YdbContext(config, operationProps, queryProps, grpcTransport, tableClient.build(),
322+
queryClient.build(), autoResize);
314323
} catch (RuntimeException ex) {
324+
if (grpcTransport != null) {
325+
try {
326+
grpcTransport.close();
327+
} catch (Exception exClose) {
328+
LOGGER.log(Level.FINE, "Issue when closing gRPC transport", exClose);
329+
}
330+
}
315331
StringBuilder sb = new StringBuilder("Cannot connect to YDB: ").append(ex.getMessage());
316332
Throwable cause = ex.getCause();
317333
while (cause != null) {
@@ -323,7 +339,7 @@ public static YdbContext createContext(YdbConfig config) throws SQLException {
323339
}
324340

325341
public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
326-
Duration operation = operationProps.getDeadlineTimeout();
342+
Duration operation = operationOptions.getDeadlineTimeout();
327343
if (!operation.isZero() && !operation.isNegative()) {
328344
settings.setOperationTimeout(operation);
329345
settings.setTimeout(operation.plusSeconds(1));
@@ -332,7 +348,7 @@ public <T extends RequestSettings<?>> T withDefaultTimeout(T settings) {
332348
}
333349

334350
public <T extends BaseRequestSettings.BaseBuilder<T>> T withRequestTimeout(T builder) {
335-
Duration operation = operationProps.getDeadlineTimeout();
351+
Duration operation = operationOptions.getDeadlineTimeout();
336352
if (operation.isNegative() || operation.isZero()) {
337353
return builder;
338354
}

jdbc/src/main/java/tech/ydb/jdbc/query/QueryCmd.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@
77
public enum QueryCmd {
88
UNKNOWN,
99
SELECT,
10-
CREATE_ALTER_DROP,
11-
INSERT_UPSERT,
12-
UPDATE_REPLACE_DELETE
10+
/** CREATE, DROP, ALTER, GRANT, REVOKE */
11+
DDL,
12+
/** INSERT, UPSERT, UPDATE, REPLACE, DELETE */
13+
DML
1314
}

jdbc/src/main/java/tech/ydb/jdbc/query/QueryStatement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void setHasGenerated(boolean hasGenerated) {
5252
}
5353

5454
public boolean hasUpdateCount() {
55-
return (command == QueryCmd.INSERT_UPSERT || command == QueryCmd.UPDATE_REPLACE_DELETE) && !hasReturinng;
55+
return command == QueryCmd.DML && !hasReturinng;
5656
}
5757

5858
public boolean hasUpdateWithGenerated() {
@@ -64,6 +64,6 @@ public boolean hasResults() {
6464
}
6565

6666
public boolean isDDL() {
67-
return command == QueryCmd.CREATE_ALTER_DROP;
67+
return command == QueryCmd.DDL;
6868
}
6969
}

0 commit comments

Comments
 (0)