Skip to content

Commit e100343

Browse files
committed
Fixed inconsistent transaction state
1 parent f0d05e9 commit e100343

File tree

2 files changed

+24
-16
lines changed

2 files changed

+24
-16
lines changed

jdbc/src/main/java/tech/ydb/jdbc/context/QueryServiceExecutor.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,16 +212,17 @@ public YdbQueryResult executeDataQuery(
212212
}
213213

214214
String msg = "STREAM_QUERY >>\n" + yql;
215-
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
216-
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
217-
218-
result.start(stream).thenRun(() -> {
215+
try {
216+
return validator.call(msg, () -> {
217+
QueryStream stream = tx.createQuery(yql, isAutoCommit, params, settings);
218+
StreamQueryResult result = new StreamQueryResult(msg, statement, query, stream::cancel);
219+
return result.execute(stream);
220+
});
221+
} finally {
219222
if (!tx.isActive()) {
220223
cleanTx();
221224
}
222-
});
223-
224-
return result;
225+
}
225226
}
226227

227228
@Override

jdbc/src/main/java/tech/ydb/jdbc/context/StreamQueryResult.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public class StreamQueryResult implements YdbQueryResult {
4040
private final String msg;
4141
private final YdbStatement statement;
4242
private final Runnable stopRunnable;
43-
private final CompletableFuture<Status> statusFuture = new CompletableFuture<>();
43+
44+
private final CompletableFuture<Status> finishFuture = new CompletableFuture<>();
45+
private final CompletableFuture<Result<StreamQueryResult>> startFuture = new CompletableFuture<>();
4446

4547
private final int[] resultIndexes;
4648
private final List<CompletableFuture<Result<LazyResultSet>>> resultFutures = new ArrayList<>();
@@ -72,31 +74,34 @@ public StreamQueryResult(String msg, YdbStatement statement, YdbQuery query, Run
7274
}
7375
}
7476

75-
public CompletableFuture<Status> start(QueryStream stream) {
76-
return stream
77-
.execute(new QueryPartsHandler())
77+
public CompletableFuture<Result<StreamQueryResult>> execute(QueryStream stream) {
78+
stream.execute(new QueryPartsHandler())
7879
.thenApply(Result::getStatus)
79-
.whenComplete(this::onStreamFinish);
80+
.whenComplete(this::onStreamFinished);
81+
return startFuture;
8082
}
8183

82-
private void onStreamFinish(Status status, Throwable th) {
84+
private void onStreamFinished(Status status, Throwable th) {
8385
if (th != null) {
84-
statusFuture.completeExceptionally(th);
86+
finishFuture.completeExceptionally(th);
8587
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
8688
future.completeExceptionally(th);
8789
}
90+
startFuture.completeExceptionally(th);
8891
}
8992

9093
if (status != null) {
91-
statusFuture.complete(status);
94+
finishFuture.complete(status);
9295
if (status.isSuccess()) {
9396
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
9497
future.complete(Result.success(new LazyResultSet(statement, new ColumnInfo[0]), status));
9598
}
99+
startFuture.complete(Result.success(this));
96100
} else {
97101
for (CompletableFuture<Result<LazyResultSet>> future: resultFutures) {
98102
future.complete(Result.fail(status));
99103
}
104+
startFuture.complete(Result.fail(status));
100105
}
101106
}
102107

@@ -112,7 +117,7 @@ private void onStreamFinish(Status status, Throwable th) {
112117

113118
@Override
114119
public void close() throws SQLException {
115-
Status status = statusFuture.join();
120+
Status status = finishFuture.join();
116121
if (!status.isSuccess()) {
117122
throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status,
118123
new UnexpectedResultException("Unexpected status", status)
@@ -218,11 +223,13 @@ private void onResultSet(int index, ResultSetReader rsr) {
218223
private class QueryPartsHandler implements QueryStream.PartsHandler {
219224
@Override
220225
public void onIssues(Issue[] issues) {
226+
startFuture.complete(Result.success(StreamQueryResult.this));
221227
statement.getValidator().addStatusIssues(Arrays.asList(issues));
222228
}
223229

224230
@Override
225231
public void onNextPart(QueryResultPart part) {
232+
startFuture.complete(Result.success(StreamQueryResult.this));
226233
onResultSet((int) part.getResultSetIndex(), part.getResultSetReader());
227234
}
228235
}

0 commit comments

Comments
 (0)