Skip to content

Commit 798f5a9

Browse files
authored
Merge pull request #543 from alex268/master
Fixed cancel and deadline propogation for the session pool
2 parents 2e3df4c + db52108 commit 798f5a9

File tree

5 files changed

+137
-67
lines changed

5 files changed

+137
-67
lines changed

query/src/main/java/tech/ydb/query/impl/QueryServiceRpc.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import java.util.concurrent.CompletableFuture;
44

5-
import io.grpc.Context;
6-
75
import tech.ydb.core.Result;
86
import tech.ydb.core.grpc.GrpcReadStream;
97
import tech.ydb.core.grpc.GrpcRequestSettings;
@@ -70,14 +68,7 @@ public CompletableFuture<Result<YdbQuery.DeleteSessionResponse>> deleteSession(
7068

7169
public GrpcReadStream<YdbQuery.SessionState> attachSession(
7270
YdbQuery.AttachSessionRequest request, GrpcRequestSettings settings) {
73-
// Execute attachSession call outside current context to avoid span propogation
74-
Context ctx = Context.ROOT.fork();
75-
Context previous = ctx.attach();
76-
try {
77-
return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request);
78-
} finally {
79-
ctx.detach(previous);
80-
}
71+
return transport.readStreamCall(QueryServiceGrpc.getAttachSessionMethod(), settings, request);
8172
}
8273

8374
public CompletableFuture<Result<YdbQuery.BeginTransactionResponse>> beginTransaction(

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.concurrent.atomic.AtomicReference;
1111

1212
import com.google.protobuf.TextFormat;
13+
import io.grpc.Context;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516

@@ -137,28 +138,35 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
137138
YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder()
138139
.setSessionId(sessionId)
139140
.build();
140-
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
141-
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
142-
return new GrpcReadStream<Status>() {
143-
@Override
144-
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
145-
return origin.start(message -> {
146-
if (logger.isTraceEnabled()) {
147-
String msg = TextFormat.shortDebugString(message);
148-
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
149-
}
150-
StatusCode code = StatusCode.fromProto(message.getStatus());
151-
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
152-
updateSessionState(status);
153-
observer.onNext(status);
154-
});
155-
}
141+
// Execute attachSession call outside current context to avoid cancellation and deadline propogation
142+
Context ctx = Context.ROOT.fork();
143+
Context previous = ctx.attach();
144+
try {
145+
GrpcRequestSettings grpcSettings = makeOptions(settings).build();
146+
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
147+
return new GrpcReadStream<Status>() {
148+
@Override
149+
public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer) {
150+
return origin.start(message -> {
151+
if (logger.isTraceEnabled()) {
152+
String msg = TextFormat.shortDebugString(message);
153+
logger.trace("session '{}' got attach stream message {}", sessionId, msg);
154+
}
155+
StatusCode code = StatusCode.fromProto(message.getStatus());
156+
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
157+
updateSessionState(status);
158+
observer.onNext(status);
159+
});
160+
}
156161

157-
@Override
158-
public void cancel() {
159-
origin.cancel();
160-
}
161-
};
162+
@Override
163+
public void cancel() {
164+
origin.cancel();
165+
}
166+
};
167+
} finally {
168+
ctx.detach(previous);
169+
}
162170
}
163171

164172
private GrpcRequestSettings.Builder makeOptions(BaseRequestSettings settings) {

query/src/main/java/tech/ydb/query/impl/SessionPool.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.atomic.LongAdder;
1313
import java.util.function.BiConsumer;
1414

15+
import io.grpc.Context;
1516
import org.slf4j.Logger;
1617
import org.slf4j.LoggerFactory;
1718

@@ -270,23 +271,38 @@ private class Handler implements WaitingQueue.Handler<PooledQuerySession> {
270271

271272
@Override
272273
public CompletableFuture<PooledQuerySession> create() {
273-
stats.requested.increment();
274-
return SessionImpl
275-
.createSession(rpc, CREATE_SETTINGS, true)
276-
.thenCompose(r -> {
277-
if (!r.isSuccess()) {
278-
stats.failed.increment();
279-
throw new UnexpectedResultException("create session problem", r.getStatus());
280-
}
281-
return new PooledQuerySession(rpc, r.getValue()).start();
282-
})
283-
.thenApply(Result::getValue);
274+
// Execute createSession call outside current context to avoid cancellation and deadline propogation
275+
Context ctx = Context.ROOT.fork();
276+
Context previous = ctx.attach();
277+
try {
278+
stats.requested.increment();
279+
return SessionImpl
280+
.createSession(rpc, CREATE_SETTINGS, true)
281+
.thenCompose(r -> {
282+
if (!r.isSuccess()) {
283+
stats.failed.increment();
284+
throw new UnexpectedResultException("create session problem", r.getStatus());
285+
}
286+
return new PooledQuerySession(rpc, r.getValue()).start();
287+
})
288+
.thenApply(Result::getValue);
289+
} finally {
290+
ctx.detach(previous);
291+
}
284292
}
285293

286294
@Override
287295
public void destroy(PooledQuerySession session) {
288296
stats.deleted.increment();
289-
session.destroy();
297+
298+
// Execute deleteSession call outside current context to avoid cancellation and deadline propogation
299+
Context ctx = Context.ROOT.fork();
300+
Context previous = ctx.attach();
301+
try {
302+
session.destroy();
303+
} finally {
304+
ctx.detach(previous);
305+
}
290306
}
291307
}
292308

table/src/main/java/tech/ydb/table/impl/pool/SessionPool.java

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.atomic.LongAdder;
1414
import java.util.function.BiConsumer;
1515

16+
import io.grpc.Context;
1617
import org.slf4j.Logger;
1718
import org.slf4j.LoggerFactory;
1819

@@ -178,33 +179,47 @@ private class Handler implements WaitingQueue.Handler<ClosableSession> {
178179

179180
@Override
180181
public CompletableFuture<ClosableSession> create() {
181-
stats.requested.increment();
182-
return BaseSession
183-
.createSessionId(tableRpc, CREATE_SETTINGS, true)
184-
.thenApply(response -> {
185-
if (!response.isSuccess()) {
186-
stats.failed.increment();
187-
throw new UnexpectedResultException("create session problem", response.getStatus());
188-
}
189-
return new ClosableSession(response.getValue(), tableRpc, keepQueryText);
190-
});
182+
// Execute createSession call outside current context to avoid cancellation and deadline propogation
183+
Context ctx = Context.ROOT.fork();
184+
Context previous = ctx.attach();
185+
try {
186+
stats.requested.increment();
187+
return BaseSession
188+
.createSessionId(tableRpc, CREATE_SETTINGS, true)
189+
.thenApply(response -> {
190+
if (!response.isSuccess()) {
191+
stats.failed.increment();
192+
throw new UnexpectedResultException("create session problem", response.getStatus());
193+
}
194+
return new ClosableSession(response.getValue(), tableRpc, keepQueryText);
195+
});
196+
} finally {
197+
ctx.detach(previous);
198+
}
191199
}
192200

193201
@Override
194202
public void destroy(ClosableSession session) {
195203
stats.deleted.increment();
196-
session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> {
197-
if (th != null) {
198-
logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage());
199-
}
200-
if (status != null) {
201-
if (status.isSuccess()) {
202-
logger.debug("session {} successfully destroyed", session.getId());
203-
} else {
204-
logger.warn("session {} destroyed with status {}", session.getId(), status);
204+
// Execute deleteSession call outside current context to avoid cancellation and deadline propogation
205+
Context ctx = Context.ROOT.fork();
206+
Context previous = ctx.attach();
207+
try {
208+
session.delete(new DeleteSessionSettings()).whenComplete((status, th) -> {
209+
if (th != null) {
210+
logger.warn("session {} destroyed with exception {}", session.getId(), th.getMessage());
205211
}
206-
}
207-
});
212+
if (status != null) {
213+
if (status.isSuccess()) {
214+
logger.debug("session {} successfully destroyed", session.getId());
215+
} else {
216+
logger.warn("session {} destroyed with status {}", session.getId(), status);
217+
}
218+
}
219+
});
220+
} finally {
221+
ctx.detach(previous);
222+
}
208223
}
209224
}
210225

table/src/test/java/tech/ydb/table/integration/TableClientTest.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package tech.ydb.table.integration;
22

33
import java.time.Duration;
4+
import java.util.concurrent.CompletableFuture;
45

6+
import io.grpc.Context;
57
import org.junit.After;
68
import org.junit.AfterClass;
79
import org.junit.Assert;
@@ -16,9 +18,10 @@
1618
import tech.ydb.core.StatusCode;
1719
import tech.ydb.core.grpc.GrpcTransport;
1820
import tech.ydb.table.Session;
19-
import tech.ydb.table.TableClient;
21+
import tech.ydb.table.impl.PooledTableClient;
2022
import tech.ydb.table.query.DataQueryResult;
2123
import tech.ydb.table.query.Params;
24+
import tech.ydb.table.rpc.grpc.GrpcTableRpc;
2225
import tech.ydb.table.settings.ExecuteScanQuerySettings;
2326
import tech.ydb.table.transaction.TxControl;
2427
import tech.ydb.test.junit4.YdbHelperRule;
@@ -36,7 +39,7 @@ public class TableClientTest {
3639

3740
private static GrpcTransport transport;
3841

39-
private static TableClient tableClient;
42+
private static PooledTableClient tableClient;
4043

4144
@BeforeClass
4245
public static void initTransport() {
@@ -54,7 +57,7 @@ public static void closeTransport() {
5457
@Before
5558
public void initTableClient() {
5659
grpcInterceptor.reset();
57-
tableClient = TableClient.newClient(transport).build();
60+
tableClient = PooledTableClient.newClient(GrpcTableRpc.useTransport(transport)).build();
5861
}
5962

6063
@After
@@ -151,4 +154,41 @@ public void sessionExecuteScanQueryTest() {
151154
Assert.assertEquals(id2, s3.getId());
152155
}
153156
}
157+
158+
@Test
159+
public void cancelledGrpcContextCloseTest() {
160+
tableClient.updatePoolMaxSize(2);
161+
Context.CancellableContext canceled = Context.current().withCancellation();
162+
163+
Session s1;
164+
Context previous = canceled.attach();
165+
try {
166+
s1 = getSession();
167+
} finally {
168+
canceled.detach(previous);
169+
}
170+
171+
Session s2 = getSession();
172+
173+
// not ready, because the session pool has max size
174+
CompletableFuture<Result<Session>> s3f = tableClient.createSession(Duration.ofSeconds(5));
175+
176+
canceled.cancel(new RuntimeException("test"));
177+
178+
previous = canceled.attach();
179+
try {
180+
Result<DataQueryResult> res = s1.executeDataQuery("SELECT 1 + 2", TxControl.snapshotRo()).join();
181+
// context is closed, session must be invalidated
182+
Assert.assertEquals(StatusCode.CLIENT_CANCELLED, res.getStatus().getCode());
183+
// s1 will be removed, but s3f must be completed by a new CreateSession request
184+
s1.close();
185+
} finally {
186+
canceled.detach(previous);
187+
}
188+
189+
Session s3 = s3f.join().getValue();
190+
Assert.assertNotEquals(s2.getId(), s3.getId());
191+
s2.close();
192+
s3.close();
193+
}
154194
}

0 commit comments

Comments
 (0)