Skip to content

Commit 4831431

Browse files
committed
Updated ServerConnectionCallback#reportServerConnectionGauge method signature and addressed review comments.
1 parent 5881cb6 commit 4831431

File tree

5 files changed

+40
-39
lines changed

5 files changed

+40
-39
lines changed

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -162,9 +162,9 @@ public void reportResourceCountGauge(long resourceCount, String cacheState, Stri
162162
}
163163

164164
@Override
165-
public void reportServerConnectionGauge(int isConnected, String target, String xdsServer) {
166-
recorder.recordLongGauge(CONNECTED_GAUGE, isConnected, Arrays.asList(target, xdsServer),
167-
Collections.emptyList());
165+
public void reportServerConnectionGauge(boolean isConnected, String target, String xdsServer) {
166+
recorder.recordLongGauge(CONNECTED_GAUGE, isConnected ? 1 : 0,
167+
Arrays.asList(target, xdsServer), Collections.emptyList());
168168
}
169169
}
170170
}

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,7 @@ void reportResourceCountGauge(long resourceCount, String cacheState, String reso
388388

389389
/** Callback used to report a gauge metric value for server connections. */
390390
public interface ServerConnectionCallback {
391-
void reportServerConnectionGauge(int isConnected, String target, String xdsServer);
391+
void reportServerConnectionGauge(boolean isConnected, String target, String xdsServer);
392392
}
393393

394394
/**
@@ -407,6 +407,9 @@ public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
407407

408408
/**
409409
* Reports whether xDS client has a working ADS stream to xDS server.
410+
* The definition of a working stream is defined in gRFC A78.
411+
* @see <a href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
412+
* A78-grpc-metrics-wrr-pf-xds.md</a>
410413
*/
411414
public SettableFuture<Void> reportServerConnections(ServerConnectionCallback callback) {
412415
throw new UnsupportedOperationException();

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -539,11 +539,9 @@ public SettableFuture<Void> reportServerConnections(ServerConnectionCallback cal
539539
syncContext.execute(() -> {
540540
serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->
541541
callback.reportServerConnectionGauge(
542-
controlPlaneClient.hasWorkingAdsStream() ? 1 : 0,
543-
target,
544-
serverInfo.target()));
542+
controlPlaneClient.hasWorkingAdsStream(), target, serverInfo.target()));
543+
future.set(null);
545544
});
546-
future.set(null);
547545
return future;
548546
}
549547

@@ -554,8 +552,8 @@ public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
554552
Map<XdsResourceType<?>, Map<String, Long>> resourceCountsByType =
555553
getResourceCountsByType();
556554
reportResourceCountsToCallback(callback, resourceCountsByType);
555+
future.set(null);
557556
});
558-
future.set(null);
559557
return future;
560558
}
561559

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ private void callback_ReportServerConnection() {
679679
}
680680
}
681681

682-
private void verifyServerConnection(int times, int isConnected, String dataPlaneTarget,
682+
private void verifyServerConnection(int times, boolean isConnected, String dataPlaneTarget,
683683
String xdsServer) {
684684
verify(serverConnectionCallback, times(times)).reportServerConnectionGauge(
685685
eq(isConnected),
@@ -3774,12 +3774,12 @@ public void streamClosedWithNoResponse() {
37743774
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
37753775
// Check metric data.
37763776
callback_ReportServerConnection();
3777-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
3777+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
37783778
// Management server closes the RPC stream before sending any response.
37793779
call.sendCompleted();
37803780
// Check metric data.
37813781
callback_ReportServerConnection();
3782-
verifyServerConnection(1, 0, CHANNEL_TARGET, xdsServerInfo.target());
3782+
verifyServerConnection(1, false, CHANNEL_TARGET, xdsServerInfo.target());
37833783
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
37843784
.onError(errorCaptor.capture());
37853785
verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE,
@@ -3797,23 +3797,23 @@ public void streamClosedAfterSendingResponses() {
37973797
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
37983798
// Check metric data.
37993799
callback_ReportServerConnection();
3800-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
3800+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
38013801
ScheduledTask ldsResourceTimeout =
38023802
Iterables.getOnlyElement(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
38033803
ScheduledTask rdsResourceTimeout =
38043804
Iterables.getOnlyElement(fakeClock.getPendingTasks(RDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER));
38053805
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
38063806
// Check metric data.
38073807
callback_ReportServerConnection();
3808-
verifyServerConnection(2, 1, CHANNEL_TARGET, xdsServerInfo.target());
3808+
verifyServerConnection(2, true, CHANNEL_TARGET, xdsServerInfo.target());
38093809
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
38103810
call.sendResponse(RDS, testRouteConfig, VERSION_1, "0000");
38113811
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
38123812
// Management server closes the RPC stream after sending responses.
38133813
call.sendCompleted();
38143814
// Check metric data.
38153815
callback_ReportServerConnection();
3816-
verifyServerConnection(3, 1, CHANNEL_TARGET, xdsServerInfo.target());
3816+
verifyServerConnection(3, true, CHANNEL_TARGET, xdsServerInfo.target());
38173817
verify(ldsResourceWatcher, never()).onError(errorCaptor.capture());
38183818
verify(rdsResourceWatcher, never()).onError(errorCaptor.capture());
38193819
}
@@ -3824,7 +3824,7 @@ public void streamClosedAndRetryWithBackoff() {
38243824
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher);
38253825
// Check metric data.
38263826
callback_ReportServerConnection();
3827-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
3827+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
38283828
xdsClient.watchXdsResource(XdsRouteConfigureResource.getInstance(), RDS_RESOURCE,
38293829
rdsResourceWatcher);
38303830
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), CDS_RESOURCE, cdsResourceWatcher);
@@ -3849,7 +3849,7 @@ public void streamClosedAndRetryWithBackoff() {
38493849

38503850
// Check metric data.
38513851
callback_ReportServerConnection();
3852-
verifyServerConnection(1, 0, CHANNEL_TARGET, xdsServerInfo.target());
3852+
verifyServerConnection(1, false, CHANNEL_TARGET, xdsServerInfo.target());
38533853

38543854
// Retry after backoff.
38553855
inOrder.verify(backoffPolicyProvider).get();
@@ -3866,7 +3866,7 @@ public void streamClosedAndRetryWithBackoff() {
38663866

38673867
// Check metric data.
38683868
callback_ReportServerConnection();
3869-
verifyServerConnection(2, 0, CHANNEL_TARGET, xdsServerInfo.target());
3869+
verifyServerConnection(2, false, CHANNEL_TARGET, xdsServerInfo.target());
38703870

38713871
// Management server becomes unreachable.
38723872
String errorMsg = "my fault";
@@ -3882,7 +3882,7 @@ public void streamClosedAndRetryWithBackoff() {
38823882

38833883
// Check metric data.
38843884
callback_ReportServerConnection();
3885-
verifyServerConnection(3, 0, CHANNEL_TARGET, xdsServerInfo.target());
3885+
verifyServerConnection(3, false, CHANNEL_TARGET, xdsServerInfo.target());
38863886

38873887
// Retry after backoff.
38883888
inOrder.verify(backoffPolicy1).nextBackoffNanos();
@@ -3903,7 +3903,7 @@ public void streamClosedAndRetryWithBackoff() {
39033903
call.verifyRequest(LDS, LDS_RESOURCE, "63", "3242", NODE);
39043904
// Check metric data.
39053905
callback_ReportServerConnection();
3906-
verifyServerConnection(2, 1, CHANNEL_TARGET, xdsServerInfo.target());
3906+
verifyServerConnection(2, true, CHANNEL_TARGET, xdsServerInfo.target());
39073907

39083908
List<Any> routeConfigs = ImmutableList.of(
39093909
Any.pack(mf.buildRouteConfiguration(RDS_RESOURCE, mf.buildOpaqueVirtualHosts(2))));
@@ -3920,7 +3920,7 @@ public void streamClosedAndRetryWithBackoff() {
39203920

39213921
// Check metric data.
39223922
callback_ReportServerConnection();
3923-
verifyServerConnection(3, 1, CHANNEL_TARGET, xdsServerInfo.target());
3923+
verifyServerConnection(3, true, CHANNEL_TARGET, xdsServerInfo.target());
39243924

39253925
// Reset backoff sequence and retry after backoff.
39263926
inOrder.verify(backoffPolicyProvider).get();
@@ -3946,7 +3946,7 @@ public void streamClosedAndRetryWithBackoff() {
39463946

39473947
// Check metric data.
39483948
callback_ReportServerConnection();
3949-
verifyServerConnection(4, 0, CHANNEL_TARGET, xdsServerInfo.target());
3949+
verifyServerConnection(4, false, CHANNEL_TARGET, xdsServerInfo.target());
39503950

39513951
// Retry after backoff.
39523952
inOrder.verify(backoffPolicy2).nextBackoffNanos();
@@ -3962,7 +3962,7 @@ public void streamClosedAndRetryWithBackoff() {
39623962

39633963
// Check metric data.
39643964
callback_ReportServerConnection();
3965-
verifyServerConnection(5, 0, CHANNEL_TARGET, xdsServerInfo.target());
3965+
verifyServerConnection(5, false, CHANNEL_TARGET, xdsServerInfo.target());
39663966

39673967
inOrder.verifyNoMoreInteractions();
39683968
}
@@ -3976,7 +3976,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
39763976
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
39773977
// Check metric data.
39783978
callback_ReportServerConnection();
3979-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
3979+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
39803980
call.sendError(Status.UNAVAILABLE.asException());
39813981
verify(ldsResourceWatcher, Mockito.timeout(1000).times(1))
39823982
.onError(errorCaptor.capture());
@@ -3989,7 +3989,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
39893989

39903990
// Check metric data.
39913991
callback_ReportServerConnection();
3992-
verifyServerConnection(1, 0, CHANNEL_TARGET, xdsServerInfo.target());
3992+
verifyServerConnection(1, false, CHANNEL_TARGET, xdsServerInfo.target());
39933993

39943994
xdsClient.cancelXdsResourceWatch(XdsListenerResource.getInstance(),
39953995
LDS_RESOURCE, ldsResourceWatcher);
@@ -4007,7 +4007,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
40074007

40084008
// Check metric data.
40094009
callback_ReportServerConnection();
4010-
verifyServerConnection(2,0, CHANNEL_TARGET, xdsServerInfo.target());
4010+
verifyServerConnection(2,false, CHANNEL_TARGET, xdsServerInfo.target());
40114011

40124012
call.sendResponse(LDS, testListenerRds, VERSION_1, "0000");
40134013
List<Any> routeConfigs = ImmutableList.of(
@@ -4016,7 +4016,7 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() {
40164016

40174017
// Check metric data.
40184018
callback_ReportServerConnection();
4019-
verifyServerConnection(2, 1, CHANNEL_TARGET, xdsServerInfo.target());
4019+
verifyServerConnection(2, true, CHANNEL_TARGET, xdsServerInfo.target());
40204020

40214021
verifyNoMoreInteractions(ldsResourceWatcher, rdsResourceWatcher);
40224022
}
@@ -4031,7 +4031,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
40314031
DiscoveryRpcCall call = resourceDiscoveryCalls.poll();
40324032
// Check metric data.
40334033
callback_ReportServerConnection();
4034-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
4034+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
40354035
callback_ReportResourceCount();
40364036
verifyResourceCountByCacheState(1, 1, "requested", LDS.typeUrl(), CHANNEL_TARGET);
40374037
verifyResourceCountByCacheState(1, 1, "requested", RDS.typeUrl(), CHANNEL_TARGET);
@@ -4049,7 +4049,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
40494049
assertThat(ldsResourceTimeout.isCancelled()).isTrue();
40504050
// Check metric data.
40514051
callback_ReportServerConnection();
4052-
verifyServerConnection(2, 1, CHANNEL_TARGET, xdsServerInfo.target());
4052+
verifyServerConnection(2, true, CHANNEL_TARGET, xdsServerInfo.target());
40534053
callback_ReportResourceCount();
40544054
verifyResourceCountByCacheState(1, 1, "acked", LDS.typeUrl(), CHANNEL_TARGET);
40554055
verifyResourceCountByCacheState(2, 1, "requested", RDS.typeUrl(), CHANNEL_TARGET);
@@ -4060,7 +4060,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
40604060
assertThat(rdsResourceTimeout.isCancelled()).isTrue();
40614061
// Check metric data.
40624062
callback_ReportServerConnection();
4063-
verifyServerConnection(3, 1, CHANNEL_TARGET, xdsServerInfo.target());
4063+
verifyServerConnection(3, true, CHANNEL_TARGET, xdsServerInfo.target());
40644064
callback_ReportResourceCount();
40654065
verifyResourceCountByCacheState(2, 1, "acked", LDS.typeUrl(), CHANNEL_TARGET);
40664066
verifyResourceCountByCacheState(1, 1, "acked", RDS.typeUrl(), CHANNEL_TARGET);
@@ -4076,7 +4076,7 @@ public void streamClosedAndRetryRestartsResourceInitialFetchTimerForUnresolvedRe
40764076
verify(edsResourceWatcher, never()).onError(errorCaptor.capture());
40774077
// Check metric data.
40784078
callback_ReportServerConnection();
4079-
verifyServerConnection(4, 1, CHANNEL_TARGET, xdsServerInfo.target());
4079+
verifyServerConnection(4, true, CHANNEL_TARGET, xdsServerInfo.target());
40804080
callback_ReportResourceCount();
40814081
verifyResourceCountByCacheState(3, 1, "acked", LDS.typeUrl(), CHANNEL_TARGET);
40824082
verifyResourceCountByCacheState(2, 1, "acked", RDS.typeUrl(), CHANNEL_TARGET);
@@ -4245,7 +4245,7 @@ public void sendingToStoppedServer() throws Exception {
42454245
fakeClock.forwardTime(14, TimeUnit.SECONDS);
42464246
// Check metric data.
42474247
callback_ReportServerConnection();
4248-
verifyServerConnection(1, 0, CHANNEL_TARGET, xdsServerInfo.target());
4248+
verifyServerConnection(1, false, CHANNEL_TARGET, xdsServerInfo.target());
42494249
callback_ReportResourceCount();
42504250
verifyResourceCountByCacheState(1, 1, "unknown", LDS.typeUrl(), CHANNEL_TARGET);
42514251
verifyResourceCountByCacheState(1, 1, "requested", CDS.typeUrl(), CHANNEL_TARGET);
@@ -4265,7 +4265,7 @@ public void sendingToStoppedServer() throws Exception {
42654265
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS);
42664266
// Check metric data.
42674267
callback_ReportServerConnection();
4268-
verifyServerConnection(2, 0, CHANNEL_TARGET, xdsServerInfo.target());
4268+
verifyServerConnection(2, false, CHANNEL_TARGET, xdsServerInfo.target());
42694269
callback_ReportResourceCount();
42704270
verifyResourceCountByCacheState(2, 1, "unknown", LDS.typeUrl(), CHANNEL_TARGET);
42714271
verifyResourceCountByCacheState(2, 1, "requested", CDS.typeUrl(), CHANNEL_TARGET);
@@ -4276,7 +4276,7 @@ public void sendingToStoppedServer() throws Exception {
42764276

42774277
// Check metric data.
42784278
callback_ReportServerConnection();
4279-
verifyServerConnection(3, 0, CHANNEL_TARGET, xdsServerInfo.target());
4279+
verifyServerConnection(3, false, CHANNEL_TARGET, xdsServerInfo.target());
42804280
callback_ReportResourceCount();
42814281
verifyResourceCountByCacheState(1, 1, "requested", LDS.typeUrl(), CHANNEL_TARGET);
42824282
verifyResourceCountByCacheState(3, 1, "requested", CDS.typeUrl(), CHANNEL_TARGET);
@@ -4294,7 +4294,7 @@ public void sendingToStoppedServer() throws Exception {
42944294
verifySubscribedResourcesMetadataSizes(1, 1, 0, 0);
42954295
// Check metric data.
42964296
callback_ReportServerConnection();
4297-
verifyServerConnection(1, 1, CHANNEL_TARGET, xdsServerInfo.target());
4297+
verifyServerConnection(1, true, CHANNEL_TARGET, xdsServerInfo.target());
42984298
callback_ReportResourceCount();
42994299
verifyResourceCountByCacheState(1, 1, "acked", LDS.typeUrl(), CHANNEL_TARGET);
43004300
verifyResourceCountByCacheState(4, 1, "requested", CDS.typeUrl(), CHANNEL_TARGET);

xds/src/test/java/io/grpc/xds/XdsClientMetricReporterImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,8 +162,8 @@ public void flush() {}
162162
.registerBatchCallback(gaugeBatchCallbackCaptor.capture(), any(), any());
163163
gaugeBatchCallbackCaptor.getValue().accept(mockBatchRecorder);
164164
// Verify that the xdsClient methods were called
165-
verify(mockXdsClient).reportResourceCounts(any(ResourceCallback.class));
166-
verify(mockXdsClient).reportServerConnections(any(ServerConnectionCallback.class));
165+
verify(mockXdsClient).reportResourceCounts(any());
166+
verify(mockXdsClient).reportServerConnections(any());
167167

168168
assertThat(logs.size()).isEqualTo(1);
169169
assertThat(logs.get(0).getLevel()).isEqualTo(Level.WARNING);
@@ -204,7 +204,7 @@ public void metricGauges() {
204204
inOrder.verify(mockBatchRecorder)
205205
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.resources"), eq(10L), any(),
206206
any());
207-
callback.reportServerConnectionGauge(1, target, "xdsServer");
207+
callback.reportServerConnectionGauge(true, target, "xdsServer");
208208
inOrder.verify(mockBatchRecorder)
209209
.recordLongGauge(eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L), any(), any());
210210

@@ -216,7 +216,7 @@ public void metricReporterCallback() {
216216
MetricReporterCallback callback =
217217
new MetricReporterCallback(mockBatchRecorder);
218218

219-
callback.reportServerConnectionGauge(1, target, server);
219+
callback.reportServerConnectionGauge(true, target, server);
220220
verify(mockBatchRecorder, times(1)).recordLongGauge(
221221
eqMetricInstrumentName("grpc.xds_client.connected"), eq(1L),
222222
eq(Lists.newArrayList(target, server)),

0 commit comments

Comments
 (0)