Skip to content

Commit df366dd

Browse files
committed
changes in rerprocess as well.
1 parent 90aefb2 commit df366dd

File tree

4 files changed

+57
-8
lines changed

4 files changed

+57
-8
lines changed

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,22 @@
2020
import com.google.common.util.concurrent.ListenableFuture;
2121
import com.google.common.util.concurrent.SettableFuture;
2222
import com.google.errorprone.annotations.concurrent.GuardedBy;
23+
import io.grpc.Attributes.Builder;
2324
import io.grpc.CallOptions;
2425
import io.grpc.ClientStreamTracer;
2526
import io.grpc.Context;
2627
import io.grpc.InternalChannelz.SocketStats;
2728
import io.grpc.InternalLogId;
2829
import io.grpc.LoadBalancer.PickResult;
2930
import io.grpc.LoadBalancer.PickSubchannelArgs;
31+
import io.grpc.LoadBalancer.Subchannel;
3032
import io.grpc.LoadBalancer.SubchannelPicker;
3133
import io.grpc.Metadata;
3234
import io.grpc.MethodDescriptor;
3335
import io.grpc.Status;
3436
import io.grpc.SynchronizationContext;
3537
import io.grpc.internal.ClientStreamListener.RpcProgress;
38+
import io.grpc.internal.InternalSubchannel.SubchannelStatusListener;
3639
import java.util.ArrayList;
3740
import java.util.Collection;
3841
import java.util.Collections;
@@ -129,6 +132,7 @@ public final ClientStream newStream(
129132
if (state.shutdownStatus != null) {
130133
return new FailingClientStream(state.shutdownStatus, tracers);
131134
}
135+
InternalSubchannel internalSubchannel = null;
132136
if (state.lastPicker != null) {
133137
PickResult pickResult = state.lastPicker.pickSubchannel(args);
134138
callOptions = args.getCallOptions();
@@ -149,14 +153,21 @@ public final ClientStream newStream(
149153
stream.setAuthority(pickResult.getAuthorityOverride());
150154
}
151155
return stream;
156+
} else {
157+
if (args.getCallOptions().isWaitForReady()) {
158+
Subchannel subchannel = pickResult.getSubchannel();
159+
if (subchannel != null) {
160+
internalSubchannel = (InternalSubchannel) subchannel.getInternalSubchannel();
161+
}
162+
}
152163
}
153164
}
154165
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
155166
// race with reprocess()), we will buffer the RPC. Otherwise, will try with the new picker.
156167
synchronized (lock) {
157168
PickerState newerState = pickerState;
158169
if (state == newerState) {
159-
return createPendingStream(args, tracers);
170+
return createPendingStream(args, tracers, internalSubchannel);
160171
}
161172
state = newerState;
162173
}
@@ -172,8 +183,8 @@ public final ClientStream newStream(
172183
*/
173184
@GuardedBy("lock")
174185
private PendingStream createPendingStream(
175-
PickSubchannelArgs args, ClientStreamTracer[] tracers) {
176-
PendingStream pendingStream = new PendingStream(args, tracers);
186+
PickSubchannelArgs args, ClientStreamTracer[] tracers, InternalSubchannel internalSubchannel) {
187+
PendingStream pendingStream = new PendingStream(args, tracers, internalSubchannel);
177188
pendingStreams.add(pendingStream);
178189
if (getPendingStreamsCount() == 1) {
179190
syncContext.executeLater(reportTransportInUse);
@@ -308,7 +319,13 @@ final void reprocess(@Nullable SubchannelPicker picker) {
308319
executor.execute(runnable);
309320
}
310321
toRemove.add(stream);
311-
} // else: stay pending
322+
} else { // else: stay pending
323+
Subchannel subchannel = pickResult.getSubchannel();
324+
if (subchannel != null) {
325+
InternalSubchannel internalSubchannel = (InternalSubchannel) subchannel.getInternalSubchannel();
326+
internalSubchannel.addSubchannelStatusListener(stream);
327+
}
328+
}
312329
}
313330

314331
synchronized (lock) {
@@ -345,14 +362,18 @@ public InternalLogId getLogId() {
345362
return logId;
346363
}
347364

348-
private class PendingStream extends DelayedStream {
365+
private class PendingStream extends DelayedStream implements SubchannelStatusListener {
349366
private final PickSubchannelArgs args;
350367
private final Context context = Context.current();
351368
private final ClientStreamTracer[] tracers;
369+
private Status lastTransportStatus;
352370

353-
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
371+
private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, InternalSubchannel internalSubchannel) {
354372
this.args = args;
355373
this.tracers = tracers;
374+
if (internalSubchannel != null) {
375+
internalSubchannel.addSubchannelStatusListener(this);
376+
}
356377
}
357378

358379
/** Runnable may be null. */
@@ -406,8 +427,16 @@ public void appendTimeoutInsight(InsightBuilder insight) {
406427
if (args.getCallOptions().isWaitForReady()) {
407428
insight.append("wait_for_ready");
408429
}
430+
if (lastTransportStatus != null && !lastTransportStatus.isOk()) {
431+
insight.appendKeyValue("Last Transport Status", lastTransportStatus);
432+
}
409433
super.appendTimeoutInsight(insight);
410434
}
435+
436+
@Override
437+
public void updateState(Status s) {
438+
this.lastTransportStatus = s;
439+
}
411440
}
412441

413442
static final class PickerState {

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
8181
private final boolean reconnectDisabled;
8282

8383
private final List<ClientTransportFilter> transportFilters;
84+
private final List<SubchannelStatusListener> subchannelStatusListeners = new ArrayList<>();
8485

8586
/**
8687
* All field must be mutated in the syncContext.
@@ -160,6 +161,7 @@ protected void handleNotInUse() {
160161
private Status shutdownReason;
161162

162163
private volatile Attributes connectedAddressAttributes;
164+
private Status transportStatus;
163165

164166
InternalSubchannel(LoadBalancer.CreateSubchannelArgs args, String authority, String userAgent,
165167
BackoffPolicy.Provider backoffPolicyProvider,
@@ -194,10 +196,18 @@ protected void handleNotInUse() {
194196
this.reconnectDisabled = args.getOption(LoadBalancer.DISABLE_SUBCHANNEL_RECONNECT_KEY);
195197
}
196198

199+
Status getTransportStatus() {
200+
return transportStatus;
201+
}
202+
197203
ChannelLogger getChannelLogger() {
198204
return channelLogger;
199205
}
200206

207+
void addSubchannelStatusListener(SubchannelStatusListener subchannelStatusListener) {
208+
subchannelStatusListeners.add(subchannelStatusListener);
209+
}
210+
201211
@Override
202212
public ClientTransport obtainActiveTransport() {
203213
ClientTransport savedTransport = activeTransport;
@@ -605,12 +615,16 @@ public void transportInUse(boolean inUse) {
605615

606616
@Override
607617
public void transportShutdown(final Status s) {
618+
transportStatus = s;
608619
channelLogger.log(
609620
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
610621
shutdownInitiated = true;
611622
syncContext.execute(new Runnable() {
612623
@Override
613624
public void run() {
625+
for (SubchannelStatusListener subchannelStatusListener: subchannelStatusListeners) {
626+
subchannelStatusListener.updateState(s);
627+
}
614628
if (state.getState() == SHUTDOWN) {
615629
return;
616630
}
@@ -660,6 +674,10 @@ public void run() {
660674
}
661675
}
662676

677+
interface SubchannelStatusListener {
678+
void updateState(Status s);
679+
}
680+
663681
// All methods are called in syncContext
664682
abstract static class Callback {
665683
/**

examples/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ dependencies {
4545
protobuf {
4646
protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
4747
plugins {
48-
grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" }
48+
grpc { artifact = "io.grpc:protoc-gen-grpc-java:1.70.0" }
4949
}
5050
generateProtoTasks {
5151
all()*.plugins { grpc {} }

examples/src/main/java/io/grpc/examples/helloworld/HelloWorldClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.grpc.examples.helloworld;
1818

19+
import static io.grpc.examples.helloworld.GreeterGrpc.getSayHelloMethod;
20+
1921
import io.grpc.Channel;
2022
import io.grpc.Grpc;
2123
import io.grpc.InsecureChannelCredentials;
@@ -48,7 +50,7 @@ public void greet(String name) {
4850
HelloRequest request = HelloRequest.newBuilder().setName(name).build();
4951
HelloReply response;
5052
try {
51-
response = blockingStub.sayHello(request);
53+
response = blockingStub.withWaitForReady().withDeadline(io.grpc.Deadline.after(10, TimeUnit.SECONDS)).sayHello(request);
5254
} catch (StatusRuntimeException e) {
5355
logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
5456
return;

0 commit comments

Comments
 (0)