Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.concurrent.CompletableFuture;

import io.grpc.Context;

import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.core.grpc.GrpcRequestSettings;
Expand Down Expand Up @@ -70,14 +68,7 @@ public CompletableFuture<Result<YdbQuery.DeleteSessionResponse>> deleteSession(

public GrpcReadStream<YdbQuery.SessionState> attachSession(
YdbQuery.AttachSessionRequest request, GrpcRequestSettings settings) {
// Execute attachSession call outside current context to avoid span propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request);
} finally {
ctx.detach(previous);
}
return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request);
}

public CompletableFuture<Result<YdbQuery.BeginTransactionResponse>> beginTransaction(
Expand Down
50 changes: 29 additions & 21 deletions query/src/main/java/tech/ydb/query/impl/SessionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.concurrent.atomic.AtomicReference;

import com.google.protobuf.TextFormat;
import io.grpc.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -137,28 +138,35 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder()
.setSessionId(sessionId)
.build();
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
return new GrpcReadStream<Status>() {
@Override
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
return origin.start(message -> {
if (logger.isTraceEnabled()) {
String msg = TextFormat.shortDebugString(message);
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
}
StatusCode code = StatusCode.fromProto(message.getStatus());
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
updateSessionState(status);
observer.onNext(status);
});
}
// Execute attachSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
return new GrpcReadStream<Status>() {
@Override
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
return origin.start(message -> {
if (logger.isTraceEnabled()) {
String msg = TextFormat.shortDebugString(message);
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
}
StatusCode code = StatusCode.fromProto(message.getStatus());
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
updateSessionState(status);
observer.onNext(status);
});
}

@Override
public void cancel() {
origin.cancel();
}
};
@Override
public void cancel() {
origin.cancel();
}
};
} finally {
ctx.detach(previous);
}
}

private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) {
Expand Down
40 changes: 28 additions & 12 deletions query/src/main/java/tech/ydb/query/impl/SessionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;

import io.grpc.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -270,23 +271,38 @@ private class Handler implements WaitingQueue.Handler<PooledQuerySession> {

@Override
public CompletableFuture<PooledQuerySession> create() {
stats.requested.increment();
return SessionImpl
.createSession(rpc, CREATE_SETTINGS, true)
.thenCompose(r -> {
if (!r.isSuccess()) {
stats.failed.increment();
throw new UnexpectedResultException("create session problem", r.getStatus());
}
return new PooledQuerySession(rpc, r.getValue()).start();
})
.thenApply(Result::getValue);
// Execute createSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
stats.requested.increment();
return SessionImpl
.createSession(rpc, CREATE_SETTINGS, true)
.thenCompose(r -> {
if (!r.isSuccess()) {
stats.failed.increment();
throw new UnexpectedResultException("create session problem", r.getStatus());
}
return new PooledQuerySession(rpc, r.getValue()).start();
})
.thenApply(Result::getValue);
} finally {
ctx.detach(previous);
}
}

@Override
public void destroy(PooledQuerySession session) {
stats.deleted.increment();
session.destroy();

// Execute deleteSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
session.destroy();
} finally {
ctx.detach(previous);
}
}
}

Expand Down
57 changes: 36 additions & 21 deletions table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;

import io.grpc.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -178,33 +179,47 @@ private class Handler implements WaitingQueue.Handler<ClosableSession> {

@Override
public CompletableFuture<ClosableSession> create() {
stats.requested.increment();
return BaseSession
.createSessionId(tableRpc, CREATE_SETTINGS, true)
.thenApply(response -> {
if (!response.isSuccess()) {
stats.failed.increment();
throw new UnexpectedResultException("create session problem", response.getStatus());
}
return new ClosableSession(response.getValue(), tableRpc, keepQueryText);
});
// Execute createSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
stats.requested.increment();
return BaseSession
.createSessionId(tableRpc, CREATE_SETTINGS, true)
.thenApply(response -> {
if (!response.isSuccess()) {
stats.failed.increment();
throw new UnexpectedResultException("create session problem", response.getStatus());
}
return new ClosableSession(response.getValue(), tableRpc, keepQueryText);
});
} finally {
ctx.detach(previous);
}
}

@Override
public void destroy(ClosableSession session) {
stats.deleted.increment();
session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> {
if (th != null) {
logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage());
}
if (status != null) {
if (status.isSuccess()) {
logger.debug("session {} successfully destroyed", session.getId());
} else {
logger.warn("session {} destroyed with status {}", session.getId(), status);
// Execute deleteSession call outside current context to avoid cancellation and deadline propogation
Context ctx = Context.ROOT.fork();
Context previous = ctx.attach();
try {
session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> {
if (th != null) {
logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage());
}
}
});
if (status != null) {
if (status.isSuccess()) {
logger.debug("session {} successfully destroyed", session.getId());
} else {
logger.warn("session {} destroyed with status {}", session.getId(), status);
}
}
});
} finally {
ctx.detach(previous);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package tech.ydb.table.integration;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;

import io.grpc.Context;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand All @@ -16,9 +18,10 @@
import tech.ydb.core.StatusCode;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.table.Session;
import tech.ydb.table.TableClient;
import tech.ydb.table.impl.PooledTableClient;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.query.Params;
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
import tech.ydb.table.settings.ExecuteScanQuerySettings;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.test.junit4.YdbHelperRule;
Expand All @@ -36,7 +39,7 @@ public class TableClientTest {

private static GrpcTransport transport;

private static TableClient tableClient;
private static PooledTableClient tableClient;

@BeforeClass
public static void initTransport() {
Expand All @@ -54,7 +57,7 @@ public static void closeTransport() {
@Before
public void initTableClient() {
grpcInterceptor.reset();
tableClient = TableClient.newClient(transport).build();
tableClient = PooledTableClient.newClient(GrpcTableRpc.useTransport(transport)).build();
}

@After
Expand Down Expand Up @@ -151,4 +154,41 @@ public void sessionExecuteScanQueryTest() {
Assert.assertEquals(id2, s3.getId());
}
}

@Test
public void cancelledGrpcContextCloseTest() {
tableClient.updatePoolMaxSize(2);
Context.CancellableContext canceled = Context.current().withCancellation();

Session s1;
Context previous = canceled.attach();
try {
s1 = getSession();
} finally {
canceled.detach(previous);
}

Session s2 = getSession();

// not ready, because the session pool has max size
CompletableFuture<Result<Session>> s3f = tableClient.createSession(Duration.ofSeconds(5));

canceled.cancel(new RuntimeException("test"));

previous = canceled.attach();
try {
Result<DataQueryResult> res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join();
// context is closed, session must be invalidated
Assert.assertEquals(StatusCode.CLIENT_CANCELLED, res.getStatus().getCode());
// s1 will be removed, but s3f must be completed by a new CreateSession request
s1.close();
} finally {
canceled.detach(previous);
}

Session s3 = s3f.join().getValue();
Assert.assertNotEquals(s2.getId(), s3.getId());
s2.close();
s3.close();
}
}