Skip to content

Commit 3915d02

Browse files
authored
core: Implement oobChannel with resolvingOobChannel
The most important part of this change is to ensure that CallCredentials are not propagated to the OOB channel. Because the authority of the OOB channel doesn't match the parent channel, we must ensure that any bearer tokens are not sent to the different server. However, this was not a problem because resolvingOobChannel has the same constraint. (RLS has a different constraint, but we were able to let RLS manage that itself.) This commit does change the behavior of channelz, shutdown, and metrics for the OOB channel. Previously the OOB channel was registered with channelz, but it is only a TODO for resolving channel. Channel shutdown no longer shuts down the OOB channel and it no longer waits for the OOB channel to terminate before becoming terminated itself. That is also a pre-existing TODO. Since ManagedChannelImplBuilder is now being used, global configurators and census are enabled. The proper behavior here is still being determined, but we would want it to be the same for resolving OOB channel and OOB channel. The OOB channel used to refresh the name resolution when the subchannel went IDLE or TF. That is an older behavior from back when regular subchannels would also cause the name resolver to refresh. Now-a-days that goes though the LB tree. gRPC-LB already refreshes name resolution when its RPC closes, so no longer doing it automatically should be fine. balancerRpcExecutorPool no longer has its lifetime managed by the child. It'd be easiest to not use it at all from OOB channel, which wouldn't actually change the regular behavior, as channels already use the same executor by default. However, the tests are making use of the executor being injected, so some propagation needs to be preserved. Lots of OOB channel tests were deleted, but these were either testing OobChannel, which is now gone, or things like channelz, which are known to no longer work like before.
1 parent 228fc8e commit 3915d02

File tree

5 files changed

+173
-861
lines changed

5 files changed

+173
-861
lines changed

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 47 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@
9595
import io.grpc.internal.RetriableStream.ChannelBufferMeter;
9696
import io.grpc.internal.RetriableStream.Throttle;
9797
import java.net.URI;
98+
import java.net.URISyntaxException;
9899
import java.util.ArrayList;
99100
import java.util.Collection;
100101
import java.util.Collections;
@@ -167,11 +168,9 @@ public Result selectConfig(PickSubchannelArgs args) {
167168
@Nullable
168169
private final ChannelCredentials originalChannelCreds;
169170
private final ClientTransportFactory transportFactory;
170-
private final ClientTransportFactory oobTransportFactory;
171171
private final RestrictedScheduledExecutor scheduledExecutor;
172172
private final Executor executor;
173173
private final ObjectPool<? extends Executor> executorPool;
174-
private final ObjectPool<? extends Executor> balancerRpcExecutorPool;
175174
private final ExecutorHolder balancerRpcExecutorHolder;
176175
private final ExecutorHolder offloadExecutorHolder;
177176
private final TimeProvider timeProvider;
@@ -240,9 +239,6 @@ public void uncaughtException(Thread t, Throwable e) {
240239
private Collection<RealChannel.PendingCall<?, ?>> pendingCalls;
241240
private final Object pendingCallsInUseObject = new Object();
242241

243-
// Must be mutated from syncContext
244-
private final Set<OobChannel> oobChannels = new HashSet<>(1, .75f);
245-
246242
// reprocess() must be run from syncContext
247243
private final DelayedClientTransport delayedTransport;
248244
private final UncommittedRetriableStreamsRegistry uncommittedRetriableStreamsRegistry
@@ -312,9 +308,6 @@ private void maybeShutdownNowSubchannels() {
312308
for (InternalSubchannel subchannel : subchannels) {
313309
subchannel.shutdownNow(SHUTDOWN_NOW_STATUS);
314310
}
315-
for (OobChannel oobChannel : oobChannels) {
316-
oobChannel.getInternalSubchannel().shutdownNow(SHUTDOWN_NOW_STATUS);
317-
}
318311
}
319312
}
320313

@@ -334,7 +327,6 @@ public void run() {
334327
builder.setTarget(target).setState(channelStateManager.getState());
335328
List<InternalWithLogId> children = new ArrayList<>();
336329
children.addAll(subchannels);
337-
children.addAll(oobChannels);
338330
builder.setSubchannels(children);
339331
ret.set(builder.build());
340332
}
@@ -564,8 +556,6 @@ ClientStream newSubstream(
564556
new ExecutorHolder(checkNotNull(builder.offloadExecutorPool, "offloadExecutorPool"));
565557
this.transportFactory = new CallCredentialsApplyingTransportFactory(
566558
clientTransportFactory, builder.callCredentials, this.offloadExecutorHolder);
567-
this.oobTransportFactory = new CallCredentialsApplyingTransportFactory(
568-
clientTransportFactory, null, this.offloadExecutorHolder);
569559
this.scheduledExecutor =
570560
new RestrictedScheduledExecutor(transportFactory.getScheduledExecutorService());
571561
maxTraceEvents = builder.maxTraceEvents;
@@ -604,8 +594,8 @@ ClientStream newSubstream(
604594
this.nameResolverArgs = nameResolverArgsBuilder.build();
605595
this.nameResolver = getNameResolver(
606596
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
607-
this.balancerRpcExecutorPool = checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool");
608-
this.balancerRpcExecutorHolder = new ExecutorHolder(balancerRpcExecutorPool);
597+
this.balancerRpcExecutorHolder = new ExecutorHolder(
598+
checkNotNull(balancerRpcExecutorPool, "balancerRpcExecutorPool"));
609599
this.delayedTransport = new DelayedClientTransport(this.executor, this.syncContext);
610600
this.delayedTransport.start(delayedTransportListener);
611601
this.backoffPolicyProvider = backoffPolicyProvider;
@@ -1187,7 +1177,7 @@ private void maybeTerminateChannel() {
11871177
if (terminated) {
11881178
return;
11891179
}
1190-
if (shutdown.get() && subchannels.isEmpty() && oobChannels.isEmpty()) {
1180+
if (shutdown.get() && subchannels.isEmpty()) {
11911181
channelLogger.log(ChannelLogLevel.INFO, "Terminated");
11921182
channelz.removeRootChannel(this);
11931183
executorPool.returnObject(executor);
@@ -1201,13 +1191,6 @@ private void maybeTerminateChannel() {
12011191
}
12021192
}
12031193

1204-
// Must be called from syncContext
1205-
private void handleInternalSubchannelState(ConnectivityStateInfo newState) {
1206-
if (newState.getState() == TRANSIENT_FAILURE || newState.getState() == IDLE) {
1207-
refreshNameResolution();
1208-
}
1209-
}
1210-
12111194
@Override
12121195
public ConnectivityState getState(boolean requestConnection) {
12131196
ConnectivityState savedChannelState = channelStateManager.getState();
@@ -1253,9 +1236,6 @@ public void run() {
12531236
for (InternalSubchannel subchannel : subchannels) {
12541237
subchannel.resetConnectBackoff();
12551238
}
1256-
for (OobChannel oobChannel : oobChannels) {
1257-
oobChannel.resetConnectBackoff();
1258-
}
12591239
}
12601240
}
12611241

@@ -1413,86 +1393,28 @@ public ManagedChannel createOobChannel(EquivalentAddressGroup addressGroup, Stri
14131393
@Override
14141394
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> addressGroup,
14151395
String authority) {
1416-
// TODO(ejona): can we be even stricter? Like terminating?
1417-
checkState(!terminated, "Channel is terminated");
1418-
long oobChannelCreationTime = timeProvider.currentTimeNanos();
1419-
InternalLogId oobLogId = InternalLogId.allocate("OobChannel", /*details=*/ null);
1420-
InternalLogId subchannelLogId =
1421-
InternalLogId.allocate("Subchannel-OOB", /*details=*/ authority);
1422-
ChannelTracer oobChannelTracer =
1423-
new ChannelTracer(
1424-
oobLogId, maxTraceEvents, oobChannelCreationTime,
1425-
"OobChannel for " + addressGroup);
1426-
final OobChannel oobChannel = new OobChannel(
1427-
authority, balancerRpcExecutorPool, oobTransportFactory.getScheduledExecutorService(),
1428-
syncContext, callTracerFactory.create(), oobChannelTracer, channelz, timeProvider);
1429-
channelTracer.reportEvent(new ChannelTrace.Event.Builder()
1430-
.setDescription("Child OobChannel created")
1431-
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1432-
.setTimestampNanos(oobChannelCreationTime)
1433-
.setChannelRef(oobChannel)
1434-
.build());
1435-
ChannelTracer subchannelTracer =
1436-
new ChannelTracer(subchannelLogId, maxTraceEvents, oobChannelCreationTime,
1437-
"Subchannel for " + addressGroup);
1438-
ChannelLogger subchannelLogger = new ChannelLoggerImpl(subchannelTracer, timeProvider);
1439-
final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
1440-
@Override
1441-
void onTerminated(InternalSubchannel is) {
1442-
oobChannels.remove(oobChannel);
1443-
channelz.removeSubchannel(is);
1444-
oobChannel.handleSubchannelTerminated();
1445-
maybeTerminateChannel();
1446-
}
1447-
1448-
@Override
1449-
void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
1450-
// TODO(chengyuanzhang): change to let LB policies explicitly manage OOB channel's
1451-
// state and refresh name resolution if necessary.
1452-
handleInternalSubchannelState(newState);
1453-
oobChannel.handleSubchannelStateChange(newState);
1454-
}
1455-
}
1456-
1457-
final InternalSubchannel internalSubchannel = new InternalSubchannel(
1458-
CreateSubchannelArgs.newBuilder().setAddresses(addressGroup).build(),
1459-
authority, userAgent, backoffPolicyProvider, oobTransportFactory,
1460-
oobTransportFactory.getScheduledExecutorService(), stopwatchSupplier, syncContext,
1461-
// All callback methods are run from syncContext
1462-
new ManagedOobChannelCallback(),
1463-
channelz,
1464-
callTracerFactory.create(),
1465-
subchannelTracer,
1466-
subchannelLogId,
1467-
subchannelLogger,
1468-
transportFilters,
1469-
target,
1470-
lbHelper.getMetricRecorder());
1471-
oobChannelTracer.reportEvent(new ChannelTrace.Event.Builder()
1472-
.setDescription("Child Subchannel created")
1473-
.setSeverity(ChannelTrace.Event.Severity.CT_INFO)
1474-
.setTimestampNanos(oobChannelCreationTime)
1475-
.setSubchannelRef(internalSubchannel)
1476-
.build());
1477-
channelz.addSubchannel(oobChannel);
1478-
channelz.addSubchannel(internalSubchannel);
1479-
oobChannel.setSubchannel(internalSubchannel);
1480-
final class AddOobChannel implements Runnable {
1481-
@Override
1482-
public void run() {
1483-
if (terminating) {
1484-
oobChannel.shutdown();
1485-
}
1486-
if (!terminated) {
1487-
// If channel has not terminated, it will track the subchannel and block termination
1488-
// for it.
1489-
oobChannels.add(oobChannel);
1490-
}
1491-
}
1492-
}
1493-
1494-
syncContext.execute(new AddOobChannel());
1495-
return oobChannel;
1396+
NameResolverRegistry nameResolverRegistry = new NameResolverRegistry();
1397+
OobNameResolverProvider resolverProvider =
1398+
new OobNameResolverProvider(authority, addressGroup, syncContext);
1399+
nameResolverRegistry.register(resolverProvider);
1400+
// We could use a hard-coded target, as the name resolver won't actually use this string.
1401+
// However, that would make debugging less clear, as we use the target to identify the
1402+
// channel.
1403+
String target;
1404+
try {
1405+
target = new URI("oob", "", "/" + authority, null, null).toString();
1406+
} catch (URISyntaxException ex) {
1407+
// Any special characters in the path will be percent encoded. So this should be impossible.
1408+
throw new AssertionError(ex);
1409+
}
1410+
ManagedChannel delegate = createResolvingOobChannelBuilder(
1411+
target, new DefaultChannelCreds(), nameResolverRegistry)
1412+
// TODO(zdapeng): executors should not outlive the parent channel.
1413+
.executor(balancerRpcExecutorHolder.getExecutor())
1414+
.idleTimeout(Integer.MAX_VALUE, TimeUnit.SECONDS)
1415+
.disableRetry()
1416+
.build();
1417+
return new OobChannel(delegate, resolverProvider);
14961418
}
14971419

14981420
@Deprecated
@@ -1504,11 +1426,17 @@ public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target)
15041426
.overrideAuthority(getAuthority());
15051427
}
15061428

1507-
// TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1508-
// TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
15091429
@Override
15101430
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
15111431
final String target, final ChannelCredentials channelCreds) {
1432+
return createResolvingOobChannelBuilder(target, channelCreds, nameResolverRegistry);
1433+
}
1434+
1435+
// TODO(creamsoup) prevent main channel to shutdown if oob channel is not terminated
1436+
// TODO(zdapeng) register the channel as a subchannel of the parent channel in channelz.
1437+
private ManagedChannelBuilder<?> createResolvingOobChannelBuilder(
1438+
final String target, final ChannelCredentials channelCreds,
1439+
NameResolverRegistry nameResolverRegistry) {
15121440
checkNotNull(channelCreds, "channelCreds");
15131441

15141442
final class ResolvingOobChannelBuilder
@@ -1641,6 +1569,19 @@ public ChannelCredentials withoutBearerTokens() {
16411569
}
16421570
}
16431571

1572+
static final class OobChannel extends ForwardingManagedChannel {
1573+
private final OobNameResolverProvider resolverProvider;
1574+
1575+
public OobChannel(ManagedChannel delegate, OobNameResolverProvider resolverProvider) {
1576+
super(delegate);
1577+
this.resolverProvider = checkNotNull(resolverProvider, "resolverProvider");
1578+
}
1579+
1580+
public void updateAddresses(List<EquivalentAddressGroup> eags) {
1581+
resolverProvider.updateAddresses(eags);
1582+
}
1583+
}
1584+
16441585
final class NameResolverListener extends NameResolver.Listener2 {
16451586
final LbHelperImpl helper;
16461587
final NameResolver resolver;

0 commit comments

Comments
 (0)