1414import org .elasticsearch .action .admin .indices .stats .ShardStats ;
1515import org .elasticsearch .common .unit .ByteSizeValue ;
1616import org .elasticsearch .common .util .CollectionUtils ;
17+ import org .elasticsearch .core .TimeValue ;
1718import org .elasticsearch .plugins .Plugin ;
1819import org .elasticsearch .plugins .PluginsService ;
1920import org .elasticsearch .snapshots .AbstractSnapshotIntegTestCase ;
2021import org .elasticsearch .telemetry .Measurement ;
2122import org .elasticsearch .telemetry .TestTelemetryPlugin ;
23+ import org .elasticsearch .threadpool .ThreadPool ;
2224import org .hamcrest .Matcher ;
2325
2426import java .util .Collection ;
3133import static org .hamcrest .Matchers .everyItem ;
3234import static org .hamcrest .Matchers .greaterThan ;
3335import static org .hamcrest .Matchers .hasItem ;
36+ import static org .hamcrest .Matchers .hasSize ;
3437import static org .hamcrest .Matchers .lessThan ;
3538
3639public class SnapshotMetricsIT extends AbstractSnapshotIntegTestCase {
@@ -82,6 +85,10 @@ public void testSnapshotAPMMetrics() throws Exception {
8285 waitForBlockOnAnyDataNode (repositoryName );
8386 collectMetrics ();
8487 assertShardsInProgressMetricIs (hasItem (greaterThan (0L )));
88+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_STARTED ), equalTo (1L ));
89+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_COMPLETED ), equalTo (0L ));
90+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_STARTED ), greaterThan (0L ));
91+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_SHARDS_COMPLETED ), equalTo (0L ));
8592 } finally {
8693 unblockAllDataNodes (repositoryName );
8794 }
@@ -96,9 +103,31 @@ public void testSnapshotAPMMetrics() throws Exception {
96103 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BYTES_UPLOADED ), greaterThan (0L ));
97104 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_CREATE_THROTTLE_DURATION ), greaterThan (0L ));
98105 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), equalTo (0L ));
106+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_STARTED ), equalTo (1L ));
107+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_COMPLETED ), equalTo (1L ));
108+
109+ // Sanity check shard duration observations
110+ assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_SHARDS_DURATION , hasSize (numShards ));
111+ assertDoubleHistogramMetrics (
112+ SnapshotMetrics .SNAPSHOT_SHARDS_DURATION ,
113+ everyItem (lessThan (TimeValue .timeValueNanos (snapshotElapsedTimeNanos ).secondsFrac ()))
114+ );
115+
116+ // Sanity check snapshot observations
117+ assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_DURATION , hasSize (1 ));
118+ assertDoubleHistogramMetrics (
119+ SnapshotMetrics .SNAPSHOT_DURATION ,
120+ everyItem (lessThan (TimeValue .timeValueNanos (snapshotElapsedTimeNanos ).secondsFrac ()))
121+ );
122+
123+ // Work out the maximum amount of concurrency per node
124+ final ThreadPool tp = internalCluster ().getDataNodeInstance (ThreadPool .class );
125+ int snapshotThreadPoolSize = tp .info (ThreadPool .Names .SNAPSHOT ).getMax ();
126+ int maximumPerNodeConcurrency = Math .max (snapshotThreadPoolSize , numShards );
99127
100128 // sanity check duration values
101- final long upperBoundTimeSpentOnSnapshotThingsNanos = internalCluster ().numDataNodes () * snapshotElapsedTimeNanos ;
129+ final long upperBoundTimeSpentOnSnapshotThingsNanos = internalCluster ().numDataNodes () * maximumPerNodeConcurrency
130+ * snapshotElapsedTimeNanos ;
102131 assertThat (
103132 getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_UPLOAD_DURATION ),
104133 allOf (greaterThan (0L ), lessThan (upperBoundTimeSpentOnSnapshotThingsNanos ))
@@ -126,6 +155,14 @@ public void testSnapshotAPMMetrics() throws Exception {
126155 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), greaterThan (0L ));
127156 }
128157
158+ private static void assertDoubleHistogramMetrics (String metricName , Matcher <? super List <Double >> matcher ) {
159+ final List <Double > values = allTestTelemetryPlugins ().flatMap (testTelemetryPlugin -> {
160+ final List <Measurement > doubleHistogramMeasurement = testTelemetryPlugin .getDoubleHistogramMeasurement (metricName );
161+ return doubleHistogramMeasurement .stream ().map (Measurement ::getDouble );
162+ }).toList ();
163+ assertThat (values , matcher );
164+ }
165+
129166 private static void assertShardsInProgressMetricIs (Matcher <? super List <Long >> matcher ) {
130167 final List <Long > values = allTestTelemetryPlugins ().map (testTelemetryPlugin -> {
131168 final List <Measurement > longGaugeMeasurement = testTelemetryPlugin .getLongGaugeMeasurement (
@@ -147,7 +184,7 @@ private long getTotalClusterLongCounterValue(String metricName) {
147184 }
148185
149186 private static Stream <TestTelemetryPlugin > allTestTelemetryPlugins () {
150- return StreamSupport .stream (internalCluster ().getDataNodeInstances (PluginsService .class ).spliterator (), false )
187+ return StreamSupport .stream (internalCluster ().getDataOrMasterNodeInstances (PluginsService .class ).spliterator (), false )
151188 .flatMap (pluginsService -> pluginsService .filterPlugins (TestTelemetryPlugin .class ));
152189 }
153190}
0 commit comments