Skip to content

Commit c713561

Browse files
committed
logs the metrics inside sync context
1 parent 623c0f9 commit c713561

File tree

3 files changed

+87
-45
lines changed

3 files changed

+87
-45
lines changed

core/src/main/java/io/grpc/internal/DelayedClientTransport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ public ListenableFuture<SocketStats> getStats() {
201201
}
202202

203203
/**
204-
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
205-
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
204+
* Prevents creating any new streams. Buffered streams are not failed and may still proceed
205+
* when {@link #reprocess} is called. The delayed transport will be terminated when there is no
206206
* more buffered streams.
207207
*/
208208
@Override

core/src/main/java/io/grpc/internal/InternalSubchannel.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -588,15 +588,6 @@ public Attributes filterTransport(Attributes attributes) {
588588
@Override
589589
public void transportReady() {
590590
channelLogger.log(ChannelLogLevel.INFO, "READY");
591-
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
592-
getAttributeOrDefault(
593-
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
594-
getAttributeOrDefault(
595-
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
596-
null,
597-
extractSecurityLevel(
598-
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
599-
));
600591
syncContext.execute(new Runnable() {
601592
@Override
602593
public void run() {
@@ -611,6 +602,15 @@ public void run() {
611602
pendingTransport = null;
612603
connectedAddressAttributes = addressIndex.getCurrentEagAttributes();
613604
gotoNonErrorState(READY);
605+
subchannelMetrics.recordConnectionAttemptSucceeded(buildLabelSet(
606+
getAttributeOrDefault(
607+
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
608+
getAttributeOrDefault(
609+
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
610+
null,
611+
extractSecurityLevel(
612+
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
613+
));
614614
}
615615
}
616616
});
@@ -626,22 +626,6 @@ public void transportShutdown(final Status s) {
626626
channelLogger.log(
627627
ChannelLogLevel.INFO, "{0} SHUTDOWN with {1}", transport.getLogId(), printShortStatus(s));
628628
shutdownInitiated = true;
629-
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
630-
getAttributeOrDefault(
631-
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
632-
getAttributeOrDefault(
633-
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
634-
null, null
635-
));
636-
subchannelMetrics.recordDisconnection(buildLabelSet(
637-
getAttributeOrDefault(
638-
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
639-
getAttributeOrDefault(
640-
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
641-
"Peer Pressure",
642-
extractSecurityLevel(
643-
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
644-
));
645629
syncContext.execute(new Runnable() {
646630
@Override
647631
public void run() {
@@ -652,11 +636,27 @@ public void run() {
652636
activeTransport = null;
653637
addressIndex.reset();
654638
gotoNonErrorState(IDLE);
639+
subchannelMetrics.recordDisconnection(buildLabelSet(
640+
getAttributeOrDefault(
641+
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
642+
getAttributeOrDefault(
643+
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
644+
"Peer Pressure",
645+
extractSecurityLevel(
646+
addressIndex.getCurrentEagAttributes().get(GrpcAttributes.ATTR_SECURITY_LEVEL))
647+
));
655648
} else if (pendingTransport == transport) {
649+
subchannelMetrics.recordConnectionAttemptFailed(buildLabelSet(
650+
getAttributeOrDefault(
651+
addressIndex.getCurrentEagAttributes(), NameResolver.ATTR_BACKEND_SERVICE),
652+
getAttributeOrDefault(
653+
addressIndex.getCurrentEagAttributes(), LoadBalancer.ATTR_LOCALITY_NAME),
654+
null, null
655+
));
656656
Preconditions.checkState(state.getState() == CONNECTING,
657657
"Expected state is CONNECTING, actual state is %s", state.getState());
658658
addressIndex.increment();
659-
// Continue reconnect if there are still addresses to try.
659+
// Continue reconnecting with remaining addresses.
660660
if (!addressIndex.isValid()) {
661661
pendingTransport = null;
662662
addressIndex.reset();

core/src/test/java/io/grpc/internal/InternalSubchannelTest.java

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,6 @@ public void uncaughtException(Thread t, Throwable e) {
123123
private MetricRecorder mockMetricRecorder = mock(MetricRecorder.class,
124124
delegatesTo(new MetricRecorderImpl()));
125125

126-
private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit.SECONDS.toNanos(1);
127-
128126
private final LinkedList<String> callbackInvokes = new LinkedList<>();
129127
private final InternalSubchannel.Callback mockInternalSubchannelCallback =
130128
new InternalSubchannel.Callback() {
@@ -1471,10 +1469,60 @@ private void createInternalSubchannel(boolean reconnectDisabled,
14711469
}
14721470

14731471
@Test
1474-
public void subchannelStateChanges_triggersMetrics_disconnectionOnly() {
1475-
// 1. Mock the backoff policy
1472+
public void subchannelStateChanges_triggersAttemptFailedMetric() {
1473+
// 1. Setup: Standard subchannel initialization
1474+
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
1475+
SocketAddress addr = mock(SocketAddress.class);
1476+
Attributes eagAttributes = Attributes.newBuilder()
1477+
.set(NameResolver.ATTR_BACKEND_SERVICE, BACKEND_SERVICE)
1478+
.set(LoadBalancer.ATTR_LOCALITY_NAME, LOCALITY)
1479+
.set(GrpcAttributes.ATTR_SECURITY_LEVEL, SECURITY_LEVEL)
1480+
.build();
1481+
List<EquivalentAddressGroup> addressGroups =
1482+
Arrays.asList(new EquivalentAddressGroup(Arrays.asList(addr), eagAttributes));
1483+
InternalLogId logId = InternalLogId.allocate("Subchannel", /*details=*/ AUTHORITY);
1484+
ChannelTracer subchannelTracer = new ChannelTracer(logId, 10,
1485+
fakeClock.getTimeProvider().currentTimeNanos(), "Subchannel");
1486+
LoadBalancer.CreateSubchannelArgs createSubchannelArgs =
1487+
LoadBalancer.CreateSubchannelArgs.newBuilder().setAddresses(addressGroups).build();
1488+
internalSubchannel = new InternalSubchannel(
1489+
createSubchannelArgs, AUTHORITY, USER_AGENT, mockBackoffPolicyProvider,
1490+
mockTransportFactory, fakeClock.getScheduledExecutorService(),
1491+
fakeClock.getStopwatchSupplier(), syncContext, mockInternalSubchannelCallback, channelz,
1492+
CallTracer.getDefaultFactory().create(), subchannelTracer, logId,
1493+
new ChannelLoggerImpl(subchannelTracer, fakeClock.getTimeProvider()),
1494+
Collections.emptyList(), AUTHORITY, mockMetricRecorder
1495+
);
1496+
1497+
// --- Action: Simulate the "connecting to failed" transition ---
1498+
// a. Initiate the connection attempt. The subchannel is now CONNECTING.
1499+
internalSubchannel.obtainActiveTransport();
1500+
MockClientTransportInfo transportInfo = transports.poll();
1501+
assertNotNull("A connection attempt should have been made", transportInfo);
1502+
1503+
// b. Fail the transport before it can signal `transportReady()`.
1504+
transportInfo.listener.transportShutdown(
1505+
Status.INTERNAL.withDescription("Simulated connect failure"));
1506+
fakeClock.runDueTasks(); // Process the failure event
1507+
1508+
// --- Verification ---
1509+
// a. Verify that the "connection_attempts_failed" metric was recorded exactly once.
1510+
verify(mockMetricRecorder).addLongCounter(
1511+
eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"),
1512+
eq(1L),
1513+
eq(Arrays.asList(AUTHORITY)),
1514+
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
1515+
);
1516+
1517+
// b. Verify no other metrics were recorded. This confirms it wasn't incorrectly
1518+
// logged as a success, disconnection, or open connection.
1519+
verifyNoMoreInteractions(mockMetricRecorder);
1520+
}
1521+
1522+
@Test
1523+
public void subchannelStateChanges_triggersSuccessAndDisconnectMetrics() {
1524+
// 1. Mock the backoff policy (needed for subchannel creation)
14761525
when(mockBackoffPolicyProvider.get()).thenReturn(mockBackoffPolicy);
1477-
when(mockBackoffPolicy.nextBackoffNanos()).thenReturn(RECONNECT_BACKOFF_DELAY_NANOS);
14781526

14791527
// 2. Setup Subchannel with attributes
14801528
SocketAddress addr = mock(SocketAddress.class);
@@ -1500,15 +1548,16 @@ public void subchannelStateChanges_triggersMetrics_disconnectionOnly() {
15001548
Collections.emptyList(), AUTHORITY, mockMetricRecorder
15011549
);
15021550

1503-
// --- Action ---
1551+
// --- Action: Successful connection ---
15041552
internalSubchannel.obtainActiveTransport();
15051553
MockClientTransportInfo transportInfo = transports.poll();
15061554
assertNotNull(transportInfo);
15071555
transportInfo.listener.transportReady();
1508-
fakeClock.runDueTasks();
1556+
fakeClock.runDueTasks(); // Process the successful connection
15091557

1510-
transportInfo.listener.transportShutdown(Status.UNAVAILABLE);
1511-
fakeClock.runDueTasks();
1558+
// --- Action: Transport is shut down by the "peer" ---
1559+
transportInfo.listener.transportShutdown(Status.UNAVAILABLE.withDescription("Peer Pressure"));
1560+
fakeClock.runDueTasks(); // Process the shutdown
15121561

15131562
// --- Verification ---
15141563
InOrder inOrder = inOrder(mockMetricRecorder);
@@ -1527,14 +1576,7 @@ public void subchannelStateChanges_triggersMetrics_disconnectionOnly() {
15271576
eq(Arrays.asList("privacy_and_integrity", BACKEND_SERVICE, LOCALITY))
15281577
);
15291578

1530-
inOrder.verify(mockMetricRecorder).addLongCounter(
1531-
eqMetricInstrumentName("grpc.subchannel.connection_attempts_failed"),
1532-
eq(1L),
1533-
eq(Arrays.asList(AUTHORITY)),
1534-
eq(Arrays.asList(BACKEND_SERVICE, LOCALITY))
1535-
);
1536-
1537-
// Verify disconnection and automatic failure metrics
1579+
// Verify disconnection metrics
15381580
inOrder.verify(mockMetricRecorder).addLongCounter(
15391581
eqMetricInstrumentName("grpc.subchannel.disconnections"),
15401582
eq(1L),

0 commit comments

Comments
 (0)