Skip to content

Commit 0627bf4

Browse files
committed
Merge remote-tracking branch 'origin/authoritychecktls' into authoritychecktls
2 parents 8576a4e + 4e24446 commit 0627bf4

File tree

35 files changed

+660
-280
lines changed

35 files changed

+660
-280
lines changed

README.md

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ For a guided tour, take a look at the [quick start
4444
guide](https://grpc.io/docs/languages/java/quickstart) or the more explanatory [gRPC
4545
basics](https://grpc.io/docs/languages/java/basics).
4646

47-
The [examples](https://github.com/grpc/grpc-java/tree/v1.69.1/examples) and the
48-
[Android example](https://github.com/grpc/grpc-java/tree/v1.69.1/examples/android)
47+
The [examples](https://github.com/grpc/grpc-java/tree/v1.70.0/examples) and the
48+
[Android example](https://github.com/grpc/grpc-java/tree/v1.70.0/examples/android)
4949
are standalone projects that showcase the usage of gRPC.
5050

5151
Download
@@ -56,18 +56,18 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`:
5656
<dependency>
5757
<groupId>io.grpc</groupId>
5858
<artifactId>grpc-netty-shaded</artifactId>
59-
<version>1.69.1</version>
59+
<version>1.70.0</version>
6060
<scope>runtime</scope>
6161
</dependency>
6262
<dependency>
6363
<groupId>io.grpc</groupId>
6464
<artifactId>grpc-protobuf</artifactId>
65-
<version>1.69.1</version>
65+
<version>1.70.0</version>
6666
</dependency>
6767
<dependency>
6868
<groupId>io.grpc</groupId>
6969
<artifactId>grpc-stub</artifactId>
70-
<version>1.69.1</version>
70+
<version>1.70.0</version>
7171
</dependency>
7272
<dependency> <!-- necessary for Java 9+ -->
7373
<groupId>org.apache.tomcat</groupId>
@@ -79,18 +79,18 @@ Download [the JARs][]. Or for Maven with non-Android, add to your `pom.xml`:
7979

8080
Or for Gradle with non-Android, add to your dependencies:
8181
```gradle
82-
runtimeOnly 'io.grpc:grpc-netty-shaded:1.69.1'
83-
implementation 'io.grpc:grpc-protobuf:1.69.1'
84-
implementation 'io.grpc:grpc-stub:1.69.1'
82+
runtimeOnly 'io.grpc:grpc-netty-shaded:1.70.0'
83+
implementation 'io.grpc:grpc-protobuf:1.70.0'
84+
implementation 'io.grpc:grpc-stub:1.70.0'
8585
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
8686
```
8787

8888
For Android client, use `grpc-okhttp` instead of `grpc-netty-shaded` and
8989
`grpc-protobuf-lite` instead of `grpc-protobuf`:
9090
```gradle
91-
implementation 'io.grpc:grpc-okhttp:1.69.1'
92-
implementation 'io.grpc:grpc-protobuf-lite:1.69.1'
93-
implementation 'io.grpc:grpc-stub:1.69.1'
91+
implementation 'io.grpc:grpc-okhttp:1.70.0'
92+
implementation 'io.grpc:grpc-protobuf-lite:1.70.0'
93+
implementation 'io.grpc:grpc-stub:1.70.0'
9494
compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
9595
```
9696

@@ -99,7 +99,7 @@ For [Bazel](https://bazel.build), you can either
9999
(with the GAVs from above), or use `@io_grpc_grpc_java//api` et al (see below).
100100

101101
[the JARs]:
102-
https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.69.1
102+
https://search.maven.org/search?q=g:io.grpc%20AND%20v:1.70.0
103103

104104
Development snapshots are available in [Sonatypes's snapshot
105105
repository](https://oss.sonatype.org/content/repositories/snapshots/).
@@ -131,7 +131,7 @@ For protobuf-based codegen integrated with the Maven build system, you can use
131131
<configuration>
132132
<protocArtifact>com.google.protobuf:protoc:3.25.5:exe:${os.detected.classifier}</protocArtifact>
133133
<pluginId>grpc-java</pluginId>
134-
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.69.1:exe:${os.detected.classifier}</pluginArtifact>
134+
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.70.0:exe:${os.detected.classifier}</pluginArtifact>
135135
</configuration>
136136
<executions>
137137
<execution>
@@ -161,7 +161,7 @@ protobuf {
161161
}
162162
plugins {
163163
grpc {
164-
artifact = 'io.grpc:protoc-gen-grpc-java:1.69.1'
164+
artifact = 'io.grpc:protoc-gen-grpc-java:1.70.0'
165165
}
166166
}
167167
generateProtoTasks {
@@ -194,7 +194,7 @@ protobuf {
194194
}
195195
plugins {
196196
grpc {
197-
artifact = 'io.grpc:protoc-gen-grpc-java:1.69.1'
197+
artifact = 'io.grpc:protoc-gen-grpc-java:1.70.0'
198198
}
199199
}
200200
generateProtoTasks {

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,15 @@ public final ClientStream newStream(
140140
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
141141
callOptions.isWaitForReady());
142142
if (transport != null) {
143-
return transport.newStream(
143+
ClientStream stream = transport.newStream(
144144
args.getMethodDescriptor(), args.getHeaders(), callOptions,
145145
tracers);
146+
// User code provided authority takes precedence over the LB provided one; this will be
147+
// overwritten by ClientCallImpl if the application sets an authority override
148+
if (pickResult.getAuthorityOverride() != null) {
149+
stream.setAuthority(pickResult.getAuthorityOverride());
150+
}
151+
return stream;
146152
}
147153
}
148154
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
@@ -287,10 +293,6 @@ final void reprocess(@Nullable SubchannelPicker picker) {
287293
for (final PendingStream stream : toProcess) {
288294
PickResult pickResult = picker.pickSubchannel(stream.args);
289295
CallOptions callOptions = stream.args.getCallOptions();
290-
// User code provided authority takes precedence over the LB provided one.
291-
if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) {
292-
stream.setAuthority(pickResult.getAuthorityOverride());
293-
}
294296
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
295297
callOptions.isWaitForReady());
296298
if (transport != null) {
@@ -301,7 +303,7 @@ final void reprocess(@Nullable SubchannelPicker picker) {
301303
if (callOptions.getExecutor() != null) {
302304
executor = callOptions.getExecutor();
303305
}
304-
Runnable runnable = stream.createRealStream(transport);
306+
Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride());
305307
if (runnable != null) {
306308
executor.execute(runnable);
307309
}
@@ -354,7 +356,7 @@ private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) {
354356
}
355357

356358
/** Runnable may be null. */
357-
private Runnable createRealStream(ClientTransport transport) {
359+
private Runnable createRealStream(ClientTransport transport, String authorityOverride) {
358360
ClientStream realStream;
359361
Context origContext = context.attach();
360362
try {
@@ -364,6 +366,13 @@ private Runnable createRealStream(ClientTransport transport) {
364366
} finally {
365367
context.detach(origContext);
366368
}
369+
if (authorityOverride != null) {
370+
// User code provided authority takes precedence over the LB provided one; this will be
371+
// overwritten by an enqueud call from ClientCallImpl if the application sets an authority
372+
// override. We must call the real stream directly because stream.start() has likely already
373+
// been called on the delayed stream.
374+
realStream.setAuthority(authorityOverride);
375+
}
367376
return setStream(realStream);
368377
}
369378

core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -137,38 +137,27 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
137137
final ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups =
138138
ImmutableList.<EquivalentAddressGroup>builder().addAll(cleanServers).build();
139139

140-
if (rawConnectivityState == READY) {
141-
// If the previous ready subchannel exists in new address list,
140+
if (rawConnectivityState == READY || rawConnectivityState == CONNECTING) {
141+
// If the previous ready (or connecting) subchannel exists in new address list,
142142
// keep this connection and don't create new subchannels
143143
SocketAddress previousAddress = addressIndex.getCurrentAddress();
144144
addressIndex.updateGroups(newImmutableAddressGroups);
145145
if (addressIndex.seekTo(previousAddress)) {
146146
SubchannelData subchannelData = subchannels.get(previousAddress);
147147
subchannelData.getSubchannel().updateAddresses(addressIndex.getCurrentEagAsList());
148+
shutdownRemovedAddresses(newImmutableAddressGroups);
148149
return Status.OK;
149150
}
150151
// Previous ready subchannel not in the new list of addresses
151152
} else {
152153
addressIndex.updateGroups(newImmutableAddressGroups);
153154
}
154155

155-
// remove old subchannels that were not in new address list
156-
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
157-
158-
// Flatten the new EAGs addresses
159-
Set<SocketAddress> newAddrs = new HashSet<>();
160-
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
161-
newAddrs.addAll(endpoint.getAddresses());
162-
}
163-
164-
// Shut them down and remove them
165-
for (SocketAddress oldAddr : oldAddrs) {
166-
if (!newAddrs.contains(oldAddr)) {
167-
subchannels.remove(oldAddr).getSubchannel().shutdown();
168-
}
169-
}
156+
// No old addresses means first time through, so we will do an explicit move to CONNECTING
157+
// which is what we implicitly started with
158+
boolean noOldAddrs = shutdownRemovedAddresses(newImmutableAddressGroups);
170159

171-
if (oldAddrs.size() == 0) {
160+
if (noOldAddrs) {
172161
// Make tests happy; they don't properly assume starting in CONNECTING
173162
rawConnectivityState = CONNECTING;
174163
updateBalancingState(CONNECTING, new Picker(PickResult.withNoResult()));
@@ -188,6 +177,31 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
188177
return Status.OK;
189178
}
190179

180+
/**
181+
* Compute the difference between the flattened new addresses and the old addresses that had been
182+
* made into subchannels and then shutdown the matching subchannels.
183+
* @return true if there were no old addresses
184+
*/
185+
private boolean shutdownRemovedAddresses(
186+
ImmutableList<EquivalentAddressGroup> newImmutableAddressGroups) {
187+
188+
Set<SocketAddress> oldAddrs = new HashSet<>(subchannels.keySet());
189+
190+
// Flatten the new EAGs addresses
191+
Set<SocketAddress> newAddrs = new HashSet<>();
192+
for (EquivalentAddressGroup endpoint : newImmutableAddressGroups) {
193+
newAddrs.addAll(endpoint.getAddresses());
194+
}
195+
196+
// Shut them down and remove them
197+
for (SocketAddress oldAddr : oldAddrs) {
198+
if (!newAddrs.contains(oldAddr)) {
199+
subchannels.remove(oldAddr).getSubchannel().shutdown();
200+
}
201+
}
202+
return oldAddrs.isEmpty();
203+
}
204+
191205
private static List<EquivalentAddressGroup> deDupAddresses(List<EquivalentAddressGroup> groups) {
192206
Set<SocketAddress> seenAddresses = new HashSet<>();
193207
List<EquivalentAddressGroup> newGroups = new ArrayList<>();
@@ -290,7 +304,14 @@ void processSubchannelState(SubchannelData subchannelData, ConnectivityStateInfo
290304
cancelScheduleTask();
291305
requestConnection(); // is recursive so might hit the end of the addresses
292306
} else {
293-
scheduleBackoff();
307+
if (subchannels.size() >= addressIndex.size()) {
308+
scheduleBackoff();
309+
} else {
310+
// We must have done a seek to the middle of the list lets start over from the
311+
// beginning
312+
addressIndex.reset();
313+
requestConnection();
314+
}
294315
}
295316
}
296317

core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -503,26 +503,11 @@ public void uncaughtException(Thread t, Throwable e) {
503503
}
504504

505505
@Test
506-
public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFromLbIsIgnored() {
507-
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
508-
method, headers, callOptions, tracers);
509-
delayedStream.start(mock(ClientStreamListener.class));
510-
SubchannelPicker picker = mock(SubchannelPicker.class);
511-
PickResult pickResult = PickResult.withSubchannel(
512-
mockSubchannel, null, "authority-override-hostname-from-lb");
513-
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
514-
515-
delayedTransport.reprocess(picker);
516-
fakeExecutor.runDueTasks();
517-
518-
verify(mockRealStream, never()).setAuthority("authority-override-hostname-from-lb");
519-
}
520-
521-
@Test
522-
public void
523-
reprocess_authorityOverrideNotInCallOptions_authorityOverrideFromLbIsSetIntoStream() {
506+
public void reprocess_authorityOverrideFromLb() {
507+
InOrder inOrder = inOrder(mockRealStream);
524508
DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream(
525509
method, headers, callOptions.withAuthority(null), tracers);
510+
delayedStream.setAuthority("authority-override-from-calloptions");
526511
delayedStream.start(mock(ClientStreamListener.class));
527512
SubchannelPicker picker = mock(SubchannelPicker.class);
528513
PickResult pickResult = PickResult.withSubchannel(
@@ -536,7 +521,10 @@ public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFro
536521
delayedTransport.reprocess(picker);
537522
fakeExecutor.runDueTasks();
538523

539-
verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
524+
// Must be set before start(), and may be overwritten
525+
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
526+
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
527+
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
540528
}
541529

542530
@Test
@@ -563,28 +551,26 @@ public void reprocess_NoPendingStream() {
563551
}
564552

565553
@Test
566-
public void newStream_assignsTransport_authorityFromCallOptionsSupersedesAuthorityFromLB() {
554+
public void newStream_authorityOverrideFromLb() {
555+
InOrder inOrder = inOrder(mockRealStream);
567556
SubchannelPicker picker = mock(SubchannelPicker.class);
568-
AbstractSubchannel subchannel = mock(AbstractSubchannel.class);
569-
when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel);
570557
PickResult pickResult = PickResult.withSubchannel(
571-
subchannel, null, "authority-override-hostname-from-lb");
558+
mockSubchannel, null, "authority-override-hostname-from-lb");
572559
when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult);
573-
ArgumentCaptor<CallOptions> callOptionsArgumentCaptor =
574-
ArgumentCaptor.forClass(CallOptions.class);
575560
when(mockRealTransport.newStream(
576-
any(MethodDescriptor.class), any(Metadata.class), callOptionsArgumentCaptor.capture(),
577-
ArgumentMatchers.<ClientStreamTracer[]>any()))
561+
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any()))
578562
.thenReturn(mockRealStream);
579563
delayedTransport.reprocess(picker);
580-
verifyNoMoreInteractions(picker);
581-
verifyNoMoreInteractions(transportListener);
582564

583-
CallOptions callOptions =
584-
CallOptions.DEFAULT.withAuthority("authority-override-hosstname-from-calloptions");
585-
delayedTransport.newStream(method, headers, callOptions, tracers);
586-
assertThat(callOptionsArgumentCaptor.getValue().getAuthority()).isEqualTo(
587-
"authority-override-hosstname-from-calloptions");
565+
ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers);
566+
assertThat(stream).isSameInstanceAs(mockRealStream);
567+
stream.setAuthority("authority-override-from-calloptions");
568+
stream.start(mock(ClientStreamListener.class));
569+
570+
// Must be set before start(), and may be overwritten
571+
inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb");
572+
inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions");
573+
inOrder.verify(mockRealStream).start(any(ClientStreamListener.class));
588574
}
589575

590576
@Test

core/src/test/java/io/grpc/internal/PickFirstLeafLoadBalancerTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2133,18 +2133,20 @@ public void lastAddressFailingNotTransientFailure() {
21332133
loadBalancer.acceptResolvedAddresses(
21342134
ResolvedAddresses.newBuilder().setAddresses(newServers).setAttributes(affinity).build());
21352135

2136-
// Verify that no new subchannels were created or started
2136+
// Subchannel 2 should be reused since it was trying to connect and is present.
21372137
inOrder.verify(mockSubchannel1).shutdown();
2138-
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
2139-
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
2140-
inOrder.verify(mockSubchannel3).requestConnection();
2138+
inOrder.verify(mockSubchannel3, never()).start(stateListenerCaptor.capture());
21412139
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
21422140

2143-
// Second address connection attempt is unsuccessful, but should not go into transient failure
2141+
// Second address connection attempt is unsuccessful, so since at end, but don't have all
2142+
// subchannels, schedule a backoff for the first address
21442143
stateListener2.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
2144+
fakeClock.forwardTime(1, TimeUnit.SECONDS);
2145+
inOrder.verify(mockSubchannel3).start(stateListenerCaptor.capture());
2146+
SubchannelStateListener stateListener3 = stateListenerCaptor.getValue();
21452147
assertEquals(CONNECTING, loadBalancer.getConcludedConnectivityState());
21462148

2147-
// Third address connection attempt is unsuccessful, now we enter transient failure
2149+
// Third address connection attempt is unsuccessful, now we enter TF, do name resolution
21482150
stateListener3.onSubchannelState(ConnectivityStateInfo.forTransientFailure(CONNECTION_ERROR));
21492151
assertEquals(TRANSIENT_FAILURE, loadBalancer.getConcludedConnectivityState());
21502152

0 commit comments

Comments
 (0)