Skip to content

Commit 3a283e4

Browse files
committed
Moved resource count metric reporting from XdsClientImpl to XdsClientMetricReporterImpl and addressed review comments
1 parent: cbe2659 · commit: 3a283e4

File tree

8 files changed: +300 / -488 lines changed

8 files changed: +300 / -488 lines changed

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

Lines changed: 10 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -121,25 +121,27 @@ private static class SharedXdsClientPoolProviderHolder {
121121

122122
@ThreadSafe
123123
@VisibleForTesting
124-
protected class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
124+
class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
125125

126126
private final BootstrapInfo bootstrapInfo;
127127
private final String target; // The target associated with the xDS client.
128-
private final XdsClientMetricReporterImpl xdsClientMetricReporter;
128+
private final MetricRecorder metricRecorder;
129129
private final Object lock = new Object();
130130
@GuardedBy("lock")
131131
private ScheduledExecutorService scheduler;
132132
@GuardedBy("lock")
133133
private XdsClient xdsClient;
134134
@GuardedBy("lock")
135135
private int refCount;
136+
@GuardedBy("lock")
137+
private XdsClientMetricReporterImpl metricReporter;
136138

137139
@VisibleForTesting
138140
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
139141
MetricRecorder metricRecorder) {
140142
this.bootstrapInfo = checkNotNull(bootstrapInfo);
141143
this.target = target;
142-
this.xdsClientMetricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
144+
this.metricRecorder = metricRecorder;
143145
}
144146

145147
@Override
@@ -150,6 +152,7 @@ public XdsClient getObject() {
150152
log.log(Level.INFO, "xDS node ID: {0}", bootstrapInfo.node().getId());
151153
}
152154
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
155+
metricReporter = new XdsClientMetricReporterImpl(metricRecorder, target);
153156
xdsClient = new XdsClientImpl(
154157
DEFAULT_XDS_TRANSPORT_FACTORY,
155158
bootstrapInfo,
@@ -159,8 +162,8 @@ public XdsClient getObject() {
159162
TimeProvider.SYSTEM_TIME_PROVIDER,
160163
MessagePrinter.INSTANCE,
161164
new TlsContextManagerImpl(bootstrapInfo),
162-
xdsClientMetricReporter);
163-
xdsClientMetricReporter.setXdsClient(xdsClient);
165+
metricReporter);
166+
metricReporter.setXdsClient(xdsClient);
164167
}
165168
refCount++;
166169
return xdsClient;
@@ -174,7 +177,8 @@ public XdsClient returnObject(Object object) {
174177
if (refCount == 0) {
175178
xdsClient.shutdown();
176179
xdsClient = null;
177-
xdsClientMetricReporter.close();
180+
metricReporter.close();
181+
metricReporter = null;
178182
targetToXdsClientMap.remove(target);
179183
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
180184
}

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 58 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.xds;
1818

1919
import com.google.common.annotations.VisibleForTesting;
20+
import com.google.common.util.concurrent.ListenableFuture;
2021
import com.google.common.util.concurrent.SettableFuture;
2122
import io.grpc.Internal;
2223
import io.grpc.LongCounterMetricInstrument;
@@ -28,11 +29,19 @@
2829
import io.grpc.MetricRecorder.Registration;
2930
import io.grpc.xds.client.XdsClient;
3031
import io.grpc.xds.client.XdsClient.ResourceCallback;
32+
import io.grpc.xds.client.XdsClient.ResourceMetadata;
33+
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
3134
import io.grpc.xds.client.XdsClient.ServerConnectionCallback;
3235
import io.grpc.xds.client.XdsClientMetricReporter;
36+
import io.grpc.xds.client.XdsResourceType;
3337
import java.util.Arrays;
3438
import java.util.Collections;
39+
import java.util.HashMap;
40+
import java.util.Locale;
41+
import java.util.Map;
42+
import java.util.concurrent.ExecutionException;
3543
import java.util.concurrent.TimeUnit;
44+
import java.util.concurrent.TimeoutException;
3645
import java.util.logging.Level;
3746
import java.util.logging.Logger;
3847
import javax.annotation.Nullable;
@@ -41,7 +50,7 @@
4150
* XdsClientMetricReporter implementation.
4251
*/
4352
@Internal
44-
public class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
53+
final class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
4554

4655
private static final Logger logger = Logger.getLogger(
4756
XdsClientMetricReporterImpl.class.getName());
@@ -55,8 +64,6 @@ public class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
5564
private final String target;
5665
@Nullable
5766
private Registration gaugeRegistration = null;
58-
@Nullable
59-
private XdsClient xdsClient = null;
6067

6168
static {
6269
MetricInstrumentRegistry metricInstrumentRegistry
@@ -111,41 +118,79 @@ public void reportServerFailure(long serverFailure, String xdsServer) {
111118
Arrays.asList(target, xdsServer), Collections.emptyList());
112119
}
113120

114-
void setXdsClient(XdsClient client) {
115-
this.xdsClient = client;
121+
void setXdsClient(XdsClient xdsClient) {
122+
assert gaugeRegistration == null;
116123
// register gauge here
117124
this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() {
118125
@Override
119126
public void accept(BatchRecorder recorder) {
120-
reportCallbackMetrics(recorder);
127+
reportCallbackMetrics(recorder, xdsClient);
121128
}
122129
}, CONNECTED_GAUGE, RESOURCES_GAUGE);
123130
}
124131

125132
void close() {
126133
if (gaugeRegistration != null) {
127134
gaugeRegistration.close();
135+
gaugeRegistration = null;
128136
}
129137
}
130138

131-
void reportCallbackMetrics(BatchRecorder recorder) {
139+
void reportCallbackMetrics(BatchRecorder recorder, XdsClient xdsClient) {
132140
MetricReporterCallback callback = new MetricReporterCallback(recorder, target);
133141
try {
134-
SettableFuture<Void> reportResourceCountsCompleted = this.xdsClient.reportResourceCounts(
135-
callback);
136142
SettableFuture<Void> reportServerConnectionsCompleted =
137-
this.xdsClient.reportServerConnections(callback);
143+
xdsClient.reportServerConnections(callback);
144+
145+
ListenableFuture<Map<XdsResourceType<?>, Map<String, ResourceMetadata>>>
146+
getResourceMetadataCompleted = xdsClient.getSubscribedResourcesMetadataSnapshot();
147+
148+
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType =
149+
getResourceMetadataCompleted.get(10, TimeUnit.SECONDS);
150+
151+
SettableFuture<Void> reportResourceCountsCompleted = computeAndReportResourceCounts(
152+
metadataByType, callback);
153+
138154
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
139-
Void unused1 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
140-
Void unused2 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
141-
} catch (Exception e) {
155+
Void unused1 = reportServerConnectionsCompleted.get(5, TimeUnit.SECONDS);
156+
Void unused2 = reportResourceCountsCompleted.get(5, TimeUnit.SECONDS);
157+
} catch (ExecutionException | TimeoutException | InterruptedException e) {
142158
if (e instanceof InterruptedException) {
143159
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
144160
}
145161
logger.log(Level.WARNING, "Failed to report gauge metrics", e);
146162
}
147163
}
148164

165+
private SettableFuture<Void> computeAndReportResourceCounts(
166+
Map<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByType,
167+
MetricReporterCallback callback) {
168+
SettableFuture<Void> future = SettableFuture.create();
169+
170+
for (Map.Entry<XdsResourceType<?>, Map<String, ResourceMetadata>> metadataByTypeEntry :
171+
metadataByType.entrySet()) {
172+
XdsResourceType<?> type = metadataByTypeEntry.getKey();
173+
174+
Map<String, Long> resourceCountsByState = new HashMap<>();
175+
for (ResourceMetadata metadata : metadataByTypeEntry.getValue().values()) {
176+
String cacheState = cacheStateFromResourceStatus(metadata.getStatus(), metadata.isCached());
177+
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
178+
}
179+
180+
resourceCountsByState.forEach((cacheState, count) ->
181+
callback.reportResourceCountGauge(count, cacheState, type.typeUrl()));
182+
}
183+
future.set(null);
184+
return future;
185+
}
186+
187+
private static String cacheStateFromResourceStatus(ResourceMetadataStatus metadataStatus,
188+
boolean isResourceCached) {
189+
String status = metadataStatus.toString().toLowerCase(Locale.ROOT);
190+
return metadataStatus == ResourceMetadataStatus.NACKED && isResourceCached
191+
? status + "_but_cached" : status;
192+
}
193+
149194
@VisibleForTesting
150195
static final class MetricReporterCallback implements ResourceCallback,
151196
ServerConnectionCallback {

xds/src/main/java/io/grpc/xds/client/ControlPlaneClient.java

Lines changed: 1 addition & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -234,13 +234,7 @@ void readyHandler() {
234234
*/
235235
// Must be synchronized
236236
boolean hasWorkingAdsStream() {
237-
if (streamClosedNoResponse || shutdown) {
238-
return false;
239-
}
240-
if (adsStream == null) {
241-
return true;
242-
}
243-
return adsStream.responseReceived || !adsStream.closed;
237+
return !streamClosedNoResponse;
244238
}
245239

246240

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 18 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -155,44 +155,46 @@ public static final class ResourceMetadata {
155155
private final String version;
156156
private final ResourceMetadataStatus status;
157157
private final long updateTimeNanos;
158+
private final boolean cached;
158159
@Nullable private final Any rawResource;
159160
@Nullable private final UpdateFailureState errorState;
160161

161162
private ResourceMetadata(
162-
ResourceMetadataStatus status, String version, long updateTimeNanos,
163+
ResourceMetadataStatus status, String version, long updateTimeNanos, boolean cached,
163164
@Nullable Any rawResource, @Nullable UpdateFailureState errorState) {
164165
this.status = checkNotNull(status, "status");
165166
this.version = checkNotNull(version, "version");
166167
this.updateTimeNanos = updateTimeNanos;
168+
this.cached = cached;
167169
this.rawResource = rawResource;
168170
this.errorState = errorState;
169171
}
170172

171-
static ResourceMetadata newResourceMetadataUnknown() {
172-
return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, null, null);
173+
public static ResourceMetadata newResourceMetadataUnknown() {
174+
return new ResourceMetadata(ResourceMetadataStatus.UNKNOWN, "", 0, false,null, null);
173175
}
174176

175-
static ResourceMetadata newResourceMetadataRequested() {
176-
return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, null, null);
177+
public static ResourceMetadata newResourceMetadataRequested(boolean cached) {
178+
return new ResourceMetadata(ResourceMetadataStatus.REQUESTED, "", 0, cached, null, null);
177179
}
178180

179-
static ResourceMetadata newResourceMetadataDoesNotExist() {
180-
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, null, null);
181+
public static ResourceMetadata newResourceMetadataDoesNotExist() {
182+
return new ResourceMetadata(ResourceMetadataStatus.DOES_NOT_EXIST, "", 0, false, null, null);
181183
}
182184

183185
public static ResourceMetadata newResourceMetadataAcked(
184186
Any rawResource, String version, long updateTimeNanos) {
185187
checkNotNull(rawResource, "rawResource");
186188
return new ResourceMetadata(
187-
ResourceMetadataStatus.ACKED, version, updateTimeNanos, rawResource, null);
189+
ResourceMetadataStatus.ACKED, version, updateTimeNanos, true, rawResource, null);
188190
}
189191

190-
static ResourceMetadata newResourceMetadataNacked(
192+
public static ResourceMetadata newResourceMetadataNacked(
191193
ResourceMetadata metadata, String failedVersion, long failedUpdateTime,
192-
String failedDetails) {
194+
String failedDetails, boolean cached) {
193195
checkNotNull(metadata, "metadata");
194196
return new ResourceMetadata(ResourceMetadataStatus.NACKED,
195-
metadata.getVersion(), metadata.getUpdateTimeNanos(), metadata.getRawResource(),
197+
metadata.getVersion(), metadata.getUpdateTimeNanos(), cached, metadata.getRawResource(),
196198
new UpdateFailureState(failedVersion, failedUpdateTime, failedDetails));
197199
}
198200

@@ -211,6 +213,11 @@ public long getUpdateTimeNanos() {
211213
return updateTimeNanos;
212214
}
213215

216+
/** Returns whether the resource was cached. */
217+
public boolean isCached() {
218+
return cached;
219+
}
220+
214221
/** The last successfully updated xDS resource as it was returned by the server. */
215222
@Nullable
216223
public Any getRawResource() {
@@ -390,20 +397,6 @@ public interface ServerConnectionCallback {
390397
void reportServerConnectionGauge(boolean isConnected, String xdsServer);
391398
}
392399

393-
/**
394-
* Reports the number of resources in each cache state.
395-
*
396-
* <p>Cache state is determined by two factors:
397-
* <ul>
398-
* <li>Whether the resource is cached.
399-
* <li>The {@link io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus} of the
400-
* resource.
401-
* </ul>
402-
*/
403-
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
404-
throw new UnsupportedOperationException();
405-
}
406-
407400
/**
408401
* Reports whether xDS client has a working ADS stream to xDS server.
409402
* The definition of a working stream is defined in gRFC A78.

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 5 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -40,15 +40,13 @@
4040
import io.grpc.internal.TimeProvider;
4141
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
4242
import io.grpc.xds.client.Bootstrapper.ServerInfo;
43-
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
4443
import io.grpc.xds.client.XdsClient.ResourceStore;
4544
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
4645
import java.net.URI;
4746
import java.util.Collection;
4847
import java.util.Collections;
4948
import java.util.HashMap;
5049
import java.util.List;
51-
import java.util.Locale;
5250
import java.util.Map;
5351
import java.util.Objects;
5452
import java.util.Set;
@@ -542,32 +540,6 @@ public SettableFuture<Void> reportServerConnections(ServerConnectionCallback cal
542540
return future;
543541
}
544542

545-
@Override
546-
public SettableFuture<Void> reportResourceCounts(ResourceCallback callback) {
547-
SettableFuture<Void> future = SettableFuture.create();
548-
syncContext.execute(() -> {
549-
for (XdsResourceType<? extends ResourceUpdate> resourceType : resourceSubscribers.keySet()) {
550-
Map<String, Long> resourceCountsByState = new HashMap<>();
551-
for (ResourceSubscriber<? extends ResourceUpdate> subscriber :
552-
resourceSubscribers.get(resourceType).values()) {
553-
String cacheState = cacheStateFromResourceStatus(subscriber.metadata,
554-
subscriber.data != null);
555-
resourceCountsByState.compute(cacheState, (k, v) -> (v == null) ? 1 : v + 1);
556-
}
557-
resourceCountsByState.forEach((cacheState, count) ->
558-
callback.reportResourceCountGauge(count, cacheState, resourceType.typeUrl()));
559-
}
560-
future.set(null);
561-
});
562-
return future;
563-
}
564-
565-
private String cacheStateFromResourceStatus(ResourceMetadata metadata, boolean isResourceCached) {
566-
String status = metadata.getStatus().toString().toLowerCase(Locale.ROOT);
567-
return metadata.getStatus() == ResourceMetadataStatus.NACKED && isResourceCached
568-
? status + "_but_cached" : status;
569-
}
570-
571543
/** Tracks a single subscribed resource. */
572544
private final class ResourceSubscriber<T extends ResourceUpdate> {
573545
@Nullable private final ServerInfo serverInfo;
@@ -663,7 +635,7 @@ public String toString() {
663635
}
664636

665637
// Initial fetch scheduled or rescheduled, transition metadata state to REQUESTED.
666-
metadata = ResourceMetadata.newResourceMetadataRequested();
638+
metadata = ResourceMetadata.newResourceMetadataRequested(this.data != null);
667639

668640
respTimer = syncContext.schedule(
669641
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS,
@@ -706,10 +678,10 @@ void onData(ParsedResource<T> parsedResource, String version, long updateTime,
706678
respTimer.cancel();
707679
respTimer = null;
708680
}
709-
this.metadata = ResourceMetadata
710-
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
711681
ResourceUpdate oldData = this.data;
712682
this.data = parsedResource.getResourceUpdate();
683+
this.metadata = ResourceMetadata
684+
.newResourceMetadataAcked(parsedResource.getRawResource(), version, updateTime);
713685
absent = false;
714686
if (resourceDeletionIgnored) {
715687
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
@@ -803,7 +775,8 @@ void onError(Status error, @Nullable ProcessingTracker tracker) {
803775

804776
void onRejected(String rejectedVersion, long rejectedTime, String rejectedDetails) {
805777
metadata = ResourceMetadata
806-
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails);
778+
.newResourceMetadataNacked(metadata, rejectedVersion, rejectedTime, rejectedDetails,
779+
data != null);
807780
}
808781

809782
private void notifyWatcher(ResourceWatcher<T> watcher, T update) {

0 commit comments

Comments
 (0)