Skip to content

Commit 4f01651

Browse files
committed
Instead of storing MetricRecorder, pass it as is for XdsClient creation
1 parent f60c985 commit 4f01651

15 files changed

+51
-125
lines changed

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@ public String getOverrideAuthority() {
410410
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
411411
*/
412412
@Nullable
413-
@ExperimentalApi("Insert GitHub issue")
414413
public MetricRecorder getMetricRecorder() {
415414
return metricRecorder;
416415
}
@@ -563,7 +562,6 @@ public Builder setOverrideAuthority(String authority) {
563562
/**
564563
* See {@link Args#getMetricRecorder()}. This is an optional field.
565564
*/
566-
@ExperimentalApi("Insert github issue")
567565
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
568566
this.metricRecorder = metricRecorder;
569567
return this;

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
5757
private final Object lock = new Object();
5858
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
5959
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
60-
private final Map<String, MetricRecorder> targetToMetricRecorderMap = new ConcurrentHashMap<>();
6160

6261
SharedXdsClientPoolProvider() {
6362
this(new GrpcBootstrapperImpl());
@@ -83,12 +82,6 @@ public ObjectPool<XdsClient> get(String target) {
8382
return targetToXdsClientMap.get(target);
8483
}
8584

86-
@Override
87-
@Nullable
88-
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
89-
return this.getOrCreate(target, new MetricRecorder() {});
90-
}
91-
9285
@Override
9386
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
9487
throws XdsInitializationException {
@@ -107,12 +100,8 @@ public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRec
107100
if (bootstrapInfo.servers().isEmpty()) {
108101
throw new XdsInitializationException("No xDS server provided");
109102
}
110-
MetricRecorder metricRecorderForTarget = targetToMetricRecorderMap.get(target);
111-
metricRecorder =
112-
metricRecorderForTarget != null ? metricRecorderForTarget : metricRecorder;
113103
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
114104
targetToXdsClientMap.put(target, ref);
115-
targetToMetricRecorderMap.putIfAbsent(target, metricRecorder);
116105
}
117106
}
118107
}
@@ -124,25 +113,19 @@ public ImmutableList<String> getTargets() {
124113
return ImmutableList.copyOf(targetToXdsClientMap.keySet());
125114
}
126115

127-
@VisibleForTesting
128-
MetricRecorder getMetricRecorder(String target) {
129-
return targetToMetricRecorderMap.get(target);
130-
}
131-
132-
133116
private static class SharedXdsClientPoolProviderHolder {
134117
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
135118
}
136119

137120
@ThreadSafe
138121
@VisibleForTesting
139-
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
122+
protected class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
140123

141-
private static final ExponentialBackoffPolicy.Provider BACKOFF_POLICY_PROVIDER =
124+
private final ExponentialBackoffPolicy.Provider backoffPolicyProvider =
142125
new ExponentialBackoffPolicy.Provider();
143126
private final BootstrapInfo bootstrapInfo;
144127
private final String target; // The target associated with the xDS client.
145-
private final MetricRecorder metricRecorder;
128+
private final XdsClientMetricReporterImpl xdsClientMetricReporter;
146129
private final Object lock = new Object();
147130
@GuardedBy("lock")
148131
private ScheduledExecutorService scheduler;
@@ -156,7 +139,7 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
156139
MetricRecorder metricRecorder) {
157140
this.bootstrapInfo = checkNotNull(bootstrapInfo);
158141
this.target = target;
159-
this.metricRecorder = metricRecorder;
142+
this.xdsClientMetricReporter = new XdsClientMetricReporterImpl(metricRecorder);
160143
}
161144

162145
@Override
@@ -171,13 +154,14 @@ public XdsClient getObject() {
171154
DEFAULT_XDS_TRANSPORT_FACTORY,
172155
bootstrapInfo,
173156
scheduler,
174-
BACKOFF_POLICY_PROVIDER,
157+
backoffPolicyProvider,
175158
GrpcUtil.STOPWATCH_SUPPLIER,
176159
TimeProvider.SYSTEM_TIME_PROVIDER,
177160
MessagePrinter.INSTANCE,
178161
new TlsContextManagerImpl(bootstrapInfo),
179162
getTarget(),
180-
new XdsClientMetricReporterImpl(metricRecorder));
163+
xdsClientMetricReporter);
164+
xdsClientMetricReporter.setXdsClient(xdsClient);
181165
}
182166
refCount++;
183167
return xdsClient;
@@ -191,6 +175,8 @@ public XdsClient returnObject(Object object) {
191175
if (refCount == 0) {
192176
xdsClient.shutdown();
193177
xdsClient = null;
178+
xdsClientMetricReporter.close();
179+
targetToXdsClientMap.remove(target);
194180
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
195181
}
196182
return null;

xds/src/main/java/io/grpc/xds/XdsClientMetricReporterImpl.java

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.MetricRecorder.BatchRecorder;
2828
import io.grpc.MetricRecorder.Registration;
2929
import io.grpc.xds.client.XdsClient;
30+
import io.grpc.xds.client.XdsClientMetricReporter;
3031
import java.util.Arrays;
3132
import java.util.Collections;
3233
import java.util.concurrent.TimeUnit;
@@ -53,8 +54,6 @@ public class XdsClientMetricReporterImpl implements XdsClientMetricReporter {
5354
private Registration gaugeRegistration = null;
5455
@Nullable
5556
private XdsClient xdsClient = null;
56-
@Nullable
57-
private CallbackMetricReporter callbackMetricReporter = null;
5857

5958
static {
6059
MetricInstrumentRegistry metricInstrumentRegistry
@@ -108,8 +107,7 @@ public void reportServerFailure(long serverFailure, String target, String xdsSer
108107
Arrays.asList(target, xdsServer), Collections.emptyList());
109108
}
110109

111-
@Override
112-
public void setXdsClient(XdsClient client) {
110+
void setXdsClient(XdsClient client) {
113111
this.xdsClient = client;
114112
// register gauge here
115113
this.gaugeRegistration = metricRecorder.registerBatchCallback(new BatchCallback() {
@@ -120,21 +118,22 @@ public void accept(BatchRecorder recorder) {
120118
}, CONNECTED_GAUGE, RESOURCES_GAUGE);
121119
}
122120

123-
@Override
124-
public void close() {
121+
void close() {
125122
if (gaugeRegistration != null) {
126123
gaugeRegistration.close();
127124
}
128125
}
129126

130127
void reportCallbackMetrics(BatchRecorder recorder) {
131-
if (callbackMetricReporter == null) {
132-
// Instantiate only if not injected
133-
callbackMetricReporter = new CallbackMetricReporterImpl(recorder);
134-
}
128+
CallbackMetricReporter callbackMetricReporter = createCallbackMetricReporter(recorder);
135129
try {
136-
reportResourceCounts(callbackMetricReporter);
137-
reportServerConnections(callbackMetricReporter);
130+
SettableFuture<Void> ret = this.xdsClient.reportResourceCounts(
131+
callbackMetricReporter);
132+
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
133+
Void unused = ret.get(5, TimeUnit.SECONDS);
134+
SettableFuture<Void> ret1 = this.xdsClient.reportServerConnections(callbackMetricReporter);
135+
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
136+
Void unused1 = ret1.get(5, TimeUnit.SECONDS);
138137
} catch (Exception e) {
139138
if (e instanceof InterruptedException) {
140139
Thread.currentThread().interrupt(); // re-set the current thread's interruption state
@@ -143,25 +142,12 @@ void reportCallbackMetrics(BatchRecorder recorder) {
143142
}
144143
}
145144

146-
void reportResourceCounts(CallbackMetricReporter callbackMetricReporter) throws Exception {
147-
SettableFuture<Void> ret = this.xdsClient.reportResourceCounts(
148-
callbackMetricReporter);
149-
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
150-
Void unused = ret.get(5, TimeUnit.SECONDS);
151-
}
152-
153-
void reportServerConnections(CallbackMetricReporter callbackMetricReporter) throws Exception {
154-
SettableFuture<Void> ret = this.xdsClient.reportServerConnections(callbackMetricReporter);
155-
// Normally this shouldn't take long, but adding a timeout to avoid indefinite blocking
156-
Void unused = ret.get(5, TimeUnit.SECONDS);
157-
}
158-
159145
/**
160146
* Allows injecting a custom {@link CallbackMetricReporter} for testing purposes.
161147
*/
162148
@VisibleForTesting
163-
void injectCallbackMetricReporter(CallbackMetricReporter reporter) {
164-
this.callbackMetricReporter = reporter;
149+
CallbackMetricReporter createCallbackMetricReporter(BatchRecorder recorder) {
150+
return new CallbackMetricReporterImpl(recorder);
165151
}
166152

167153
@VisibleForTesting

xds/src/main/java/io/grpc/xds/XdsClientPoolFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ interface XdsClientPoolFactory {
3030
@Nullable
3131
ObjectPool<XdsClient> get(String target);
3232

33-
ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException;
34-
3533
ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
3634
throws XdsInitializationException;
3735

xds/src/main/java/io/grpc/xds/XdsServerWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.grpc.InternalServerInterceptors;
3030
import io.grpc.Metadata;
3131
import io.grpc.MethodDescriptor;
32+
import io.grpc.MetricRecorder;
3233
import io.grpc.Server;
3334
import io.grpc.ServerBuilder;
3435
import io.grpc.ServerCall;
@@ -171,7 +172,7 @@ public void run() {
171172

172173
private void internalStart() {
173174
try {
174-
xdsClientPool = xdsClientPoolFactory.getOrCreate("");
175+
xdsClientPool = xdsClientPoolFactory.getOrCreate("", new MetricRecorder() {});
175176
} catch (Exception e) {
176177
StatusException statusException = Status.UNAVAILABLE.withDescription(
177178
"Failed to initialize xDS").withCause(e).asException();

xds/src/main/java/io/grpc/xds/client/XdsClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@
2828
import com.google.protobuf.Any;
2929
import io.grpc.ExperimentalApi;
3030
import io.grpc.Status;
31-
import io.grpc.xds.XdsClientMetricReporter.CallbackMetricReporter;
3231
import io.grpc.xds.client.Bootstrapper.ServerInfo;
32+
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
3333
import java.net.URI;
3434
import java.net.URISyntaxException;
3535
import java.util.ArrayList;

xds/src/main/java/io/grpc/xds/client/XdsClientImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@
3838
import io.grpc.SynchronizationContext.ScheduledHandle;
3939
import io.grpc.internal.BackoffPolicy;
4040
import io.grpc.internal.TimeProvider;
41-
import io.grpc.xds.XdsClientMetricReporter;
42-
import io.grpc.xds.XdsClientMetricReporter.CallbackMetricReporter;
4341
import io.grpc.xds.client.Bootstrapper.AuthorityInfo;
4442
import io.grpc.xds.client.Bootstrapper.ServerInfo;
4543
import io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus;
4644
import io.grpc.xds.client.XdsClient.ResourceStore;
45+
import io.grpc.xds.client.XdsClientMetricReporter;
46+
import io.grpc.xds.client.XdsClientMetricReporter.CallbackMetricReporter;
4747
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
4848
import java.net.URI;
4949
import java.util.Collection;
@@ -127,7 +127,6 @@ public XdsClientImpl(
127127
this.securityConfig = securityConfig;
128128
this.target = target;
129129
this.metricReporter = metricReporter;
130-
metricReporter.setXdsClient(this);
131130
logId = InternalLogId.allocate("xds-client", null);
132131
logger = XdsLogger.withLogId(logId);
133132
logger.log(XdsLogLevel.INFO, "Created");
@@ -191,7 +190,6 @@ public void run() {
191190
for (final LoadReportClient lrsClient : serverLrsClientMap.values()) {
192191
lrsClient.stopLoadReporting();
193192
}
194-
metricReporter.close();
195193
cleanUpResourceTimers();
196194
}
197195
});

xds/src/main/java/io/grpc/xds/XdsClientMetricReporter.java renamed to xds/src/main/java/io/grpc/xds/client/XdsClientMetricReporter.java

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17-
package io.grpc.xds;
17+
package io.grpc.xds.client;
1818

1919
import io.grpc.Internal;
20-
import io.grpc.xds.client.XdsClient;
2120

2221
/**
2322
* Interface for reporting metrics from the xDS client.
@@ -48,45 +47,19 @@ default void reportResourceUpdates(long validResourceCount, long invalidResource
4847
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
4948
}
5049

51-
/**
52-
* Sets the {@link XdsClient} instance.
53-
*/
54-
default void setXdsClient(XdsClient xdsClient) {
55-
}
56-
57-
/**
58-
* Closes the metric reporter.
59-
*/
60-
default void close() {
61-
}
62-
6350
/**
6451
* Interface for reporting metrics through callback.
6552
*
6653
*/
6754
interface CallbackMetricReporter {
6855

69-
/**
70-
* Reports number of resources in each cache state.
71-
*
72-
* @param resourceCount Number of resources.
73-
* @param cacheState Status of the resource metadata
74-
* {@link io.grpc.xds.client.XdsClient.ResourceMetadata.ResourceMetadataStatus}.
75-
* @param resourceType Type of XDS resource (e.g., "envoy.config.listener.v3.Listener").
76-
* @param target Target of the gRPC channel.
77-
*/
56+
7857
// TODO(@dnvindhya): include the "authority" label once xds.authority is available.
7958
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
8059
String target) {
8160
}
8261

83-
/**
84-
* Reports whether xDS client has a working ADS stream to the xDS server.
85-
*
86-
* @param isConnected 1 if the client is connected to the xDS server, 0 otherwise.
87-
* @param target Target of the gRPC channel.
88-
* @param xdsServer Target URI of the xDS server with which the XdsClient is communicating.
89-
*/
62+
9063
default void reportServerConnections(int isConnected, String target, String xdsServer) {
9164
}
9265
}

xds/src/test/java/io/grpc/xds/CsdsServiceTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -554,10 +554,6 @@ public void setBootstrapOverride(Map<String, ?> bootstrap) {
554554
throw new UnsupportedOperationException("Should not be called");
555555
}
556556

557-
@Override
558-
public ObjectPool<XdsClient> getOrCreate(String target) {
559-
throw new UnsupportedOperationException("Should not be called");
560-
}
561557

562558
@Override
563559
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder) {

xds/src/test/java/io/grpc/xds/GrpcXdsClientImplTestBase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696
import io.grpc.xds.client.XdsClient.ResourceUpdate;
9797
import io.grpc.xds.client.XdsClient.ResourceWatcher;
9898
import io.grpc.xds.client.XdsClientImpl;
99+
import io.grpc.xds.client.XdsClientMetricReporter;
99100
import io.grpc.xds.client.XdsResourceType;
100101
import io.grpc.xds.client.XdsResourceType.ResourceInvalidException;
101102
import io.grpc.xds.client.XdsTransportFactory;

0 commit comments

Comments
 (0)