Skip to content

Commit ed0e695

Browse files
authored
Fixed YdbExecutor concurrency problems (#68)
2 parents 257e579 + 088f8a8 commit ed0e695

File tree

8 files changed

+173
-59
lines changed

8 files changed

+173
-59
lines changed

.github/workflows/ci.yaml

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
name: YDB JDBC Driver CI
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
- release*
8+
pull_request:
9+
type: [opened, reopened, edited, synchronize]
10+
11+
jobs:
12+
prepare:
13+
name: Prepare Maven cache
14+
runs-on: ubuntu-latest
15+
16+
env:
17+
MAVEN_ARGS: --batch-mode -Dstyle.color=always
18+
19+
steps:
20+
- name: Checkout YDB JDBC Driver
21+
uses: actions/checkout@v4
22+
with:
23+
path: jdbc
24+
25+
- name: Checkout YDB Java Examples
26+
uses: actions/checkout@v4
27+
with:
28+
repository: ydb-platform/ydb-java-examples
29+
path: examples
30+
31+
- name: Set up Java
32+
uses: actions/setup-java@v4
33+
with:
34+
java-version: 17
35+
distribution: 'temurin'
36+
cache: 'maven'
37+
cache-dependency-path: |
38+
jdbc/pom.xml
39+
40+
- name: Download YDB JDBC Driver dependencies
41+
working-directory: ./jdbc
42+
run: mvn $MAVEN_ARGS dependency:go-offline
43+
44+
- name: Download YDB Java Examples dependencies
45+
working-directory: ./examples
46+
run: mvn $MAVEN_ARGS dependency:go-offline
47+
48+
build:
49+
name: YDB JDBC Driver CI on JDK
50+
runs-on: ubuntu-latest
51+
needs: prepare
52+
53+
strategy:
54+
matrix:
55+
java: [ '8', '11', '17']
56+
57+
env:
58+
MAVEN_ARGS: --batch-mode -Dstyle.color=always -DYDB_DOCKER_ISOLATION=true
59+
60+
steps:
61+
- name: Checkout YDB JDBC Driver
62+
uses: actions/checkout@v4
63+
with:
64+
path: jdbc
65+
66+
- name: Set up Java
67+
uses: actions/setup-java@v4
68+
with:
69+
java-version: ${{ matrix.java }}
70+
distribution: 'temurin'
71+
cache: 'maven'
72+
cache-dependency-path: |
73+
jdbc/pom.xml
74+
75+
- name: Extract YDB JDBC Driver version
76+
working-directory: ./jdbc
77+
run: |
78+
VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
79+
echo "JDBC_VERSION=$VERSION" >> "$GITHUB_ENV"
80+
81+
- name: Build YDB JDBC Driver
82+
working-directory: ./jdbc
83+
run: mvn $MAVEN_ARGS install
84+
85+
- name: Checkout YDB Java Examples
86+
uses: actions/checkout@v4
87+
with:
88+
repository: ydb-platform/ydb-java-examples
89+
path: examples
90+
91+
- name: Test examples with Maven
92+
working-directory: ./examples
93+
run: mvn $MAVEN_ARGS -Dydb.jdbc.version=$JDBC_VERSION test

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://github.com/ydb-platform/ydb-jdbc-driver/blob/master/LICENSE)
22
[![Maven metadata URL](https://img.shields.io/maven-metadata/v?metadataUrl=https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Ftech%2Fydb%2Fjdbc%2Fydb-jdbc-driver%2Fmaven-metadata.xml)](https://mvnrepository.com/artifact/tech.ydb.jdbc/ydb-jdbc-driver)
33
[![Build](https://img.shields.io/github/actions/workflow/status/ydb-platform/ydb-jdbc-driver/build.yaml)](https://github.com/ydb-platform/ydb-jdbc-driver/actions/workflows/build.yaml)
4+
[![CI](https://img.shields.io/github/actions/workflow/status/ydb-platform/ydb-jdbc-driver/ci.yaml?label=CI)](https://github.com/ydb-platform/ydb-jdbc-driver/actions/workflows/ci.yaml)
45
[![Codecov](https://img.shields.io/codecov/c/github/ydb-platform/ydb-jdbc-driver)](https://app.codecov.io/gh/ydb-platform/ydb-jdbc-driver)
56

67
## JDBC Driver for YDB

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.sql.SQLException;
44
import java.util.Collections;
5+
import java.util.concurrent.atomic.AtomicReference;
56

7+
import tech.ydb.jdbc.YdbConst;
68
import tech.ydb.jdbc.YdbStatement;
79
import tech.ydb.jdbc.impl.YdbQueryResult;
810
import tech.ydb.jdbc.query.QueryType;
@@ -17,9 +19,34 @@
1719
*/
1820
public abstract class BaseYdbExecutor implements YdbExecutor {
1921
private final SessionRetryContext retryCtx;
22+
private final AtomicReference<YdbQueryResult> currResult;
2023

2124
public BaseYdbExecutor(YdbContext ctx) {
2225
this.retryCtx = ctx.getRetryCtx();
26+
this.currResult = new AtomicReference<>();
27+
}
28+
29+
protected void closeCurrentResult() throws SQLException {
30+
YdbQueryResult rs = currResult.get();
31+
if (rs != null) {
32+
rs.close();
33+
}
34+
}
35+
36+
protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLException {
37+
YdbQueryResult old = currResult.getAndSet(result);
38+
if (old != null) {
39+
old.close();
40+
}
41+
return result;
42+
}
43+
44+
@Override
45+
public void ensureOpened() throws SQLException {
46+
closeCurrentResult();
47+
if (isClosed()) {
48+
throw new SQLException(YdbConst.CLOSED_CONNECTION);
49+
}
2350
}
2451

2552
@Override
@@ -36,20 +63,20 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
3663
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
3764
);
3865

39-
return new StaticQueryResult(query, Collections.emptyList());
66+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
4067
}
4168

4269
@Override
4370
public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows)
4471
throws SQLException {
4572
ensureOpened();
73+
4674
String yql = query.getPreparedYql();
4775
YdbValidator validator = statement.getValidator();
48-
4976
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
5077
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
5178
);
5279

53-
return new StaticQueryResult(query, Collections.emptyList());
80+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
5481
}
5582
}

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
8181
}
8282

8383
@Override
84-
public void close() {
84+
public void close() throws SQLException {
85+
closeCurrentResult();
8586
cleanTx();
8687
isClosed = true;
8788
}
@@ -96,6 +97,8 @@ private void cleanTx() {
9697

9798
@Override
9899
public void setTransactionLevel(int level) throws SQLException {
100+
ensureOpened();
101+
99102
if (level == transactionLevel) {
100103
return;
101104
}
@@ -111,6 +114,8 @@ public void setTransactionLevel(int level) throws SQLException {
111114

112115
@Override
113116
public void setReadOnly(boolean readOnly) throws SQLException {
117+
ensureOpened();
118+
114119
if (readOnly == isReadOnly) {
115120
return;
116121
}
@@ -125,6 +130,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
125130

126131
@Override
127132
public void setAutoCommit(boolean autoCommit) throws SQLException {
133+
ensureOpened();
134+
128135
if (autoCommit == isAutoCommit) {
129136
return;
130137
}
@@ -137,12 +144,14 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
137144
}
138145

139146
@Override
140-
public boolean isClosed() {
147+
public boolean isClosed() throws SQLException {
148+
closeCurrentResult();
141149
return isClosed;
142150
}
143151

144152
@Override
145-
public String txID() {
153+
public String txID() throws SQLException {
154+
closeCurrentResult();
146155
return tx != null ? tx.getId() : null;
147156
}
148157

@@ -234,7 +243,7 @@ public YdbQueryResult executeDataQuery(
234243
for (ResultSetReader rst: result) {
235244
readers.add(new YdbStaticResultSet(statement, rst));
236245
}
237-
return new StaticQueryResult(query, readers);
246+
return updateCurrentResult(new StaticQueryResult(query, readers));
238247
} finally {
239248
if (!tx.isActive()) {
240249
cleanTx();
@@ -257,13 +266,13 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
257266

258267
final QuerySession session = createNewQuerySession(validator);
259268
String msg = "STREAM_QUERY >>\n" + yql;
260-
return validator.call(msg, () -> {
269+
StreamQueryResult lazy = validator.call(msg, () -> {
261270
QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings);
262271
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
263-
return result.execute(stream, () -> {
264-
session.close();
265-
});
272+
return result.execute(stream, session::close);
266273
});
274+
275+
return updateCurrentResult(lazy);
267276
}
268277

269278
@Override
@@ -283,7 +292,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
283292
);
284293
}
285294

286-
return new StaticQueryResult(query, Collections.emptyList());
295+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
287296
}
288297

289298
@Override
@@ -309,7 +318,9 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
309318
throw new SQLException("No explain data");
310319
}
311320

312-
return new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan());
321+
return updateCurrentResult(
322+
new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan())
323+
);
313324
}
314325
}
315326

jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,12 @@ public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream,
8585
return startFuture;
8686
}
8787

88-
public CompletableFuture<Result<StreamQueryResult>> execute(GrpcReadStream<ResultSetReader> stream) {
88+
public CompletableFuture<Result<StreamQueryResult>> execute(
89+
GrpcReadStream<ResultSetReader> stream, Runnable finish
90+
) {
8991
stream.start(rsr -> onResultSet(0, rsr))
90-
.whenComplete(this::onStreamFinished);
92+
.whenComplete(this::onStreamFinished)
93+
.thenRun(finish);
9194
return startFuture;
9295
}
9396

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
5151
}
5252

5353
@Override
54-
public void close() {
54+
public void close() throws SQLException {
55+
closeCurrentResult();
5556
tx = null;
5657
}
5758

@@ -74,26 +75,31 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept
7475

7576
@Override
7677
public void setTransactionLevel(int level) throws SQLException {
78+
ensureOpened();
7779
updateState(tx.withTransactionLevel(level));
7880
}
7981

8082
@Override
8183
public void setReadOnly(boolean readOnly) throws SQLException {
84+
ensureOpened();
8285
updateState(tx.withReadOnly(readOnly));
8386
}
8487

8588
@Override
8689
public void setAutoCommit(boolean autoCommit) throws SQLException {
90+
ensureOpened();
8791
updateState(tx.withAutoCommit(autoCommit));
8892
}
8993

9094
@Override
91-
public boolean isClosed() {
95+
public boolean isClosed() throws SQLException {
96+
closeCurrentResult();
9297
return tx == null;
9398
}
9499

95100
@Override
96-
public String txID() {
101+
public String txID() throws SQLException {
102+
closeCurrentResult();
97103
return tx != null ? tx.txID() : null;
98104
}
99105

@@ -191,7 +197,7 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
191197
try (Session session = createNewTableSession(validator)) {
192198
String msg = QueryType.EXPLAIN_QUERY + " >>\n" + yql;
193199
ExplainDataQueryResult res = validator.call(msg, () -> session.explainDataQuery(yql, settings));
194-
return new StaticQueryResult(statement, res.getQueryAst(), res.getQueryPlan());
200+
return updateCurrentResult(new StaticQueryResult(statement, res.getQueryAst(), res.getQueryPlan()));
195201
}
196202
}
197203

@@ -220,7 +226,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
220226
readers.add(new YdbStaticResultSet(statement, rs));
221227
}
222228

223-
return new StaticQueryResult(query, readers);
229+
return updateCurrentResult(new StaticQueryResult(query, readers));
224230
} catch (SQLException | RuntimeException ex) {
225231
updateState(tx.withRollback(session));
226232
throw ex;
@@ -239,14 +245,16 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
239245
.withRequestTimeout(scanQueryTimeout)
240246
.build();
241247

242-
final Session session = tx.getSession(validator);
248+
final Session session = createNewTableSession(validator);
243249

244250
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
245-
return validator.call(msg, () -> {
251+
StreamQueryResult lazy = validator.call(msg, () -> {
246252
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
247253
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
248-
return result.execute(stream);
254+
return result.execute(stream, session::close);
249255
});
256+
257+
return updateCurrentResult(lazy);
250258
}
251259

252260
@Override

0 commit comments

Comments
 (0)