29
29
import static org .junit .Assert .assertSame ;
30
30
import static org .junit .Assert .assertThrows ;
31
31
import static org .junit .Assert .assertTrue ;
32
+ import static org .mockito .AdditionalAnswers .delegatesTo ;
32
33
import static org .mockito .ArgumentMatchers .any ;
34
+ import static org .mockito .ArgumentMatchers .argThat ;
33
35
import static org .mockito .ArgumentMatchers .eq ;
34
36
import static org .mockito .ArgumentMatchers .isA ;
35
37
import static org .mockito .ArgumentMatchers .same ;
38
+ import static org .mockito .Mockito .inOrder ;
36
39
import static org .mockito .Mockito .mock ;
37
40
import static org .mockito .Mockito .never ;
38
41
import static org .mockito .Mockito .times ;
48
51
import io .grpc .InternalLogId ;
49
52
import io .grpc .InternalWithLogId ;
50
53
import io .grpc .LoadBalancer ;
54
+ import io .grpc .MetricInstrument ;
51
55
import io .grpc .MetricRecorder ;
56
+ import io .grpc .NameResolver ;
57
+ import io .grpc .SecurityLevel ;
52
58
import io .grpc .Status ;
53
59
import io .grpc .SynchronizationContext ;
54
60
import io .grpc .internal .InternalSubchannel .CallTracingTransport ;
69
75
import org .junit .Test ;
70
76
import org .junit .runner .RunWith ;
71
77
import org .junit .runners .JUnit4 ;
78
+ import org .mockito .InOrder ;
72
79
import org .mockito .Mock ;
73
80
import org .mockito .junit .MockitoJUnit ;
74
81
import org .mockito .junit .MockitoRule ;
@@ -82,6 +89,9 @@ public class InternalSubchannelTest {
82
89
public final MockitoRule mocks = MockitoJUnit .rule ();
83
90
84
91
private static final String AUTHORITY = "fakeauthority" ;
92
+ private static final String BACKEND_SERVICE = "ice-cream-factory-service" ;
93
+ private static final String LOCALITY = "mars-olympus-mons-datacenter" ;
94
+ private static final SecurityLevel SECURITY_LEVEL = SecurityLevel .PRIVACY_AND_INTEGRITY ;
85
95
private static final String USER_AGENT = "mosaic" ;
86
96
private static final ConnectivityStateInfo UNAVAILABLE_STATE =
87
97
ConnectivityStateInfo .forTransientFailure (Status .UNAVAILABLE );
@@ -109,6 +119,12 @@ public void uncaughtException(Thread t, Throwable e) {
109
119
@ Mock private BackoffPolicy .Provider mockBackoffPolicyProvider ;
110
120
@ Mock private ClientTransportFactory mockTransportFactory ;
111
121
122
+ @ Mock private BackoffPolicy mockBackoffPolicy ;
123
+ private MetricRecorder mockMetricRecorder = mock (MetricRecorder .class ,
124
+ delegatesTo (new MetricRecorderImpl ()));
125
+
126
+ private static final long RECONNECT_BACKOFF_DELAY_NANOS = TimeUnit .SECONDS .toNanos (1 );
127
+
112
128
private final LinkedList <String > callbackInvokes = new LinkedList <>();
113
129
private final InternalSubchannel .Callback mockInternalSubchannelCallback =
114
130
new InternalSubchannel .Callback () {
@@ -1449,8 +1465,90 @@ private void createInternalSubchannel(boolean reconnectDisabled,
1449
1465
new ChannelLoggerImpl (subchannelTracer , fakeClock .getTimeProvider ()),
1450
1466
Collections .emptyList (),
1451
1467
"" ,
1452
- new MetricRecorder () {}
1468
+ new MetricRecorder () {
1469
+ }
1470
+ );
1471
+ }
1472
+
1473
+ @ Test
1474
+ public void subchannelStateChanges_triggersMetrics_disconnectionOnly () {
1475
+ // 1. Mock the backoff policy
1476
+ when (mockBackoffPolicyProvider .get ()).thenReturn (mockBackoffPolicy );
1477
+ when (mockBackoffPolicy .nextBackoffNanos ()).thenReturn (RECONNECT_BACKOFF_DELAY_NANOS );
1478
+
1479
+ // 2. Setup Subchannel with attributes
1480
+ SocketAddress addr = mock (SocketAddress .class );
1481
+ Attributes eagAttributes = Attributes .newBuilder ()
1482
+ .set (NameResolver .ATTR_BACKEND_SERVICE , BACKEND_SERVICE )
1483
+ .set (LoadBalancer .ATTR_LOCALITY_NAME , LOCALITY )
1484
+ .set (GrpcAttributes .ATTR_SECURITY_LEVEL , SECURITY_LEVEL )
1485
+ .build ();
1486
+ List <EquivalentAddressGroup > addressGroups =
1487
+ Arrays .asList (new EquivalentAddressGroup (Arrays .asList (addr ), eagAttributes ));
1488
+ createInternalSubchannel (new EquivalentAddressGroup (addr ));
1489
+ InternalLogId logId = InternalLogId .allocate ("Subchannel" , /*details=*/ AUTHORITY );
1490
+ ChannelTracer subchannelTracer = new ChannelTracer (logId , 10 ,
1491
+ fakeClock .getTimeProvider ().currentTimeNanos (), "Subchannel" );
1492
+ LoadBalancer .CreateSubchannelArgs createSubchannelArgs =
1493
+ LoadBalancer .CreateSubchannelArgs .newBuilder ().setAddresses (addressGroups ).build ();
1494
+ internalSubchannel = new InternalSubchannel (
1495
+ createSubchannelArgs , AUTHORITY , USER_AGENT , mockBackoffPolicyProvider ,
1496
+ mockTransportFactory , fakeClock .getScheduledExecutorService (),
1497
+ fakeClock .getStopwatchSupplier (), syncContext , mockInternalSubchannelCallback , channelz ,
1498
+ CallTracer .getDefaultFactory ().create (), subchannelTracer , logId ,
1499
+ new ChannelLoggerImpl (subchannelTracer , fakeClock .getTimeProvider ()),
1500
+ Collections .emptyList (), AUTHORITY , mockMetricRecorder
1501
+ );
1502
+
1503
+ // --- Action ---
1504
+ internalSubchannel .obtainActiveTransport ();
1505
+ MockClientTransportInfo transportInfo = transports .poll ();
1506
+ assertNotNull (transportInfo );
1507
+ transportInfo .listener .transportReady ();
1508
+ fakeClock .runDueTasks ();
1509
+
1510
+ transportInfo .listener .transportShutdown (Status .UNAVAILABLE );
1511
+ fakeClock .runDueTasks ();
1512
+
1513
+ // --- Verification ---
1514
+ InOrder inOrder = inOrder (mockMetricRecorder );
1515
+
1516
+ // Verify successful connection metrics
1517
+ inOrder .verify (mockMetricRecorder ).addLongCounter (
1518
+ eqMetricInstrumentName ("grpc.subchannel.connection_attempts_succeeded" ),
1519
+ eq (1L ),
1520
+ eq (Arrays .asList (AUTHORITY )),
1521
+ eq (Arrays .asList (BACKEND_SERVICE , LOCALITY ))
1522
+ );
1523
+ inOrder .verify (mockMetricRecorder ).addLongUpDownCounter (
1524
+ eqMetricInstrumentName ("grpc.subchannel.open_connections" ),
1525
+ eq (1L ),
1526
+ eq (Arrays .asList (AUTHORITY )),
1527
+ eq (Arrays .asList ("privacy_and_integrity" , BACKEND_SERVICE , LOCALITY ))
1528
+ );
1529
+
1530
+ inOrder .verify (mockMetricRecorder ).addLongCounter (
1531
+ eqMetricInstrumentName ("grpc.subchannel.connection_attempts_failed" ),
1532
+ eq (1L ),
1533
+ eq (Arrays .asList (AUTHORITY )),
1534
+ eq (Arrays .asList (BACKEND_SERVICE , LOCALITY ))
1453
1535
);
1536
+
1537
+ // Verify disconnection and automatic failure metrics
1538
+ inOrder .verify (mockMetricRecorder ).addLongCounter (
1539
+ eqMetricInstrumentName ("grpc.subchannel.disconnections" ),
1540
+ eq (1L ),
1541
+ eq (Arrays .asList (AUTHORITY )),
1542
+ eq (Arrays .asList (BACKEND_SERVICE , LOCALITY , "Peer Pressure" ))
1543
+ );
1544
+ inOrder .verify (mockMetricRecorder ).addLongUpDownCounter (
1545
+ eqMetricInstrumentName ("grpc.subchannel.open_connections" ),
1546
+ eq (-1L ),
1547
+ eq (Arrays .asList (AUTHORITY )),
1548
+ eq (Arrays .asList ("privacy_and_integrity" , BACKEND_SERVICE , LOCALITY ))
1549
+ );
1550
+
1551
+ inOrder .verifyNoMoreInteractions ();
1454
1552
}
1455
1553
1456
1554
private void assertNoCallbackInvoke () {
@@ -1463,5 +1561,13 @@ private void assertExactCallbackInvokes(String ... expectedInvokes) {
1463
1561
callbackInvokes .clear ();
1464
1562
}
1465
1563
1564
+ static class MetricRecorderImpl implements MetricRecorder {
1565
+ }
1566
+
1567
+ @ SuppressWarnings ("TypeParameterUnusedInFormals" )
1568
+ private <T extends MetricInstrument > T eqMetricInstrumentName (String name ) {
1569
+ return argThat (instrument -> instrument .getName ().equals (name ));
1570
+ }
1571
+
1466
1572
private static class FakeSocketAddress extends SocketAddress {}
1467
1573
}
0 commit comments