Skip to content

Commit 7dd4402

Browse files
committed
Return Future instead of SettableFuture for reportServerConnections() and addressed review comments
1 parent e04164b commit 7dd4402

File tree

3 files changed

+28
-35
lines changed

3 files changed

+28
-35
lines changed

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import com.google.common.annotations.VisibleForTesting;
2020
import com.google.common.util.concurrent.ListenableFuture;
21-
import com.google.common.util.concurrent.SettableFuture;
22-
import io.grpc.Internal;
2321
import io.grpc.LongCounterMetricInstrument;
2422
import io.grpc.LongGaugeMetricInstrument;
2523
import io.grpc.MetricInstrumentRegistry;
@@ -28,7 +26,6 @@
2826
import io.grpc.MetricRecorder.BatchRecorder;
2927
import io.grpc.MetricRecorder.Registration;
3028
import io.grpc.xds.client.XdsClient;
31-
import io.grpc.xds.client.XdsClient.ResourceCallback;
3229
import io.grpc.xds.client.XdsClient.ResourceMetadata;
3330
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
3431
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
@@ -37,9 +34,9 @@
3734
import java.util.Arrays;
3835
import java.util.Collections;
3936
import java.util.HashMap;
40-
import java.util.Locale;
4137
import java.util.Map;
4238
import java.util.concurrent.ExecutionException;
39+
import java.util.concurrent.Future;
4340
import java.util.concurrent.TimeUnit;
4441
import java.util.concurrent.TimeoutException;
4542
import java.util.logging.Level;
@@ -49,7 +46,6 @@
4946
/**
5047
* XdsClientMetricReporter implementation.
5148
*/
52-
@Internal
5349
final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
5450

5551
private static final Logger logger = Logger.getLogger(
@@ -139,21 +135,18 @@ void close() {
139135
void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
140136
MetricReporterCallback callback = new MetricReporterCallback(recorder, target);
141137
try {
142-
SettableFuture<Void> reportServerConnectionsCompleted =
143-
xdsClient.reportServerConnections(callback);
138+
Future<Void> reportServerConnectionsCompleted = xdsClient.reportServerConnections(callback);
144139

145140
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
146141
getResourceMetadataCompleted = xdsClient.getSubscribedResourcesMetadataSnapshot();
147142

148143
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
149144
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);
150145

151-
SettableFuture<Void> reportResourceCountsCompleted = computeAndReportResourceCounts(
152-
metadataByType, callback);
146+
computeAndReportResourceCounts(metadataByType, callback);
153147

154148
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
155-
Void unused1 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
156-
Void unused2 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
149+
Void unused = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
157150
} catch (ExecutionException | TimeoutException | InterruptedException e) {
158151
if (e instanceof InterruptedException) {
159152
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
@@ -162,11 +155,9 @@ void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
162155
}
163156
}
164157

165-
private SettableFuture<Void> computeAndReportResourceCounts(
158+
private void computeAndReportResourceCounts(
166159
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
167160
MetricReporterCallback callback) {
168-
SettableFuture<Void> future = SettableFuture.create();
169-
170161
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
171162
metadataByType.entrySet()) {
172163
XdsResourceType<?> type = metadataByTypeEntry.getKey();
@@ -180,20 +171,26 @@ private SettableFuture<Void> computeAndReportResourceCounts(
180171
resourceCountsByState.forEach((cacheState, count) ->
181172
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
182173
}
183-
future.set(null);
184-
return future;
185174
}
186175

187176
private static String cacheStateFromResourceStatus(ResourceMetadataStatus metadataStatus,
188177
boolean isResourceCached) {
189-
String status = metadataStatus.toString().toLowerCase(Locale.ROOT);
190-
return metadataStatus == ResourceMetadataStatus.NACKED && isResourceCached
191-
? status + "_but_cached" : status;
178+
switch (metadataStatus) {
179+
case REQUESTED:
180+
return "requested";
181+
case DOES_NOT_EXIST:
182+
return "does_not_exist";
183+
case ACKED:
184+
return "acked";
185+
case NACKED:
186+
return isResourceCached ? "nacked_but_cached" : "nacked";
187+
default:
188+
return "unknown";
189+
}
192190
}
193191

194192
@VisibleForTesting
195-
static final class MetricReporterCallback implements ResourceCallback,
196-
ServerConnectionCallback {
193+
static final class MetricReporterCallback implements ServerConnectionCallback {
197194
private final BatchRecorder recorder;
198195
private final String target;
199196

@@ -203,7 +200,6 @@ static final class MetricReporterCallback implements ResourceCallback,
203200
}
204201

205202
// TODO(dnvindhya): include the "authority" label once xds.authority is available.
206-
@Override
207203
public void reportResourceCountGauge(long resourceCount, String cacheState,
208204
String resourceType) {
209205
recorder.recordLongGauge(RESOURCES_GAUGE, resourceCount,

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.common.net.UrlEscapers;
2525
import com.google.common.util.concurrent.ListenableFuture;
2626
import com.google.common.util.concurrent.MoreExecutors;
27-
import com.google.common.util.concurrent.SettableFuture;
2827
import com.google.protobuf.Any;
2928
import io.grpc.ExperimentalApi;
3029
import io.grpc.Status;
@@ -37,6 +36,7 @@
3736
import java.util.List;
3837
import java.util.Map;
3938
import java.util.concurrent.Executor;
39+
import java.util.concurrent.Future;
4040
import java.util.concurrent.atomic.AtomicInteger;
4141
import javax.annotation.Nullable;
4242

@@ -386,24 +386,20 @@ public Map<Bootstrapper.ServerInfo, LoadReportClient> getServerLrsClientMap() {
386386
throw new UnsupportedOperationException();
387387
}
388388

389-
/** Callback used to report gauge metric value for resources. */
390-
public interface ResourceCallback {
391-
// TODO(dnvindhya): include the "authority" label once xds.authority is available.
392-
void reportResourceCountGauge(long resourceCount, String cacheState, String resourceType);
393-
}
394-
395389
/** Callback used to report a gauge metric value for server connections. */
396390
public interface ServerConnectionCallback {
397391
void reportServerConnectionGauge(boolean isConnected, String xdsServer);
398392
}
399393

400394
/**
401-
* Reports whether xDS client has a "working" ADS stream to xDS server.
402-
* The definition of a working stream is defined in gRFC A78.
403-
* @see <a href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
404-
* A78-grpc-metrics-wrr-pf-xds.md</a>
395+
* Reports whether xDS client has a "working" ADS stream to xDS server. The definition of a
396+
* working stream is defined in gRFC A78.
397+
*
398+
* @see <a
399+
* href="https://github.com/grpc/proposal/blob/master/A78-grpc-metrics-wrr-pf-xds.md#xdsclient">
400+
* A78-grpc-metrics-wrr-pf-xds.md</a>
405401
*/
406-
public SettableFuture<Void> reportServerConnections(ServerConnectionCallback callback) {
402+
public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
407403
throw new UnsupportedOperationException();
408404
}
409405

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Objects;
5252
import java.util.Set;
5353
import java.util.concurrent.Executor;
54+
import java.util.concurrent.Future;
5455
import java.util.concurrent.ScheduledExecutorService;
5556
import java.util.concurrent.TimeUnit;
5657
import javax.annotation.Nullable;
@@ -529,7 +530,7 @@ private <T extends ResourceUpdate> void handleResourceUpdate(
529530
}
530531

531532
@Override
532-
public SettableFuture<Void> reportServerConnections(ServerConnectionCallback callback) {
533+
public Future<Void> reportServerConnections(ServerConnectionCallback callback) {
533534
SettableFuture<Void> future = SettableFuture.create();
534535
syncContext.execute(() -> {
535536
serverCpClientMap.forEach((serverInfo, controlPlaneClient) ->

0 commit comments

Comments
 (0)