Skip to content

Commit 877ca3f

Browse files
Merge pull request #611 from ydb-platform/support-shutdown-hint
feat: Session attach stream handles NodeShutdown and SessionShutdown hints
2 parents 7c484d8 + a290df6 commit 877ca3f

File tree

4 files changed

+193
-3
lines changed

4 files changed

+193
-3
lines changed

bom/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
<properties>
1717
<ydb-auth-api.version>1.0.0</ydb-auth-api.version>
18-
<ydb-proto-api.version>1.9.2</ydb-proto-api.version>
18+
<ydb-proto-api.version>1.9.3</ydb-proto-api.version>
1919
<yc-auth.version>2.3.1</yc-auth.version>
2020
</properties>
2121

query/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@
4141
<artifactId>junit</artifactId>
4242
<scope>test</scope>
4343
</dependency>
44+
<!-- Parent BOM uses Mockito 5.x (Java 11+); query tests compile on JDK 8 in CI -->
45+
<dependency>
46+
<groupId>org.mockito</groupId>
47+
<artifactId>mockito-core</artifactId>
48+
<version>4.11.0</version>
49+
<scope>test</scope>
50+
</dependency>
4451
<dependency>
4552
<groupId>tech.ydb.test</groupId>
4653
<artifactId>ydb-junit4-support</artifactId>

query/src/main/java/tech/ydb/query/impl/SessionImpl.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,11 +144,16 @@ GrpcReadStream<Status> attach(AttachSessionSettings settings) {
144144
YdbQuery.AttachSessionRequest request = YdbQuery.AttachSessionRequest.newBuilder()
145145
.setSessionId(sessionId)
146146
.build();
147-
// Execute attachSession call outside current context to avoid cancellation and deadline propogation
147+
// Execute attachSession call outside current context to avoid cancellation and deadline propagation
148148
Context ctx = Context.ROOT.fork();
149149
Context previous = ctx.attach();
150150
try {
151-
GrpcRequestSettings grpcSettings = makeOptions(settings).disableDeadline().build();
151+
AtomicBoolean pessimizationHook = new AtomicBoolean(false);
152+
153+
GrpcRequestSettings grpcSettings = makeOptions(settings)
154+
.withPessimizationHook(pessimizationHook::get)
155+
.disableDeadline()
156+
.build();
152157
GrpcReadStream<YdbQuery.SessionState> origin = rpc.attachSession(request, grpcSettings);
153158
return new GrpcReadStream<Status>() {
154159
@Override
@@ -161,6 +166,19 @@ public CompletableFuture<Status> start(GrpcReadStream.Observer<Status> observer)
161166
StatusCode code = StatusCode.fromProto(message.getStatus());
162167
Status status = Status.of(code, Issue.fromPb(message.getIssuesList()));
163168
updateSessionState(status);
169+
// The hint is sent by the server with a success status.
170+
switch (message.getSessionHintCase()) {
171+
case NODE_SHUTDOWN:
172+
pessimizationHook.set(nodeID != 0);
173+
updateSessionState(Status.of(StatusCode.BAD_SESSION));
174+
break;
175+
case SESSION_SHUTDOWN:
176+
updateSessionState(Status.of(StatusCode.BAD_SESSION));
177+
break;
178+
default:
179+
break;
180+
}
181+
164182
observer.onNext(status);
165183
});
166184
}
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package tech.ydb.query.impl;
2+
3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.atomic.AtomicReference;
5+
6+
import org.junit.Assert;
7+
import org.junit.Test;
8+
9+
import tech.ydb.core.Status;
10+
import tech.ydb.core.StatusCode;
11+
import tech.ydb.core.grpc.GrpcReadStream;
12+
import tech.ydb.core.grpc.GrpcRequestSettings;
13+
import tech.ydb.core.grpc.GrpcTransport;
14+
import tech.ydb.core.tracing.NoopTracer;
15+
import tech.ydb.proto.StatusCodesProtos.StatusIds;
16+
import tech.ydb.proto.query.YdbQuery;
17+
import tech.ydb.query.settings.AttachSessionSettings;
18+
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.when;
21+
22+
/**
23+
* {@link SessionImpl#attach} sets {@link GrpcRequestSettings#getPessimizationHook()} when the attach
24+
* stream delivers {@code NODE_SHUTDOWN} with a non-zero node id. The transport applies pessimization
25+
* on stream completion ({@code postComplete}).
26+
*/
27+
public class SessionImplAttachHintTest {
28+
29+
private static final AttachSessionSettings ATTACH_SETTINGS = AttachSessionSettings.newBuilder().build();
30+
31+
/**
32+
* {@link QueryServiceRpc} reads {@link GrpcTransport#getTracer()} in the constructor; attach tests override
33+
* {@link QueryServiceRpc#attachSession} and do not use the transport otherwise.
34+
*/
35+
private static final GrpcTransport DUMMY_TRANSPORT = mock(GrpcTransport.class);
36+
37+
static {
38+
when(DUMMY_TRANSPORT.getTracer()).thenReturn(NoopTracer.INSTANCE);
39+
}
40+
41+
@Test
42+
public void nodeShutdownHint_setsPessimizationHookWhenNodeIdKnown() {
43+
YdbQuery.CreateSessionResponse createResponse = YdbQuery.CreateSessionResponse.newBuilder()
44+
.setSessionId("s1")
45+
.setNodeId(42)
46+
.build();
47+
48+
YdbQuery.SessionState msg = YdbQuery.SessionState.newBuilder()
49+
.setStatus(StatusIds.StatusCode.SUCCESS)
50+
.setNodeShutdown(YdbQuery.NodeShutdownHint.getDefaultInstance())
51+
.build();
52+
53+
TestRpc rpc = new TestRpc(singleMessageStream(msg));
54+
try (TestSession session = new TestSession(rpc, createResponse)) {
55+
session.attach(ATTACH_SETTINGS).start(s -> {
56+
}).join();
57+
58+
Assert.assertNotNull(rpc.capturedSettings);
59+
Assert.assertNotNull(rpc.capturedSettings.getPessimizationHook());
60+
Assert.assertTrue(rpc.capturedSettings.getPessimizationHook().getAsBoolean());
61+
Assert.assertEquals(StatusCode.BAD_SESSION, session.getLastSessionState().getCode());
62+
}
63+
}
64+
65+
@Test
66+
public void nodeShutdownHint_doesNotPessimizeWhenNodeIdIsZero() {
67+
YdbQuery.CreateSessionResponse createResponse = YdbQuery.CreateSessionResponse.newBuilder()
68+
.setSessionId("s-zero")
69+
.setNodeId(0)
70+
.build();
71+
72+
YdbQuery.SessionState msg = YdbQuery.SessionState.newBuilder()
73+
.setStatus(StatusIds.StatusCode.SUCCESS)
74+
.setNodeShutdown(YdbQuery.NodeShutdownHint.getDefaultInstance())
75+
.build();
76+
77+
TestRpc rpc = new TestRpc(singleMessageStream(msg));
78+
try (TestSession session = new TestSession(rpc, createResponse)) {
79+
session.attach(ATTACH_SETTINGS).start(s -> {
80+
}).join();
81+
82+
Assert.assertNotNull(rpc.capturedSettings);
83+
Assert.assertNotNull(rpc.capturedSettings.getPessimizationHook());
84+
Assert.assertFalse(rpc.capturedSettings.getPessimizationHook().getAsBoolean());
85+
Assert.assertEquals(StatusCode.BAD_SESSION, session.getLastSessionState().getCode());
86+
}
87+
}
88+
89+
@Test
90+
public void sessionShutdownHint_doesNotPessimizeEndpoint() {
91+
YdbQuery.CreateSessionResponse createResponse = YdbQuery.CreateSessionResponse.newBuilder()
92+
.setSessionId("s2")
93+
.setNodeId(99)
94+
.build();
95+
96+
YdbQuery.SessionState msg = YdbQuery.SessionState.newBuilder()
97+
.setStatus(StatusIds.StatusCode.SUCCESS)
98+
.setSessionShutdown(YdbQuery.SessionShutdownHint.getDefaultInstance())
99+
.build();
100+
101+
TestRpc rpc = new TestRpc(singleMessageStream(msg));
102+
try (TestSession session = new TestSession(rpc, createResponse)) {
103+
session.attach(ATTACH_SETTINGS).start(s -> {
104+
}).join();
105+
106+
Assert.assertNotNull(rpc.capturedSettings);
107+
Assert.assertNotNull(rpc.capturedSettings.getPessimizationHook());
108+
Assert.assertFalse(rpc.capturedSettings.getPessimizationHook().getAsBoolean());
109+
Assert.assertEquals(StatusCode.BAD_SESSION, session.getLastSessionState().getCode());
110+
}
111+
}
112+
113+
private static GrpcReadStream<YdbQuery.SessionState> singleMessageStream(YdbQuery.SessionState message) {
114+
return new GrpcReadStream<YdbQuery.SessionState>() {
115+
@Override
116+
public CompletableFuture<Status> start(Observer<YdbQuery.SessionState> observer) {
117+
observer.onNext(message);
118+
return CompletableFuture.completedFuture(Status.SUCCESS);
119+
}
120+
121+
@Override
122+
public void cancel() {
123+
}
124+
};
125+
}
126+
127+
private static final class TestRpc extends QueryServiceRpc {
128+
private final GrpcReadStream<YdbQuery.SessionState> attachStream;
129+
private GrpcRequestSettings capturedSettings;
130+
131+
TestRpc(GrpcReadStream<YdbQuery.SessionState> attachStream) {
132+
super(DUMMY_TRANSPORT);
133+
this.attachStream = attachStream;
134+
}
135+
136+
@Override
137+
public GrpcReadStream<YdbQuery.SessionState> attachSession(
138+
YdbQuery.AttachSessionRequest request,
139+
GrpcRequestSettings settings) {
140+
this.capturedSettings = settings;
141+
return attachStream;
142+
}
143+
}
144+
145+
private static final class TestSession extends SessionImpl {
146+
private final AtomicReference<Status> lastState = new AtomicReference<>();
147+
148+
TestSession(QueryServiceRpc rpc, YdbQuery.CreateSessionResponse response) {
149+
super(rpc, response);
150+
}
151+
152+
@Override
153+
public void updateSessionState(Status status) {
154+
lastState.set(status);
155+
}
156+
157+
Status getLastSessionState() {
158+
return lastState.get();
159+
}
160+
161+
@Override
162+
public void close() {
163+
}
164+
}
165+
}

0 commit comments

Comments
 (0)