Skip to content

Commit 60d25c5

Browse files
committed
New barrier implementation
1 parent 257e579 commit 60d25c5

File tree

6 files changed

+71
-43
lines changed

6 files changed

+71
-43
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22

33
import java.sql.SQLException;
44
import java.util.Collections;
5+
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.atomic.AtomicReference;
57

8+
import tech.ydb.jdbc.YdbConst;
69
import tech.ydb.jdbc.YdbStatement;
710
import tech.ydb.jdbc.impl.YdbQueryResult;
811
import tech.ydb.jdbc.query.QueryType;
@@ -17,9 +20,31 @@
1720
*/
1821
public abstract class BaseYdbExecutor implements YdbExecutor {
1922
private final SessionRetryContext retryCtx;
23+
private final AtomicReference<Barrier> barrier;
2024

2125
public BaseYdbExecutor(YdbContext ctx) {
2226
this.retryCtx = ctx.getRetryCtx();
27+
this.barrier = new AtomicReference<>(new Barrier());
28+
this.barrier.get().open();
29+
}
30+
31+
protected void checkBarrier() {
32+
barrier.get().waitOpening();
33+
}
34+
35+
protected Barrier createBarrier() {
36+
Barrier newBarrier = new Barrier();
37+
Barrier prev = barrier.getAndSet(newBarrier);
38+
prev.waitOpening();
39+
return newBarrier;
40+
}
41+
42+
@Override
43+
public void ensureOpened() throws SQLException {
44+
checkBarrier();
45+
if (isClosed()) {
46+
throw new SQLException(YdbConst.CLOSED_CONNECTION);
47+
}
2348
}
2449

2550
@Override
@@ -43,13 +68,25 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
4368
public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query, String tablePath, ListValue rows)
4469
throws SQLException {
4570
ensureOpened();
71+
4672
String yql = query.getPreparedYql();
4773
YdbValidator validator = statement.getValidator();
48-
4974
validator.execute(QueryType.BULK_QUERY + " >>\n" + yql,
5075
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
5176
);
5277

5378
return new StaticQueryResult(query, Collections.emptyList());
5479
}
80+
81+
public class Barrier {
82+
private final CompletableFuture<Void> future = new CompletableFuture<>();
83+
84+
public void open() {
85+
future.complete(null);
86+
}
87+
88+
public void waitOpening() {
89+
future.join();
90+
}
91+
}
5592
}

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
8282

8383
@Override
8484
public void close() {
85+
checkBarrier();
8586
cleanTx();
8687
isClosed = true;
8788
}
@@ -96,6 +97,8 @@ private void cleanTx() {
9697

9798
@Override
9899
public void setTransactionLevel(int level) throws SQLException {
100+
ensureOpened();
101+
99102
if (level == transactionLevel) {
100103
return;
101104
}
@@ -111,6 +114,8 @@ public void setTransactionLevel(int level) throws SQLException {
111114

112115
@Override
113116
public void setReadOnly(boolean readOnly) throws SQLException {
117+
ensureOpened();
118+
114119
if (readOnly == isReadOnly) {
115120
return;
116121
}
@@ -125,6 +130,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
125130

126131
@Override
127132
public void setAutoCommit(boolean autoCommit) throws SQLException {
133+
ensureOpened();
134+
128135
if (autoCommit == isAutoCommit) {
129136
return;
130137
}
@@ -138,11 +145,13 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
138145

139146
@Override
140147
public boolean isClosed() {
148+
checkBarrier();
141149
return isClosed;
142150
}
143151

144152
@Override
145153
public String txID() {
154+
checkBarrier();
146155
return tx != null ? tx.getId() : null;
147156
}
148157

@@ -247,6 +256,8 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
247256
throws SQLException {
248257
ensureOpened();
249258

259+
Barrier barrier = createBarrier();
260+
250261
YdbContext ctx = statement.getConnection().getCtx();
251262
YdbValidator validator = statement.getValidator();
252263

@@ -260,9 +271,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
260271
return validator.call(msg, () -> {
261272
QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings);
262273
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
263-
return result.execute(stream, () -> {
264-
session.close();
265-
});
274+
return result.execute(stream, barrier);
266275
});
267276
}
268277

jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,17 +77,20 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run
7777
}
7878
}
7979

80-
public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream, Runnable finish) {
80+
public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream, BaseYdbExecutor.Barrier barrier) {
8181
stream.execute(new QueryPartsHandler())
8282
.thenApply(Result::getStatus)
8383
.whenComplete(this::onStreamFinished)
84-
.thenRun(finish);
84+
.thenRun(barrier::open);
8585
return startFuture;
8686
}
8787

88-
public CompletableFuture<Result<StreamQueryResult>> execute(GrpcReadStream<ResultSetReader> stream) {
88+
public CompletableFuture<Result<StreamQueryResult>> execute(
89+
GrpcReadStream<ResultSetReader> stream, BaseYdbExecutor.Barrier barrier
90+
) {
8991
stream.start(rsr -> onResultSet(0, rsr))
90-
.whenComplete(this::onStreamFinished);
92+
.whenComplete(this::onStreamFinished)
93+
.thenRun(barrier::open);
9194
return startFuture;
9295
}
9396

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
5252

5353
@Override
5454
public void close() {
55+
checkBarrier();
5556
tx = null;
5657
}
5758

@@ -74,26 +75,31 @@ protected Session createNewTableSession(YdbValidator validator) throws SQLExcept
7475

7576
@Override
7677
public void setTransactionLevel(int level) throws SQLException {
78+
ensureOpened();
7779
updateState(tx.withTransactionLevel(level));
7880
}
7981

8082
@Override
8183
public void setReadOnly(boolean readOnly) throws SQLException {
84+
ensureOpened();
8285
updateState(tx.withReadOnly(readOnly));
8386
}
8487

8588
@Override
8689
public void setAutoCommit(boolean autoCommit) throws SQLException {
90+
ensureOpened();
8791
updateState(tx.withAutoCommit(autoCommit));
8892
}
8993

9094
@Override
9195
public boolean isClosed() {
96+
checkBarrier();
9297
return tx == null;
9398
}
9499

95100
@Override
96101
public String txID() {
102+
checkBarrier();
97103
return tx != null ? tx.txID() : null;
98104
}
99105

@@ -232,6 +238,8 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
232238
throws SQLException {
233239
ensureOpened();
234240

241+
Barrier barrier = createBarrier();
242+
235243
YdbContext ctx = statement.getConnection().getCtx();
236244
YdbValidator validator = statement.getValidator();
237245
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
@@ -245,7 +253,7 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
245253
return validator.call(msg, () -> {
246254
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
247255
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
248-
return result.execute(stream);
256+
return result.execute(stream, barrier);
249257
});
250258
}
251259

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.sql.SQLException;
44

5-
import tech.ydb.jdbc.YdbConst;
65
import tech.ydb.jdbc.YdbStatement;
76
import tech.ydb.jdbc.impl.YdbQueryResult;
87
import tech.ydb.jdbc.query.YdbQuery;
@@ -14,13 +13,9 @@
1413
* @author Aleksandr Gorshenin
1514
*/
1615
public interface YdbExecutor {
17-
default void ensureOpened() throws SQLException {
18-
if (isClosed()) {
19-
throw new SQLException(YdbConst.CLOSED_CONNECTION);
20-
}
21-
}
22-
16+
void close();
2317
boolean isClosed();
18+
void ensureOpened() throws SQLException;
2419

2520
String txID();
2621
int transactionLevel() throws SQLException;
@@ -46,6 +41,4 @@ YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, String y
4641
void rollback(YdbContext ctx, YdbValidator validator) throws SQLException;
4742

4843
boolean isValid(YdbValidator validator, int timeout) throws SQLException;
49-
50-
void close();
5144
}

jdbc/src/main/java/tech/ydb/jdbc/impl/YdbConnectionImpl.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import java.util.Map;
1818
import java.util.Properties;
1919
import java.util.concurrent.Executor;
20-
import java.util.concurrent.atomic.AtomicReference;
2120
import java.util.logging.Level;
2221
import java.util.logging.Logger;
2322

@@ -39,7 +38,6 @@ public class YdbConnectionImpl implements YdbConnection {
3938
private final YdbContext ctx;
4039
private final YdbValidator validator;
4140
private final YdbExecutor executor;
42-
private final AtomicReference<YdbStatement> currState = new AtomicReference<>();
4341

4442
public YdbConnectionImpl(YdbContext context) throws SQLException {
4543
this.ctx = context;
@@ -85,35 +83,18 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
8583
executor.setAutoCommit(autoCommit);
8684
}
8785

88-
private <T extends YdbStatement> T updateStatement(T statement) throws SQLException {
89-
Statement prev = currState.getAndSet(statement);
90-
if (prev != null) {
91-
prev.close();
92-
}
93-
return statement;
94-
}
95-
96-
private void waitStatementReady() throws SQLException {
97-
YdbStatement curr = currState.get();
98-
if (curr != null) {
99-
curr.waitReady();
100-
}
101-
}
102-
10386
@Override
10487
public boolean getAutoCommit() throws SQLException {
10588
return executor.isAutoCommit();
10689
}
10790

10891
@Override
10992
public void commit() throws SQLException {
110-
waitStatementReady();
11193
executor.commit(ctx, validator);
11294
}
11395

11496
@Override
11597
public void rollback() throws SQLException {
116-
waitStatementReady();
11798
executor.rollback(ctx, validator);
11899
}
119100

@@ -130,7 +111,7 @@ public void close() throws SQLException {
130111
}
131112

132113
@Override
133-
public boolean isClosed() {
114+
public boolean isClosed() throws SQLException {
134115
return executor.isClosed();
135116
}
136117

@@ -186,8 +167,6 @@ public SQLWarning getWarnings() throws SQLException {
186167
@Override
187168
public void clearWarnings() throws SQLException {
188169
executor.ensureOpened();
189-
190-
waitStatementReady();
191170
validator.clearWarnings();
192171
}
193172

@@ -231,7 +210,7 @@ public YdbStatement createStatement(int resultSetType, int resultSetConcurrency,
231210
int resultSetHoldability) throws SQLException {
232211
executor.ensureOpened();
233212
checkStatementParams(resultSetType, resultSetConcurrency, resultSetHoldability);
234-
return updateStatement(new YdbStatementImpl(this, resultSetType));
213+
return new YdbStatementImpl(this, resultSetType);
235214
}
236215

237216
@Override
@@ -263,7 +242,7 @@ private YdbPreparedStatement prepareStatement(String sql, int resultSetType, Ydb
263242
YdbQuery query = ctx.findOrParseYdbQuery(sql);
264243

265244
YdbPreparedQuery params = ctx.findOrPrepareParams(query, mode);
266-
return updateStatement(new YdbPreparedStatementImpl(this, query, params, resultSetType));
245+
return new YdbPreparedStatementImpl(this, query, params, resultSetType);
267246
}
268247

269248
@Override
@@ -309,7 +288,6 @@ public int getNetworkTimeout() throws SQLException {
309288

310289
@Override
311290
public String getYdbTxId() throws SQLException {
312-
waitStatementReady();
313291
return executor.txID();
314292
}
315293

0 commit comments

Comments
 (0)