Skip to content

Commit d085aca

Browse files
committed
Rebase master
1 parent 1993e68 commit d085aca

19 files changed

+1784
-108
lines changed

api/src/main/java/io/grpc/NameResolver.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ public static final class Args {
288288
@Nullable private final ChannelLogger channelLogger;
289289
@Nullable private final Executor executor;
290290
@Nullable private final String overrideAuthority;
291+
@Nullable private final MetricRecorder metricRecorder;
291292

292293
private Args(
293294
Integer defaultPort,
@@ -297,7 +298,8 @@ private Args(
297298
@Nullable ScheduledExecutorService scheduledExecutorService,
298299
@Nullable ChannelLogger channelLogger,
299300
@Nullable Executor executor,
300-
@Nullable String overrideAuthority) {
301+
@Nullable String overrideAuthority,
302+
@Nullable MetricRecorder metricRecorder) {
301303
this.defaultPort = checkNotNull(defaultPort, "defaultPort not set");
302304
this.proxyDetector = checkNotNull(proxyDetector, "proxyDetector not set");
303305
this.syncContext = checkNotNull(syncContext, "syncContext not set");
@@ -306,6 +308,7 @@ private Args(
306308
this.channelLogger = channelLogger;
307309
this.executor = executor;
308310
this.overrideAuthority = overrideAuthority;
311+
this.metricRecorder = metricRecorder;
309312
}
310313

311314
/**
@@ -403,6 +406,17 @@ public String getOverrideAuthority() {
403406
return overrideAuthority;
404407
}
405408

409+
/**
410+
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
411+
*
412+
* @since 1.67.0
413+
*/
414+
@Nullable
415+
@ExperimentalApi("Insert GitHub issue")
416+
public MetricRecorder getMetricRecorder() {
417+
return metricRecorder;
418+
}
419+
406420

407421
@Override
408422
public String toString() {
@@ -415,6 +429,7 @@ public String toString() {
415429
.add("channelLogger", channelLogger)
416430
.add("executor", executor)
417431
.add("overrideAuthority", overrideAuthority)
432+
.add("metricRecorder", metricRecorder)
418433
.toString();
419434
}
420435

@@ -433,6 +448,7 @@ public Builder toBuilder() {
433448
builder.setChannelLogger(channelLogger);
434449
builder.setOffloadExecutor(executor);
435450
builder.setOverrideAuthority(overrideAuthority);
451+
builder.setMetricRecorder(metricRecorder);
436452
return builder;
437453
}
438454

@@ -459,6 +475,7 @@ public static final class Builder {
459475
private ChannelLogger channelLogger;
460476
private Executor executor;
461477
private String overrideAuthority;
478+
private MetricRecorder metricRecorder;
462479

463480
Builder() {
464481
}
@@ -545,6 +562,17 @@ public Builder setOverrideAuthority(String authority) {
545562
return this;
546563
}
547564

565+
/**
566+
* See {@link Args#getMetricRecorder()}. This is an optional field.
567+
*
568+
* @since 1.67.0
569+
*/
570+
@ExperimentalApi("Insert github issue")
571+
public Builder setMetricRecorder(MetricRecorder metricRecorder) {
572+
this.metricRecorder = metricRecorder;
573+
return this;
574+
}
575+
548576
/**
549577
* Builds an {@link Args}.
550578
*
@@ -554,7 +582,8 @@ public Args build() {
554582
return
555583
new Args(
556584
defaultPort, proxyDetector, syncContext, serviceConfigParser,
557-
scheduledExecutorService, channelLogger, executor, overrideAuthority);
585+
scheduledExecutorService, channelLogger, executor, overrideAuthority,
586+
metricRecorder);
558587
}
559588
}
560589
}

core/src/main/java/io/grpc/internal/ManagedChannelImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,8 @@ ClientStream newSubstream(
589589
builder.maxHedgedAttempts,
590590
loadBalancerFactory);
591591
this.authorityOverride = builder.authorityOverride;
592+
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
593+
MetricInstrumentRegistry.getDefaultRegistry());
592594
this.nameResolverArgs =
593595
NameResolver.Args.newBuilder()
594596
.setDefaultPort(builder.getDefaultPort())
@@ -599,6 +601,7 @@ ClientStream newSubstream(
599601
.setChannelLogger(channelLogger)
600602
.setOffloadExecutor(this.offloadExecutorHolder)
601603
.setOverrideAuthority(this.authorityOverride)
604+
.setMetricRecorder(this.metricRecorder)
602605
.build();
603606
this.nameResolver = getNameResolver(
604607
targetUri, authorityOverride, nameResolverProvider, nameResolverArgs);
@@ -671,8 +674,6 @@ public CallTracer create() {
671674
}
672675
serviceConfigUpdated = true;
673676
}
674-
this.metricRecorder = new MetricRecorderImpl(builder.metricSinks,
675-
MetricInstrumentRegistry.getDefaultRegistry());
676677
}
677678

678679
@VisibleForTesting

xds/src/main/java/io/grpc/xds/InternalSharedXdsClientPoolProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.grpc.xds;
1818

1919
import io.grpc.Internal;
20+
import io.grpc.MetricRecorder;
2021
import io.grpc.internal.ObjectPool;
2122
import io.grpc.xds.client.XdsClient;
2223
import io.grpc.xds.client.XdsInitializationException;
@@ -36,6 +37,11 @@ public static void setDefaultProviderBootstrapOverride(Map<String, ?> bootstrap)
3637

3738
public static ObjectPool<XdsClient> getOrCreate(String target)
3839
throws XdsInitializationException {
39-
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target);
40+
return getOrCreate(target, new MetricRecorder() {});
41+
}
42+
43+
public static ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
44+
throws XdsInitializationException {
45+
return SharedXdsClientPoolProvider.getDefaultProvider().getOrCreate(target, metricRecorder);
4046
}
4147
}

xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.annotations.VisibleForTesting;
2323
import com.google.common.collect.ImmutableList;
24+
import io.grpc.MetricRecorder;
2425
import io.grpc.internal.ExponentialBackoffPolicy;
2526
import io.grpc.internal.GrpcUtil;
2627
import io.grpc.internal.ObjectPool;
@@ -56,6 +57,7 @@ final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
5657
private final Object lock = new Object();
5758
private final AtomicReference<Map<String, ?>> bootstrapOverride = new AtomicReference<>();
5859
private final Map<String, ObjectPool<XdsClient>> targetToXdsClientMap = new ConcurrentHashMap<>();
60+
private final Map<String, MetricRecorder> targetToMetricRecorderMap = new ConcurrentHashMap<>();
5961

6062
SharedXdsClientPoolProvider() {
6163
this(new GrpcBootstrapperImpl());
@@ -82,7 +84,14 @@ public ObjectPool<XdsClient> get(String target) {
8284
}
8385

8486
@Override
87+
@Nullable
8588
public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitializationException {
89+
return this.getOrCreate(target, new MetricRecorder() {});
90+
}
91+
92+
@Override
93+
public ObjectPool<XdsClient> getOrCreate(String target, MetricRecorder metricRecorder)
94+
throws XdsInitializationException {
8695
ObjectPool<XdsClient> ref = targetToXdsClientMap.get(target);
8796
if (ref == null) {
8897
synchronized (lock) {
@@ -98,8 +107,12 @@ public ObjectPool<XdsClient> getOrCreate(String target) throws XdsInitialization
98107
if (bootstrapInfo.servers().isEmpty()) {
99108
throw new XdsInitializationException("No xDS server provided");
100109
}
101-
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target);
110+
MetricRecorder metricRecorderForTarget = targetToMetricRecorderMap.get(target);
111+
metricRecorder =
112+
metricRecorderForTarget != null ? metricRecorderForTarget : metricRecorder;
113+
ref = new RefCountedXdsClientObjectPool(bootstrapInfo, target, metricRecorder);
102114
targetToXdsClientMap.put(target, ref);
115+
targetToMetricRecorderMap.putIfAbsent(target, metricRecorder);
103116
}
104117
}
105118
}
@@ -124,6 +137,7 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
124137
new ExponentialBackoffPolicy.Provider();
125138
private final BootstrapInfo bootstrapInfo;
126139
private final String target; // The target associated with the xDS client.
140+
private final MetricRecorder metricRecorder;
127141
private final Object lock = new Object();
128142
@GuardedBy("lock")
129143
private ScheduledExecutorService scheduler;
@@ -133,9 +147,11 @@ static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
133147
private int refCount;
134148

135149
@VisibleForTesting
136-
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target) {
150+
RefCountedXdsClientObjectPool(BootstrapInfo bootstrapInfo, String target,
151+
MetricRecorder metricRecorder) {
137152
this.bootstrapInfo = checkNotNull(bootstrapInfo);
138153
this.target = target;
154+
this.metricRecorder = metricRecorder;
139155
}
140156

141157
@Override
@@ -154,7 +170,9 @@ public XdsClient getObject() {
154170
GrpcUtil.STOPWATCH_SUPPLIER,
155171
TimeProvider.SYSTEM_TIME_PROVIDER,
156172
MessagePrinter.INSTANCE,
157-
new TlsContextManagerImpl(bootstrapInfo));
173+
new TlsContextManagerImpl(bootstrapInfo),
174+
getTarget(),
175+
new XdsClientMetricReporterImpl(metricRecorder));
158176
}
159177
refCount++;
160178
return xdsClient;
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright 2024 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.xds;
18+
19+
import io.grpc.Internal;
20+
import io.grpc.xds.client.XdsClient;
21+
22+
/**
23+
* Interface for reporting metrics from the xDS client.
24+
* We need this indirection, to de couple Xds from OpenTelemetry
25+
*/
26+
@Internal
27+
public interface XdsClientMetricReporter {
28+
29+
/**
30+
* Reports resource update counts.
31+
*
32+
* @param validResourceCount the number of resources that were successfully updated.
33+
* @param invalidResourceCount the number of resources that failed to update.
34+
* @param target the xDS management server name for the load balancing policy this update is for.
35+
* @param xdsServer the xDS management server address for this update.
36+
* @param resourceType the type of resource (e.g., "LDS", "RDS", "CDS", "EDS").
37+
*/
38+
default void reportResourceUpdates(long validResourceCount, long invalidResourceCount,
39+
String target, String xdsServer, String resourceType) {
40+
}
41+
42+
/**
43+
* Reports xDS server failure counts.
44+
*
45+
* @param serverFailure the number of times the xDS server has failed.
46+
* @param target the xDS management server name for the load balancing policy this failure is for.
47+
* @param xdsServer the xDS management server address for this failure.
48+
*/
49+
default void reportServerFailure(long serverFailure, String target, String xdsServer) {
50+
}
51+
52+
/**
53+
* Sets the {@link XdsClient} instance to the reporter.
54+
*
55+
* @param xdsClient the {@link XdsClient} instance.
56+
*/
57+
default void setXdsClient(XdsClient xdsClient) {
58+
}
59+
60+
/**
61+
* Closes the metric reporter.
62+
*/
63+
default void close() {
64+
}
65+
66+
/**
67+
* Interface for reporting metrics from the xDS client callbacks.
68+
*
69+
*/
70+
interface CallbackMetricReporter {
71+
72+
/**
73+
* Reports resource counts in the cache.
74+
*
75+
* @param resourceCount the number of resources in the cache.
76+
* @param cacheState the state of the cache (e.g., "SYNCED", "DOES_NOT_EXIST").
77+
* @param resourceType the type of resource (e.g., "LDS", "RDS", "CDS", "EDS").
78+
* @param target the xDS management server name for the load balancing policy this count is
79+
* for.
80+
*/
81+
// TODO(@dnvindhya): include the "authority" label once authority is available.
82+
default void reportResourceCounts(long resourceCount, String cacheState, String resourceType,
83+
String target) {
84+
}
85+
86+
/**
87+
* Reports server connection status.
88+
*
89+
* @param isConnected {@code true} if the client is connected to the xDS server, {@code false}
90+
* otherwise.
91+
* @param target the xDS management server name for the load balancing policy this connection
92+
* is for.
93+
* @param xdsServer the xDS management server address for this connection.
94+
* @since 0.1.0
95+
*/
96+
default void reportServerConnections(int isConnected, String target, String xdsServer) {
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)