Skip to content

Commit 79db3eb

Browse files
committed
Replace barriers by resultset keeping
1 parent 0d69e80 commit 79db3eb

File tree

4 files changed

+46
-57
lines changed

4 files changed

+46
-57
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.sql.SQLException;
44
import java.util.Collections;
5-
import java.util.concurrent.CompletableFuture;
65
import java.util.concurrent.atomic.AtomicReference;
76

87
import tech.ydb.jdbc.YdbConst;
@@ -20,28 +19,31 @@
2019
*/
2120
public abstract class BaseYdbExecutor implements YdbExecutor {
2221
private final SessionRetryContext retryCtx;
23-
private final AtomicReference<Barrier> barrier;
22+
private final AtomicReference<YdbQueryResult> currResult;
2423

2524
public BaseYdbExecutor(YdbContext ctx) {
2625
this.retryCtx = ctx.getRetryCtx();
27-
this.barrier = new AtomicReference<>(new Barrier());
28-
this.barrier.get().open();
26+
this.currResult = new AtomicReference<>();
2927
}
3028

31-
protected void checkBarrier() {
32-
barrier.get().waitOpening();
29+
protected void closeCurrentResult() throws SQLException {
30+
YdbQueryResult rs = currResult.get();
31+
if (rs != null) {
32+
rs.close();
33+
}
3334
}
3435

35-
protected Barrier createBarrier() {
36-
Barrier newBarrier = new Barrier();
37-
Barrier prev = barrier.getAndSet(newBarrier);
38-
prev.waitOpening();
39-
return newBarrier;
36+
protected YdbQueryResult updateCurrentResult(YdbQueryResult result) throws SQLException {
37+
YdbQueryResult old = currResult.getAndSet(result);
38+
if (old != null) {
39+
old.close();
40+
}
41+
return result;
4042
}
4143

4244
@Override
4345
public void ensureOpened() throws SQLException {
44-
checkBarrier();
46+
closeCurrentResult();
4547
if (isClosed()) {
4648
throw new SQLException(YdbConst.CLOSED_CONNECTION);
4749
}
@@ -61,7 +63,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
6163
() -> retryCtx.supplyStatus(session -> session.executeSchemeQuery(yql, settings))
6264
);
6365

64-
return new StaticQueryResult(query, Collections.emptyList());
66+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
6567
}
6668

6769
@Override
@@ -75,18 +77,6 @@ public YdbQueryResult executeBulkUpsert(YdbStatement statement, YdbQuery query,
7577
() -> retryCtx.supplyStatus(session -> session.executeBulkUpsert(tablePath, rows))
7678
);
7779

78-
return new StaticQueryResult(query, Collections.emptyList());
79-
}
80-
81-
public class Barrier {
82-
private final CompletableFuture<Void> future = new CompletableFuture<>();
83-
84-
public void open() {
85-
future.complete(null);
86-
}
87-
88-
public void waitOpening() {
89-
future.join();
90-
}
80+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
9181
}
9282
}

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLE
8181
}
8282

8383
@Override
84-
public void close() {
85-
checkBarrier();
84+
public void close() throws SQLException {
85+
closeCurrentResult();
8686
cleanTx();
8787
isClosed = true;
8888
}
@@ -144,14 +144,14 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
144144
}
145145

146146
@Override
147-
public boolean isClosed() {
148-
checkBarrier();
147+
public boolean isClosed() throws SQLException {
148+
closeCurrentResult();
149149
return isClosed;
150150
}
151151

152152
@Override
153-
public String txID() {
154-
checkBarrier();
153+
public String txID() throws SQLException {
154+
closeCurrentResult();
155155
return tx != null ? tx.getId() : null;
156156
}
157157

@@ -243,7 +243,7 @@ public YdbQueryResult executeDataQuery(
243243
for (ResultSetReader rst: result) {
244244
readers.add(new YdbStaticResultSet(statement, rst));
245245
}
246-
return new StaticQueryResult(query, readers);
246+
return updateCurrentResult(new StaticQueryResult(query, readers));
247247
} finally {
248248
if (!tx.isActive()) {
249249
cleanTx();
@@ -256,8 +256,6 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
256256
throws SQLException {
257257
ensureOpened();
258258

259-
Barrier barrier = createBarrier();
260-
261259
YdbContext ctx = statement.getConnection().getCtx();
262260
YdbValidator validator = statement.getValidator();
263261

@@ -268,11 +266,13 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
268266

269267
final QuerySession session = createNewQuerySession(validator);
270268
String msg = "STREAM_QUERY >>\n" + yql;
271-
return validator.call(msg, () -> {
269+
StreamQueryResult lazy = validator.call(msg, () -> {
272270
QueryStream stream = session.createQuery(yql, TxMode.SNAPSHOT_RO, params, settings);
273271
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
274-
return result.execute(stream, barrier::open);
272+
return result.execute(stream, session::close);
275273
});
274+
275+
return updateCurrentResult(lazy);
276276
}
277277

278278
@Override
@@ -292,7 +292,7 @@ public YdbQueryResult executeSchemeQuery(YdbStatement statement, YdbQuery query)
292292
);
293293
}
294294

295-
return new StaticQueryResult(query, Collections.emptyList());
295+
return updateCurrentResult(new StaticQueryResult(query, Collections.emptyList()));
296296
}
297297

298298
@Override
@@ -318,7 +318,9 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
318318
throw new SQLException("No explain data");
319319
}
320320

321-
return new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan());
321+
return updateCurrentResult(
322+
new StaticQueryResult(statement, res.getStats().getQueryAst(), res.getStats().getQueryPlan())
323+
);
322324
}
323325
}
324326

jdbc/src/main/java/tech/ydb/jdbc/context/TableServiceExecutor.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ public TableServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
5151
}
5252

5353
@Override
54-
public void close() {
55-
checkBarrier();
54+
public void close() throws SQLException {
55+
closeCurrentResult();
5656
tx = null;
5757
}
5858

@@ -92,14 +92,14 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
9292
}
9393

9494
@Override
95-
public boolean isClosed() {
96-
checkBarrier();
95+
public boolean isClosed() throws SQLException {
96+
closeCurrentResult();
9797
return tx == null;
9898
}
9999

100100
@Override
101-
public String txID() {
102-
checkBarrier();
101+
public String txID() throws SQLException {
102+
closeCurrentResult();
103103
return tx != null ? tx.txID() : null;
104104
}
105105

@@ -197,7 +197,7 @@ public YdbQueryResult executeExplainQuery(YdbStatement statement, YdbQuery query
197197
try (Session session = createNewTableSession(validator)) {
198198
String msg = QueryType.EXPLAIN_QUERY + " >>\n" + yql;
199199
ExplainDataQueryResult res = validator.call(msg, () -> session.explainDataQuery(yql, settings));
200-
return new StaticQueryResult(statement, res.getQueryAst(), res.getQueryPlan());
200+
return updateCurrentResult(new StaticQueryResult(statement, res.getQueryAst(), res.getQueryPlan()));
201201
}
202202
}
203203

@@ -226,7 +226,7 @@ public YdbQueryResult executeDataQuery(YdbStatement statement, YdbQuery query, S
226226
readers.add(new YdbStaticResultSet(statement, rs));
227227
}
228228

229-
return new StaticQueryResult(query, readers);
229+
return updateCurrentResult(new StaticQueryResult(query, readers));
230230
} catch (SQLException | RuntimeException ex) {
231231
updateState(tx.withRollback(session));
232232
throw ex;
@@ -238,8 +238,6 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
238238
throws SQLException {
239239
ensureOpened();
240240

241-
final Barrier barrier = createBarrier();
242-
243241
YdbContext ctx = statement.getConnection().getCtx();
244242
YdbValidator validator = statement.getValidator();
245243
Duration scanQueryTimeout = ctx.getOperationProperties().getScanQueryTimeout();
@@ -250,14 +248,13 @@ public YdbQueryResult executeScanQuery(YdbStatement statement, YdbQuery query, S
250248
final Session session = createNewTableSession(validator);
251249

252250
String msg = QueryType.SCAN_QUERY + " >>\n" + yql;
253-
return validator.call(msg, () -> {
251+
StreamQueryResult lazy = validator.call(msg, () -> {
254252
GrpcReadStream<ResultSetReader> stream = session.executeScanQuery(yql, params, settings);
255253
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
256-
return result.execute(stream, () -> {
257-
session.close();
258-
barrier.open();
259-
});
254+
return result.execute(stream, session::close);
260255
});
256+
257+
return updateCurrentResult(lazy);
261258
}
262259

263260
@Override

jdbc/src/main/java/tech/ydb/jdbc/context/YdbExecutor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
* @author Aleksandr Gorshenin
1414
*/
1515
public interface YdbExecutor {
16-
void close();
17-
boolean isClosed();
16+
void close() throws SQLException;
17+
boolean isClosed() throws SQLException;
1818
void ensureOpened() throws SQLException;
1919

20-
String txID();
20+
String txID() throws SQLException;
2121
int transactionLevel() throws SQLException;
2222

2323
boolean isInsideTransaction() throws SQLException;

0 commit comments

Comments
 (0)