Skip to content

Commit f3745c5

Browse files
committed
Updated implementation of TableTxExecutor
1 parent fa897d3 commit f3745c5

File tree

4 files changed

+83
-56
lines changed

4 files changed

+83
-56
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,27 @@
3434
* @author Aleksandr Gorshenin
3535
*/
3636
public abstract class BaseYdbExecutor implements YdbExecutor {
37-
private final SessionRetryContext retryCtx;
3837
private final Duration sessionTimeout;
3938
private final TableClient tableClient;
39+
private final SessionRetryContext retryCtx;
40+
private final SessionRetryContext idempotentRetryCtx;
4041
private final boolean useStreamResultSet;
4142

4243
private final AtomicReference<YdbQueryResult> currResult;
4344
protected final String prefixPragma;
4445
protected final YdbTypes types;
4546

4647
public BaseYdbExecutor(YdbContext ctx) {
47-
this.retryCtx = ctx.getRetryCtx();
4848
this.sessionTimeout = ctx.getOperationProperties().getSessionTimeout();
4949
this.useStreamResultSet = ctx.getOperationProperties().getUseStreamResultSets();
5050
this.tableClient = ctx.getTableClient();
51+
this.retryCtx = SessionRetryContext.create(tableClient)
52+
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
53+
.build();
54+
this.idempotentRetryCtx = SessionRetryContext.create(tableClient)
55+
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
56+
.idempotent(true)
57+
.build();
5158
this.prefixPragma = ctx.getPrefixPragma();
5259
this.types = ctx.getTypes();
5360
this.currResult = new AtomicReference<>();
@@ -117,7 +124,7 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
117124
tracer.query(yql);
118125

119126
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql, tracer,
120-
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
127+
() -> idempotentRetryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
121128
);
122129

123130
if (!isInsideTransaction()) {

jdbc/src/main/java/tech/ydb/jdbc/context/SchemeExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ public class SchemeExecutor {
2020

2121
public SchemeExecutor(YdbContext ctx) {
2222
this.schemeClient = ctx.getSchemeClient();
23-
this.retryCtx = ctx.getRetryCtx();
23+
this.retryCtx = SessionRetryContext.create(ctx.getTableClient())
24+
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
25+
.idempotent(true)
26+
.build();
2427
}
2528

2629
public CompletableFuture<Result<ListDirectoryResult>> listDirectory(String path) {

jdbc/src/main/java/tech/ydb/jdbc/context/TableTxExecutor.java

Lines changed: 68 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package tech.ydb.jdbc.context;
22

33
import java.sql.SQLException;
4+
import java.util.concurrent.locks.ReentrantLock;
45

6+
import com.google.common.cache.Cache;
57
import com.google.common.hash.Hashing;
68

79
import tech.ydb.core.Result;
@@ -17,7 +19,7 @@
1719
import tech.ydb.query.QueryStream;
1820
import tech.ydb.query.QueryTransaction;
1921
import tech.ydb.query.settings.ExecuteQuerySettings;
20-
import tech.ydb.table.Session;
22+
import tech.ydb.table.SessionRetryContext;
2123
import tech.ydb.table.description.TableDescription;
2224
import tech.ydb.table.query.DataQueryResult;
2325
import tech.ydb.table.query.Params;
@@ -29,6 +31,8 @@
2931
* @author mzinal
3032
*/
3133
public class TableTxExecutor extends QueryServiceExecutor {
34+
private static final ReentrantLock VALIDATE_LOCK = new ReentrantLock();
35+
3236
private static final String CREATE_SQL = ""
3337
+ "CREATE TABLE IF NOT EXISTS `%s` ("
3438
+ " hash Text NOT NULL,"
@@ -55,6 +59,7 @@ public class TableTxExecutor extends QueryServiceExecutor {
5559
private final String commitQuery;
5660
private final String validateQuery;
5761
private final String txTablePath;
62+
private final SessionRetryContext validateRetryCtx;
5863
private boolean isWriteTx;
5964

6065
public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
@@ -63,6 +68,11 @@ public TableTxExecutor(YdbContext ctx, String tablePath) throws SQLException {
6368
this.commitQuery = String.format(COMMIT_SQL, tablePath);
6469
this.validateQuery = String.format(VALIDATE_SQL, tablePath);
6570
this.isWriteTx = false;
71+
this.validateRetryCtx = SessionRetryContext.create(ctx.getTableClient())
72+
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
73+
.idempotent(true)
74+
.build();
75+
6676
}
6777

6878
@Override
@@ -106,25 +116,21 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti
106116
return query.execute();
107117
});
108118
} catch (YdbConditionallyRetryableException ex) {
109-
110-
try (Session session = createNewTableSession(validator)) {
111-
tracer.trace("--> validate tx");
112-
tracer.query(validateQuery);
113-
Result<DataQueryResult> res = session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
114-
.join();
115-
if (res.isSuccess()) {
116-
DataQueryResult dqr = res.getValue();
117-
if (dqr.getResultSetCount() == 1) {
118-
if (dqr.getResultSet(0).getRowCount() == 1) {
119-
// Transaction was committed successfully
120-
return;
121-
} else {
122-
// Transaction wann't commit
123-
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
124-
throw ExceptionFactory.createException("Transaction wasn't committed",
125-
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
126-
);
127-
}
119+
Result<DataQueryResult> res = validateRetryCtx.supplyResult(
120+
session -> session.executeDataQuery(validateQuery, TxControl.snapshotRo(), params)
121+
).join();
122+
if (res.isSuccess()) {
123+
DataQueryResult dqr = res.getValue();
124+
if (dqr.getResultSetCount() == 1) {
125+
if (dqr.getResultSet(0).getRowCount() == 1) {
126+
// Transaction was committed successfully
127+
return;
128+
} else {
129+
// Transaction wann't commit
130+
Status status = Status.of(StatusCode.ABORTED).withCause(ex);
131+
throw ExceptionFactory.createException("Transaction wasn't committed",
132+
new UnexpectedResultException("Transaction not found in " + txTablePath, status)
133+
);
128134
}
129135
}
130136
}
@@ -133,35 +139,52 @@ protected void commitImpl(YdbContext ctx, YdbValidator validator, QueryTransacti
133139
}
134140
}
135141

136-
public static TableDescription validate(YdbContext ctx, String tablePath) throws SQLException {
137-
// validate table name
138-
Result<TableDescription> res = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
139-
if (res.isSuccess()) {
140-
return res.getValue();
142+
public static void validate(YdbContext ctx, String tablePath, Cache<String, TableDescription> cache)
143+
throws SQLException {
144+
if (cache.getIfPresent(tablePath) != null) {
145+
return;
141146
}
142147

143-
if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
144-
throw ExceptionFactory.createException(
145-
"Cannot initialize TableTxExecutor with tx table " + tablePath,
146-
new UnexpectedResultException("Cannot describe", res.getStatus()));
147-
}
148+
VALIDATE_LOCK.lock();
149+
try {
150+
if (cache.getIfPresent(tablePath) != null) {
151+
return;
152+
}
148153

149-
// Try to create a table
150-
String query = String.format(CREATE_SQL, tablePath);
151-
Status status = ctx.getRetryCtx().supplyStatus(session -> session.executeSchemeQuery(query)).join();
152-
if (!status.isSuccess()) {
153-
throw ExceptionFactory.createException(
154-
"Cannot initialize TableTxExecutor with tx table " + tablePath,
155-
new UnexpectedResultException("Cannot create table", status));
156-
}
154+
SessionRetryContext retryCtx = SessionRetryContext.create(ctx.getTableClient())
155+
.sessionCreationTimeout(ctx.getOperationProperties().getSessionTimeout())
156+
.idempotent(true)
157+
.build();
157158

158-
Result<TableDescription> res2 = ctx.getRetryCtx().supplyResult(s -> s.describeTable(tablePath)).join();
159-
if (!res2.isSuccess()) {
160-
throw ExceptionFactory.createException(
161-
"Cannot initialize TableTxExecutor with tx table " + tablePath,
162-
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
163-
}
159+
// validate table name
160+
Result<TableDescription> res = retryCtx.supplyResult(s -> s.describeTable(tablePath)).join();
161+
if (res.isSuccess()) {
162+
cache.put(tablePath, res.getValue());
163+
return;
164+
}
165+
166+
if (res.getStatus().getCode() != StatusCode.SCHEME_ERROR) {
167+
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
168+
new UnexpectedResultException("Cannot describe", res.getStatus()));
169+
}
170+
171+
// Try to create a table
172+
String query = String.format(CREATE_SQL, tablePath);
173+
Status status = retryCtx.supplyStatus(session -> session.executeSchemeQuery(query)).join();
174+
if (!status.isSuccess()) {
175+
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
176+
new UnexpectedResultException("Cannot create table", status));
177+
}
178+
179+
Result<TableDescription> res2 = retryCtx.supplyResult(s -> s.describeTable(tablePath)).join();
180+
if (!res2.isSuccess()) {
181+
throw ExceptionFactory.createException("Cannot initialize TableTxExecutor with tx table " + tablePath,
182+
new UnexpectedResultException("Cannot describe after creating", res2.getStatus()));
183+
}
164184

165-
return res2.getValue();
185+
cache.put(tablePath, res2.getValue());
186+
} finally {
187+
VALIDATE_LOCK.unlock();
188+
}
166189
}
167190
}

jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,6 @@ public QueryClient getQueryClient() {
188188
return queryClient;
189189
}
190190

191-
public SessionRetryContext getRetryCtx() {
192-
return retryCtx;
193-
}
194-
195191
public String getUrl() {
196192
return config.getUrl();
197193
}
@@ -205,9 +201,7 @@ public YdbExecutor createExecutor() throws SQLException {
205201
String txValidationTable = operationOptions.getTxValidationTable();
206202
if (txValidationTable != null && !txValidationTable.isEmpty()) {
207203
String tablePath = joined(prefixPath, txValidationTable);
208-
if (tableDescribeCache.getIfPresent(tablePath) == null) {
209-
tableDescribeCache.put(tablePath, TableTxExecutor.validate(this, tablePath));
210-
}
204+
TableTxExecutor.validate(this, tablePath, tableDescribeCache);
211205
return new TableTxExecutor(this, tablePath);
212206
}
213207
return new QueryServiceExecutor(this);

0 commit comments

Comments
 (0)