diff --git a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java index e1ef7fc8..25f0dfb0 100644 --- a/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java +++ b/query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java @@ -2,8 +2,6 @@ import java.util.concurrent.CompletableFuture; -import io.grpc.Context; - import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcReadStream; import tech.ydb.core.grpc.GrpcRequestSettings; @@ -70,14 +68,7 @@ public CompletableFuture> deleteSession( public GrpcReadStream attachSession( YdbQuery.AttachSessionRequest request, GrpcRequestSettings settings) { - // Execute attachSession call outside current context to avoid span propogation - Context ctx = Context.ROOT.fork(); - Context previous = ctx.attach(); - try { - return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request); - } finally { - ctx.detach(previous); - } + return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request); } public CompletableFuture> beginTransaction( diff --git a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java index c146018b..420d5b57 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionImpl.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionImpl.java @@ -10,6 +10,7 @@ import java.util.concurrent.atomic.AtomicReference; import com.google.protobuf.TextFormat; +import io.grpc.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,28 +138,35 @@ GrpcReadStream attach(AttachSessionSettings settings) { YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder() .setSessionId(sessionId) .build(); - GrpcRequestSettings grpcSettings = makeOptions(settings).build(); - GrpcReadStream origin = rpc.attachSession(request, grpcSettings); - return new GrpcReadStream() { - @Override - public CompletableFuture start(GrpcReadStream.Observer observer) { - return origin.start(message -> { - if (logger.isTraceEnabled()) { - String msg = TextFormat.shortDebugString(message); - logger.trace("session '{}' got attach stream message {}", sessionId, msg); - } - StatusCode code = StatusCode.fromProto(message.getStatus()); - Status status = Status.of(code, Issue.fromPb(message.getIssuesList())); - updateSessionState(status); - observer.onNext(status); - }); - } + // Execute attachSession call outside current context to avoid cancellation and deadline propogation + Context ctx = Context.ROOT.fork(); + Context previous = ctx.attach(); + try { + GrpcRequestSettings grpcSettings = makeOptions(settings).build(); + GrpcReadStream origin = rpc.attachSession(request, grpcSettings); + return new GrpcReadStream() { + @Override + public CompletableFuture start(GrpcReadStream.Observer observer) { + return origin.start(message -> { + if (logger.isTraceEnabled()) { + String msg = TextFormat.shortDebugString(message); + logger.trace("session '{}' got attach stream message {}", sessionId, msg); + } + StatusCode code = StatusCode.fromProto(message.getStatus()); + Status status = Status.of(code, Issue.fromPb(message.getIssuesList())); + updateSessionState(status); + observer.onNext(status); + }); + } - @Override - public void cancel() { - origin.cancel(); - } - }; + @Override + public void cancel() { + origin.cancel(); + } + }; + } finally { + ctx.detach(previous); + } } private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) { diff --git a/query/src/main/java/tech/ydb/query/impl/SessionPool.java b/query/src/main/java/tech/ydb/query/impl/SessionPool.java index 39e2dd45..5a0ae1d6 100644 --- a/query/src/main/java/tech/ydb/query/impl/SessionPool.java +++ b/query/src/main/java/tech/ydb/query/impl/SessionPool.java @@ -12,6 +12,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; +import io.grpc.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -270,23 +271,38 @@ private class Handler implements WaitingQueue.Handler { @Override public CompletableFuture create() { - stats.requested.increment(); - return SessionImpl - .createSession(rpc, CREATE_SETTINGS, true) - .thenCompose(r -> { - if (!r.isSuccess()) { - stats.failed.increment(); - throw new UnexpectedResultException("create session problem", r.getStatus()); - } - return new PooledQuerySession(rpc, r.getValue()).start(); - }) - .thenApply(Result::getValue); + // Execute createSession call outside current context to avoid cancellation and deadline propogation + Context ctx = Context.ROOT.fork(); + Context previous = ctx.attach(); + try { + stats.requested.increment(); + return SessionImpl + .createSession(rpc, CREATE_SETTINGS, true) + .thenCompose(r -> { + if (!r.isSuccess()) { + stats.failed.increment(); + throw new UnexpectedResultException("create session problem", r.getStatus()); + } + return new PooledQuerySession(rpc, r.getValue()).start(); + }) + .thenApply(Result::getValue); + } finally { + ctx.detach(previous); + } } @Override public void destroy(PooledQuerySession session) { stats.deleted.increment(); - session.destroy(); + + // Execute deleteSession call outside current context to avoid cancellation and deadline propogation + Context ctx = Context.ROOT.fork(); + Context previous = ctx.attach(); + try { + session.destroy(); + } finally { + ctx.detach(previous); + } } } diff --git a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java index da14798e..906317fa 100644 --- a/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java +++ b/table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java @@ -13,6 +13,7 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.BiConsumer; +import io.grpc.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -178,33 +179,47 @@ private class Handler implements WaitingQueue.Handler { @Override public CompletableFuture create() { - stats.requested.increment(); - return BaseSession - .createSessionId(tableRpc, CREATE_SETTINGS, true) - .thenApply(response -> { - if (!response.isSuccess()) { - stats.failed.increment(); - throw new UnexpectedResultException("create session problem", response.getStatus()); - } - return new ClosableSession(response.getValue(), tableRpc, keepQueryText); - }); + // Execute createSession call outside current context to avoid cancellation and deadline propogation + Context ctx = Context.ROOT.fork(); + Context previous = ctx.attach(); + try { + stats.requested.increment(); + return BaseSession + .createSessionId(tableRpc, CREATE_SETTINGS, true) + .thenApply(response -> { + if (!response.isSuccess()) { + stats.failed.increment(); + throw new UnexpectedResultException("create session problem", response.getStatus()); + } + return new ClosableSession(response.getValue(), tableRpc, keepQueryText); + }); + } finally { + ctx.detach(previous); + } } @Override public void destroy(ClosableSession session) { stats.deleted.increment(); - session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> { - if (th != null) { - logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage()); - } - if (status != null) { - if (status.isSuccess()) { - logger.debug("session {} successfully destroyed", session.getId()); - } else { - logger.warn("session {} destroyed with status {}", session.getId(), status); + // Execute deleteSession call outside current context to avoid cancellation and deadline propogation + Context ctx = Context.ROOT.fork(); + Context previous = ctx.attach(); + try { + session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> { + if (th != null) { + logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage()); } - } - }); + if (status != null) { + if (status.isSuccess()) { + logger.debug("session {} successfully destroyed", session.getId()); + } else { + logger.warn("session {} destroyed with status {}", session.getId(), status); + } + } + }); + } finally { + ctx.detach(previous); + } } } diff --git a/table/src/test/java/tech/ydb/table/integration/TableClientTest.java b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java index 5a2a303f..e5708282 100644 --- a/table/src/test/java/tech/ydb/table/integration/TableClientTest.java +++ b/table/src/test/java/tech/ydb/table/integration/TableClientTest.java @@ -1,7 +1,9 @@ package tech.ydb.table.integration; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import io.grpc.Context; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -16,9 +18,10 @@ import tech.ydb.core.StatusCode; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.table.Session; -import tech.ydb.table.TableClient; +import tech.ydb.table.impl.PooledTableClient; import tech.ydb.table.query.DataQueryResult; import tech.ydb.table.query.Params; +import tech.ydb.table.rpc.grpc.GrpcTableRpc; import tech.ydb.table.settings.ExecuteScanQuerySettings; import tech.ydb.table.transaction.TxControl; import tech.ydb.test.junit4.YdbHelperRule; @@ -36,7 +39,7 @@ public class TableClientTest { private static GrpcTransport transport; - private static TableClient tableClient; + private static PooledTableClient tableClient; @BeforeClass public static void initTransport() { @@ -54,7 +57,7 @@ public static void closeTransport() { @Before public void initTableClient() { grpcInterceptor.reset(); - tableClient = TableClient.newClient(transport).build(); + tableClient = PooledTableClient.newClient(GrpcTableRpc.useTransport(transport)).build(); } @After @@ -151,4 +154,41 @@ public void sessionExecuteScanQueryTest() { Assert.assertEquals(id2, s3.getId()); } } + + @Test + public void cancelledGrpcContextCloseTest() { + tableClient.updatePoolMaxSize(2); + Context.CancellableContext canceled = Context.current().withCancellation(); + + Session s1; + Context previous = canceled.attach(); + try { + s1 = getSession(); + } finally { + canceled.detach(previous); + } + + Session s2 = getSession(); + + // not ready, because the session pool has max size + CompletableFuture> s3f = tableClient.createSession(Duration.ofSeconds(5)); + + canceled.cancel(new RuntimeException("test")); + + previous = canceled.attach(); + try { + Result res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join(); + // context is closed, session must be invalidated + Assert.assertEquals(StatusCode.CLIENT_CANCELLED, res.getStatus().getCode()); + // s1 will be removed, but s3f must be completed by a new CreateSession request + s1.close(); + } finally { + canceled.detach(previous); + } + + Session s3 = s3f.join().getValue(); + Assert.assertNotEquals(s2.getId(), s3.getId()); + s2.close(); + s3.close(); + } }