Skip to content

Commit 4a41c19

Browse files
committed
ClusterResolverLoadBalancer handle update for resolved address or error via the same listener callback with a ResolutionResult
1 parent d60e6fc commit 4a41c19

File tree

2 files changed

+190
-71
lines changed

2 files changed

+190
-71
lines changed

xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java

Lines changed: 70 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import io.grpc.NameResolver;
3434
import io.grpc.NameResolver.ResolutionResult;
3535
import io.grpc.Status;
36+
import io.grpc.StatusOr;
3637
import io.grpc.SynchronizationContext;
3738
import io.grpc.SynchronizationContext.ScheduledHandle;
3839
import io.grpc.internal.BackoffPolicy;
@@ -657,79 +658,84 @@ private class NameResolverListener extends NameResolver.Listener2 {
657658

658659
@Override
659660
public void onResult(final ResolutionResult resolutionResult) {
660-
class NameResolved implements Runnable {
661-
@Override
662-
public void run() {
663-
if (shutdown) {
664-
return;
665-
}
666-
backoffPolicy = null; // reset backoff sequence if succeeded
667-
// Arbitrary priority notation for all DNS-resolved endpoints.
668-
String priorityName = priorityName(name, 0); // value doesn't matter
669-
List<EquivalentAddressGroup> addresses = new ArrayList<>();
670-
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
671-
// No weight attribute is attached, all endpoint-level LB policy should be able
672-
// to handle such it.
673-
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
674-
Attributes attr = eag.getAttributes().toBuilder()
675-
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
676-
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
677-
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
678-
.build();
679-
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
680-
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
681-
addresses.add(eag);
682-
}
683-
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
684-
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
685-
lbRegistry, Collections.<DropOverload>emptyList());
686-
status = Status.OK;
687-
resolved = true;
688-
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
689-
handleEndpointResourceUpdate();
661+
syncContext.execute(() -> onResult2(resolutionResult));
662+
}
663+
664+
@Override
665+
public Status onResult2(final ResolutionResult resolutionResult) {
666+
if (shutdown) {
667+
return Status.OK;
668+
}
669+
// Arbitrary priority notation for all DNS-resolved endpoints.
670+
String priorityName = priorityName(name, 0); // value doesn't matter
671+
List<EquivalentAddressGroup> addresses = new ArrayList<>();
672+
StatusOr<List<EquivalentAddressGroup>> addressesOrError =
673+
resolutionResult.getAddressesOrError();
674+
if (addressesOrError.hasValue()) {
675+
backoffPolicy = null; // reset backoff sequence if succeeded
676+
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
677+
// No weight attribute is attached, all endpoint-level LB policy should be able
678+
// to handle such it.
679+
String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY);
680+
Attributes attr = eag.getAttributes().toBuilder()
681+
.set(XdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY)
682+
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
683+
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
684+
.build();
685+
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
686+
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
687+
addresses.add(eag);
690688
}
689+
PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig(
690+
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
691+
lbRegistry, Collections.<DropOverload>emptyList());
692+
status = Status.OK;
693+
resolved = true;
694+
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
695+
handleEndpointResourceUpdate();
696+
return status;
697+
} else {
698+
handleErrorInSyncContext(addressesOrError.getStatus());
699+
return addressesOrError.getStatus();
691700
}
692-
693-
syncContext.execute(new NameResolved());
694701
}
695702

696703
@Override
697704
public void onError(final Status error) {
698-
syncContext.execute(new Runnable() {
699-
@Override
700-
public void run() {
701-
if (shutdown) {
702-
return;
703-
}
704-
status = error;
705-
// NameResolver.Listener API cannot distinguish between address-not-found and
706-
// transient errors. If the error occurs in the first resolution, treat it as
707-
// address not found. Otherwise, either there is previously resolved addresses
708-
// previously encountered error, propagate the error to downstream/upstream and
709-
// let downstream/upstream handle it.
710-
if (!resolved) {
711-
resolved = true;
712-
handleEndpointResourceUpdate();
713-
} else {
714-
handleEndpointResolutionError();
715-
}
716-
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
717-
return;
718-
}
719-
if (backoffPolicy == null) {
720-
backoffPolicy = backoffPolicyProvider.get();
721-
}
722-
long delayNanos = backoffPolicy.nextBackoffNanos();
723-
logger.log(XdsLogLevel.DEBUG,
705+
syncContext.execute(() -> handleErrorInSyncContext(error));
706+
}
707+
708+
private void handleErrorInSyncContext(final Status error) {
709+
if (shutdown) {
710+
return;
711+
}
712+
status = error;
713+
// NameResolver.Listener API cannot distinguish between address-not-found and
714+
// transient errors. If the error occurs in the first resolution, treat it as
715+
// address not found. Otherwise, either there is previously resolved addresses
716+
// previously encountered error, propagate the error to downstream/upstream and
717+
// let downstream/upstream handle it.
718+
if (!resolved) {
719+
resolved = true;
720+
handleEndpointResourceUpdate();
721+
} else {
722+
handleEndpointResolutionError();
723+
}
724+
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
725+
return;
726+
}
727+
if (backoffPolicy == null) {
728+
backoffPolicy = backoffPolicyProvider.get();
729+
}
730+
long delayNanos = backoffPolicy.nextBackoffNanos();
731+
logger.log(XdsLogLevel.DEBUG,
724732
"Logical DNS resolver for cluster {0} encountered name resolution "
725-
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
733+
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
726734
name, error, delayNanos);
727-
scheduledRefresh =
735+
scheduledRefresh =
728736
syncContext.schedule(
729-
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
730-
timeService);
731-
}
732-
});
737+
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
738+
timeService);
733739
}
734740
}
735741
}

xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java

Lines changed: 120 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,7 @@ public XdsClient returnObject(Object object) {
200200
private ArgumentCaptor<SubchannelPicker> pickerCaptor;
201201
private int xdsClientRefs;
202202
private ClusterResolverLoadBalancer loadBalancer;
203+
private NameResolverProvider fakeNameResolverProvider;
203204

204205
@Before
205206
public void setUp() throws URISyntaxException {
@@ -216,7 +217,8 @@ public void setUp() throws URISyntaxException {
216217
.setServiceConfigParser(mock(ServiceConfigParser.class))
217218
.setChannelLogger(mock(ChannelLogger.class))
218219
.build();
219-
nsRegistry.register(new FakeNameResolverProvider());
220+
fakeNameResolverProvider = new FakeNameResolverProvider(false);
221+
nsRegistry.register(fakeNameResolverProvider);
220222
when(helper.getNameResolverRegistry()).thenReturn(nsRegistry);
221223
when(helper.getNameResolverArgs()).thenReturn(args);
222224
when(helper.getSynchronizationContext()).thenReturn(syncContext);
@@ -857,6 +859,41 @@ public void onlyLogicalDnsCluster_endpointsResolved() {
857859

858860
}
859861

862+
@Test
863+
public void oldListenerCallback_onlyLogicalDnsCluster_endpointsResolved() {
864+
nsRegistry.deregister(fakeNameResolverProvider);
865+
nsRegistry.register(new FakeNameResolverProvider(true));
866+
ClusterResolverConfig config = new ClusterResolverConfig(
867+
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
868+
deliverLbConfig(config);
869+
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
870+
assertThat(childBalancers).isEmpty();
871+
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
872+
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
873+
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
874+
875+
assertThat(childBalancers).hasSize(1);
876+
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
877+
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
878+
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
879+
String priority = Iterables.getOnlyElement(priorityLbConfig.priorities);
880+
PriorityChildConfig priorityChildConfig = priorityLbConfig.childConfigs.get(priority);
881+
assertThat(priorityChildConfig.ignoreReresolution).isFalse();
882+
assertThat(GracefulSwitchLoadBalancerAccessor.getChildProvider(priorityChildConfig.childConfig)
883+
.getPolicyName())
884+
.isEqualTo(CLUSTER_IMPL_POLICY_NAME);
885+
ClusterImplConfig clusterImplConfig = (ClusterImplConfig)
886+
GracefulSwitchLoadBalancerAccessor.getChildConfig(priorityChildConfig.childConfig);
887+
assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_INFO, 300L, null,
888+
Collections.<DropOverload>emptyList(), "pick_first");
889+
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
890+
assertThat(childBalancer.addresses.get(0).getAttributes()
891+
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
892+
assertThat(childBalancer.addresses.get(1).getAttributes()
893+
.get(XdsAttributes.ATTR_ADDRESS_NAME)).isEqualTo(DNS_HOST_NAME);
894+
895+
}
896+
860897
@Test
861898
public void onlyLogicalDnsCluster_handleRefreshNameResolution() {
862899
ClusterResolverConfig config = new ClusterResolverConfig(
@@ -920,6 +957,55 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
920957
inOrder.verifyNoMoreInteractions();
921958
}
922959

960+
@Test
961+
public void oldListenerCallback_onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() {
962+
nsRegistry.deregister(fakeNameResolverProvider);
963+
nsRegistry.register(new FakeNameResolverProvider(true));
964+
InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider,
965+
backoffPolicy1, backoffPolicy2);
966+
ClusterResolverConfig config = new ClusterResolverConfig(
967+
Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin, false);
968+
deliverLbConfig(config);
969+
FakeNameResolver resolver = assertResolverCreated("/" + DNS_HOST_NAME);
970+
assertThat(childBalancers).isEmpty();
971+
Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server");
972+
resolver.deliverError(error);
973+
inOrder.verify(helper).updateBalancingState(
974+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
975+
assertPicker(pickerCaptor.getValue(), error, null);
976+
assertThat(resolver.refreshCount).isEqualTo(0);
977+
inOrder.verify(backoffPolicyProvider).get();
978+
inOrder.verify(backoffPolicy1).nextBackoffNanos();
979+
assertThat(fakeClock.getPendingTasks()).hasSize(1);
980+
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
981+
.isEqualTo(1L);
982+
fakeClock.forwardTime(1L, TimeUnit.SECONDS);
983+
assertThat(resolver.refreshCount).isEqualTo(1);
984+
985+
error = Status.UNKNOWN.withDescription("I am lost");
986+
resolver.deliverError(error);
987+
inOrder.verify(helper).updateBalancingState(
988+
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
989+
inOrder.verify(backoffPolicy1).nextBackoffNanos();
990+
assertPicker(pickerCaptor.getValue(), error, null);
991+
assertThat(fakeClock.getPendingTasks()).hasSize(1);
992+
assertThat(Iterables.getOnlyElement(fakeClock.getPendingTasks()).getDelay(TimeUnit.SECONDS))
993+
.isEqualTo(10L);
994+
fakeClock.forwardTime(10L, TimeUnit.SECONDS);
995+
assertThat(resolver.refreshCount).isEqualTo(2);
996+
997+
// Succeed.
998+
EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1");
999+
EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2");
1000+
resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2));
1001+
assertThat(childBalancers).hasSize(1);
1002+
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2),
1003+
Iterables.getOnlyElement(childBalancers).addresses);
1004+
1005+
assertThat(fakeClock.getPendingTasks()).isEmpty();
1006+
inOrder.verifyNoMoreInteractions();
1007+
}
1008+
9231009
@Test
9241010
public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() {
9251011
InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2);
@@ -1319,10 +1405,18 @@ void deliverError(Status error) {
13191405
}
13201406

13211407
private class FakeNameResolverProvider extends NameResolverProvider {
1408+
private final boolean useOldListenerCallback;
1409+
1410+
private FakeNameResolverProvider(boolean useOldListenerCallback) {
1411+
this.useOldListenerCallback = useOldListenerCallback;
1412+
}
1413+
13221414
@Override
13231415
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
13241416
assertThat(targetUri.getScheme()).isEqualTo("dns");
1325-
FakeNameResolver resolver = new FakeNameResolver(targetUri);
1417+
FakeNameResolver resolver = useOldListenerCallback
1418+
? new FakeNameResolverUsingOldListenerCallback(targetUri)
1419+
: new FakeNameResolver(targetUri);
13261420
resolvers.add(resolver);
13271421
return resolver;
13281422
}
@@ -1343,9 +1437,10 @@ protected int priority() {
13431437
}
13441438
}
13451439

1440+
13461441
private class FakeNameResolver extends NameResolver {
13471442
private final URI targetUri;
1348-
private Listener2 listener;
1443+
protected Listener2 listener;
13491444
private int refreshCount;
13501445

13511446
private FakeNameResolver(URI targetUri) {
@@ -1372,19 +1467,37 @@ public void shutdown() {
13721467
resolvers.remove(this);
13731468
}
13741469

1375-
private void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1470+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
1471+
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
1472+
.setAddressesOrError(StatusOr.fromValue(addresses)).build()) );
1473+
}
1474+
1475+
protected void deliverError(Status error) {
1476+
syncContext.execute(() -> listener.onResult2(ResolutionResult.newBuilder()
1477+
.setAddressesOrError(StatusOr.fromStatus(error)).build()));
1478+
}
1479+
}
1480+
1481+
private class FakeNameResolverUsingOldListenerCallback extends FakeNameResolver {
1482+
private FakeNameResolverUsingOldListenerCallback(URI targetUri) {
1483+
super(targetUri);
1484+
}
1485+
1486+
@Override
1487+
protected void deliverEndpointAddresses(List<EquivalentAddressGroup> addresses) {
13761488
listener.onResult(ResolutionResult.newBuilder()
1377-
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
1489+
.setAddressesOrError(StatusOr.fromValue(addresses)).build());
13781490
}
13791491

1380-
private void deliverError(Status error) {
1492+
@Override
1493+
protected void deliverError(Status error) {
13811494
listener.onError(error);
13821495
}
13831496
}
13841497

13851498
private final class FakeLoadBalancerProvider extends LoadBalancerProvider {
13861499
private final String policyName;
1387-
1500+
13881501
FakeLoadBalancerProvider(String policyName) {
13891502
this.policyName = policyName;
13901503
}

0 commit comments

Comments
 (0)