Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ public QueryStream createQuery(String query, TxMode tx, Params prms, ExecuteQuer
YdbQuery.TransactionControl tc = TxControl.txModeCtrl(tx, true);
return new StreamImpl(createGrpcStream(query, tc, prms, settings)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
String txID = meta == null ? null : meta.getId();
void handleTxMeta(String txID) {
if (txID != null && !txID.isEmpty()) {
logger.warn("{} got unexpected transaction id {}", SessionImpl.this, txID);
}
Expand Down Expand Up @@ -253,7 +252,7 @@ abstract class StreamImpl implements QueryStream {
this.grpcStream = grpcStream;
}

abstract void handleTxMeta(YdbQuery.TransactionMeta meta);
abstract void handleTxMeta(String txId);
void handleCompletion(Status status, Throwable th) { }

@Override
Expand All @@ -276,7 +275,7 @@ public CompletableFuture<Result<QueryInfo>> execute(PartsHandler handler) {
}

if (msg.hasTxMeta()) {
handleTxMeta(msg.getTxMeta());
handleTxMeta(msg.getTxMeta().getId());
}
if (issues.length > 0) {
if (handler != null) {
Expand Down Expand Up @@ -352,8 +351,8 @@ public QueryStream createQuery(String query, boolean commitAtEnd, Params prms, E

return new StreamImpl(createGrpcStream(query, tc, prms, settings)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
String newId = meta == null || meta.getId() == null || meta.getId().isEmpty() ? null : meta.getId();
void handleTxMeta(String txID) {
String newId = txID == null || txID.isEmpty() ? null : txID;
if (!txId.compareAndSet(currentId, newId)) {
logger.warn("{} lost transaction meta id {}", SessionImpl.this, newId);
}
Expand Down
8 changes: 4 additions & 4 deletions query/src/main/java/tech/ydb/query/impl/TableClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,17 @@ public CompletableFuture<Result<DataQueryResult>> executeDataQueryInternal(
.withRequestTimeout(settings.getTimeoutDuration())
.build();

final AtomicReference<String> txID = new AtomicReference<>("");
final AtomicReference<String> txRef = new AtomicReference<>("");
QueryStream stream = querySession.new StreamImpl(querySession.createGrpcStream(query, tc, prms, qs)) {
@Override
void handleTxMeta(YdbQuery.TransactionMeta meta) {
txID.set(meta.getId());
void handleTxMeta(String txID) {
txRef.set(txID);
}
};

return QueryReader.readFrom(stream)
.thenApply(r -> r.map(
reader -> new ProxedDataQueryResult(txID.get(), reader)
reader -> new ProxedDataQueryResult(txRef.get(), reader)
));
}

Expand Down