Skip to content

Commit 20d60dd

Browse files
committed
Merge remote-tracking branch 'origin/master'
2 parents aa31fe7 + 8021727 commit 20d60dd

File tree

24 files changed

+318
-71
lines changed

24 files changed

+318
-71
lines changed

MAINTAINERS.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ for general contribution guidelines.
1111
- [ejona86](https://github.com/ejona86), Google LLC
1212
- [jdcormie](https://github.com/jdcormie), Google LLC
1313
- [kannanjgithub](https://github.com/kannanjgithub), Google LLC
14-
- [larry-safran](https://github.com/larry-safran), Google LLC
15-
- [markb74](https://github.com/markb74), Google LLC
1614
- [ran-su](https://github.com/ran-su), Google LLC
1715
- [sergiitk](https://github.com/sergiitk), Google LLC
1816
- [temawi](https://github.com/temawi), Google LLC
@@ -26,7 +24,9 @@ for general contribution guidelines.
2624
- [ericgribkoff](https://github.com/ericgribkoff)
2725
- [jiangtaoli2016](https://github.com/jiangtaoli2016)
2826
- [jtattermusch](https://github.com/jtattermusch)
27+
- [larry-safran](https://github.com/larry-safran)
2928
- [louiscryan](https://github.com/louiscryan)
29+
- [markb74](https://github.com/markb74)
3030
- [nicolasnoble](https://github.com/nicolasnoble)
3131
- [nmittler](https://github.com/nmittler)
3232
- [sanjaypujare](https://github.com/sanjaypujare)

api/src/main/java/io/grpc/LoadBalancer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,6 +1189,10 @@ public void ignoreRefreshNameResolutionCheck() {
11891189
* Returns a {@link SynchronizationContext} that runs tasks in the same Synchronization Context
11901190
* as that the callback methods on the {@link LoadBalancer} interface are run in.
11911191
*
1192+
* <p>Work added to the synchronization context might not run immediately, so LB implementations
1193+
* must be careful to ensure that any assumptions still hold when it is executed. In particular,
1194+
* the LB might have been shut down or subchannels might have changed state.
1195+
*
11921196
* <p>Pro-tip: in order to call {@link SynchronizationContext#schedule}, you need to provide a
11931197
* {@link ScheduledExecutorService}. {@link #getScheduledExecutorService} is provided for your
11941198
* convenience.

api/src/main/java/io/grpc/Metadata.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222

2323
import com.google.common.annotations.VisibleForTesting;
2424
import com.google.common.base.Preconditions;
25+
import com.google.common.collect.Maps;
26+
import com.google.common.collect.Sets;
2527
import com.google.common.io.BaseEncoding;
2628
import com.google.common.io.ByteStreams;
2729
import java.io.ByteArrayInputStream;
@@ -32,8 +34,6 @@
3234
import java.util.Arrays;
3335
import java.util.BitSet;
3436
import java.util.Collections;
35-
import java.util.HashMap;
36-
import java.util.HashSet;
3737
import java.util.Iterator;
3838
import java.util.List;
3939
import java.util.Locale;
@@ -325,7 +325,7 @@ public Set<String> keys() {
325325
if (isEmpty()) {
326326
return Collections.emptySet();
327327
}
328-
Set<String> ks = new HashSet<>(size);
328+
Set<String> ks = Sets.newHashSetWithExpectedSize(size);
329329
for (int i = 0; i < size; i++) {
330330
ks.add(new String(name(i), 0 /* hibyte */));
331331
}
@@ -526,7 +526,7 @@ public void merge(Metadata other) {
526526
public void merge(Metadata other, Set<Key<?>> keys) {
527527
Preconditions.checkNotNull(other, "other");
528528
// Use ByteBuffer for equals and hashCode.
529-
Map<ByteBuffer, Key<?>> asciiKeys = new HashMap<>(keys.size());
529+
Map<ByteBuffer, Key<?>> asciiKeys = Maps.newHashMapWithExpectedSize(keys.size());
530530
for (Key<?> key : keys) {
531531
asciiKeys.put(ByteBuffer.wrap(key.asciiName()), key);
532532
}

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,9 @@ public final void onAddresses(
239239
* {@link ResolutionResult#getAddressesOrError()} is empty, {@link #onError(Status)} will be
240240
* called.
241241
*
242+
* <p>Newer NameResolver implementations should prefer calling onResult2. This method exists to
243+
* facilitate older {@link Listener} implementations to migrate to {@link Listener2}.
244+
*
242245
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
243246
* @since 1.21.0
244247
*/
@@ -248,16 +251,25 @@ public final void onAddresses(
248251
* Handles a name resolving error from the resolver. The listener is responsible for eventually
249252
* invoking {@link NameResolver#refresh()} to re-attempt resolution.
250253
*
254+
* <p>New NameResolver implementations should prefer calling onResult2 which will have the
255+
* address resolution error in {@link ResolutionResult}'s addressesOrError. This method exists
256+
* to facilitate older implementations using {@link Listener} to migrate to {@link Listener2}.
257+
*
251258
* @param error a non-OK status
252259
* @since 1.21.0
253260
*/
254261
@Override
255262
public abstract void onError(Status error);
256263

257264
/**
258-
* Handles updates on resolved addresses and attributes.
265+
* Handles updates on resolved addresses and attributes. Must be called from the same
266+
* {@link SynchronizationContext} available in {@link NameResolver.Args} that is passed
267+
* from the channel.
259268
*
260-
* @param resolutionResult the resolved server addresses, attributes, and Service Config.
269+
* @param resolutionResult the resolved server addresses or error in address resolution,
270+
* attributes, and Service Config or error
271+
* @return status indicating whether the resolutionResult was accepted by the listener,
272+
* typically the result from a load balancer.
261273
* @since 1.66
262274
*/
263275
public Status onResult2(ResolutionResult resolutionResult) {

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1967,6 +1967,9 @@ public void run() {
19671967
public void requestConnection() {
19681968
syncContext.throwIfNotInThisSynchronizationContext();
19691969
checkState(started, "not started");
1970+
if (shutdown) {
1971+
return;
1972+
}
19701973
subchannel.obtainActiveTransport();
19711974
}
19721975

core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
134134
SubchannelPicker picker;
135135
switch (newState) {
136136
case IDLE:
137-
picker = new RequestConnectionPicker(subchannel);
137+
picker = new RequestConnectionPicker();
138138
break;
139139
case CONNECTING:
140140
// It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave
@@ -197,22 +197,12 @@ public String toString() {
197197

198198
/** Picker that requests connection during the first pick, and returns noResult. */
199199
private final class RequestConnectionPicker extends SubchannelPicker {
200-
private final Subchannel subchannel;
201200
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
202201

203-
RequestConnectionPicker(Subchannel subchannel) {
204-
this.subchannel = checkNotNull(subchannel, "subchannel");
205-
}
206-
207202
@Override
208203
public PickResult pickSubchannel(PickSubchannelArgs args) {
209204
if (connectionRequested.compareAndSet(false, true)) {
210-
helper.getSynchronizationContext().execute(new Runnable() {
211-
@Override
212-
public void run() {
213-
subchannel.requestConnection();
214-
}
215-
});
205+
helper.getSynchronizationContext().execute(PickFirstLoadBalancer.this::requestConnection);
216206
}
217207
return PickResult.withNoResult();
218208
}

core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,6 +1767,19 @@ public void subchannelsNoConnectionShutdown() {
17671767
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
17681768
}
17691769

1770+
@Test
1771+
public void subchannelsRequestConnectionNoopAfterShutdown() {
1772+
createChannel();
1773+
Subchannel sub1 =
1774+
createSubchannelSafely(helper, addressGroup, Attributes.EMPTY, subchannelStateListener);
1775+
1776+
shutdownSafely(helper, sub1);
1777+
requestConnectionSafely(helper, sub1);
1778+
verify(mockTransportFactory, never())
1779+
.newClientTransport(
1780+
any(SocketAddress.class), any(ClientTransportOptions.class), any(ChannelLogger.class));
1781+
}
1782+
17701783
@Test
17711784
public void subchannelsNoConnectionShutdownNow() {
17721785
createChannel();

examples/src/main/java/io/grpc/examples/customloadbalance/ShufflingPickFirstLoadBalancer.java

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo
122122
SubchannelPicker picker;
123123
switch (currentState) {
124124
case IDLE:
125-
picker = new RequestConnectionPicker(subchannel);
125+
picker = new RequestConnectionPicker();
126126
break;
127127
case CONNECTING:
128128
picker = new Picker(PickResult.withNoResult());
@@ -182,24 +182,15 @@ public String toString() {
182182
*/
183183
private final class RequestConnectionPicker extends SubchannelPicker {
184184

185-
private final Subchannel subchannel;
186185
private final AtomicBoolean connectionRequested = new AtomicBoolean(false);
187186

188-
RequestConnectionPicker(Subchannel subchannel) {
189-
this.subchannel = checkNotNull(subchannel, "subchannel");
190-
}
191-
192187
@Override
193188
public PickResult pickSubchannel(PickSubchannelArgs args) {
194189
if (connectionRequested.compareAndSet(false, true)) {
195-
helper.getSynchronizationContext().execute(new Runnable() {
196-
@Override
197-
public void run() {
198-
subchannel.requestConnection();
199-
}
200-
});
190+
helper.getSynchronizationContext().execute(
191+
ShufflingPickFirstLoadBalancer.this::requestConnection);
201192
}
202193
return PickResult.withNoResult();
203194
}
204195
}
205-
}
196+
}

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -738,14 +738,19 @@ public void operationComplete(ChannelFuture future) throws Exception {
738738

739739
// Attach the client stream to the HTTP/2 stream object as user data.
740740
stream.setHttp2Stream(http2Stream);
741+
promise.setSuccess();
742+
} else {
743+
// Otherwise, the stream has been cancelled and Netty is sending a
744+
// RST_STREAM frame which causes it to purge pending writes from the
745+
// flow-controller and delete the http2Stream. The stream listener has already
746+
// been notified of cancellation so there is nothing to do.
747+
//
748+
// This process has been observed to fail in some circumstances, leaving listeners
749+
// unanswered. Ensure that some exception has been delivered consistent with the
750+
// implied RST_STREAM result above.
751+
Status status = Status.INTERNAL.withDescription("unknown stream for connection");
752+
promise.setFailure(status.asRuntimeException());
741753
}
742-
// Otherwise, the stream has been cancelled and Netty is sending a
743-
// RST_STREAM frame which causes it to purge pending writes from the
744-
// flow-controller and delete the http2Stream. The stream listener has already
745-
// been notified of cancellation so there is nothing to do.
746-
747-
// Just forward on the success status to the original promise.
748-
promise.setSuccess();
749754
} else {
750755
Throwable cause = future.cause();
751756
if (cause instanceof StreamBufferingEncoder.Http2GoAwayException) {
@@ -768,6 +773,19 @@ public void operationComplete(ChannelFuture future) throws Exception {
768773
}
769774
}
770775
});
776+
// When the HEADERS are not buffered because of MAX_CONCURRENT_STREAMS in
777+
// StreamBufferingEncoder, the stream is created immediately even if the bytes of the HEADERS
778+
// are delayed because the OS may have too much buffered and isn't accepting the write. The
779+
// write promise is also delayed until flush(). However, we need to associate the netty stream
780+
// with the transport state so that goingAway() and forcefulClose() and able to notify the
781+
// stream of failures.
782+
//
783+
// This leaves a hole when MAX_CONCURRENT_STREAMS is reached, as http2Stream will be null, but
784+
// it is better than nothing.
785+
Http2Stream http2Stream = connection().stream(streamId);
786+
if (http2Stream != null) {
787+
http2Stream.setProperty(streamKey, stream);
788+
}
771789
}
772790

773791
/**

netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ public void cancelBufferedStreamShouldChangeClientStreamStatus() throws Exceptio
268268
// Cancel the stream.
269269
cancelStream(Status.CANCELLED);
270270

271-
assertTrue(createFuture.isSuccess());
271+
assertFalse(createFuture.isSuccess());
272272
verify(streamListener).closed(eq(Status.CANCELLED), same(PROCESSED), any(Metadata.class));
273273
}
274274

@@ -311,7 +311,7 @@ public void cancelWhileBufferedShouldSucceed() throws Exception {
311311
ChannelFuture cancelFuture = cancelStream(Status.CANCELLED);
312312
assertTrue(cancelFuture.isSuccess());
313313
assertTrue(createFuture.isDone());
314-
assertTrue(createFuture.isSuccess());
314+
assertFalse(createFuture.isSuccess());
315315
}
316316

317317
/**
@@ -453,6 +453,26 @@ public void receivedAbruptGoAwayShouldFailRacingQueuedStreamid() throws Exceptio
453453
assertTrue(future.isDone());
454454
}
455455

456+
@Test
457+
public void receivedAbruptGoAwayShouldFailRacingQueuedIoStreamid() throws Exception {
458+
// Purposefully avoid flush(), since we want the write to not actually complete.
459+
// EmbeddedChannel doesn't support flow control, so this is the next closest approximation.
460+
ChannelFuture future = channel().write(
461+
newCreateStreamCommand(grpcHeaders, streamTransportState));
462+
// Read a GOAWAY that indicates our stream can't be sent
463+
channelRead(goAwayFrame(0, 0 /* NO_ERROR */, Unpooled.copiedBuffer("this is a test", UTF_8)));
464+
465+
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
466+
verify(streamListener).closed(captor.capture(), same(REFUSED),
467+
ArgumentMatchers.<Metadata>notNull());
468+
assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
469+
assertEquals(
470+
"Abrupt GOAWAY closed sent stream. HTTP/2 error code: NO_ERROR, "
471+
+ "debug data: this is a test",
472+
captor.getValue().getDescription());
473+
assertTrue(future.isDone());
474+
}
475+
456476
@Test
457477
public void receivedGoAway_shouldFailBufferedStreamsExceedingMaxConcurrentStreams()
458478
throws Exception {

0 commit comments

Comments
 (0)