4141import java .util .List ;
4242import java .util .Map ;
4343import java .util .concurrent .CyclicBarrier ;
44+ import java .util .concurrent .atomic .AtomicReference ;
4445import java .util .stream .Collectors ;
4546import java .util .stream .Stream ;
4647import java .util .stream .StreamSupport ;
@@ -133,8 +134,17 @@ public void testSnapshotAPMMetrics() throws Exception {
133134
134135 // wait for snapshot to finish to test the other metrics
135136 awaitNumberOfSnapshotsInProgress (0 );
136- final TimeValue snapshotElapsedTime = TimeValue .timeValueNanos (System .nanoTime () - beforeCreateSnapshotNanos );
137- collectMetrics ();
137+ final AtomicReference <TimeValue > elapsedTimeValueRef = new AtomicReference <>();
138+ // Sanity check snapshot completion metric observations recorded in snapshot finalization.
139+ // Use assertBusy() so the finalization code has time to run after the SnapshotsInProgress cluster state update has completed.
140+ assertBusy (() -> {
141+ collectMetrics ();
142+ elapsedTimeValueRef .set (TimeValue .timeValueNanos (System .nanoTime () - beforeCreateSnapshotNanos ));
143+ assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_COMPLETED ), equalTo (1L ));
144+ assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_DURATION , hasSize (1 ));
145+ assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_DURATION , everyItem (lessThan (elapsedTimeValueRef .get ().secondsFrac ())));
146+ });
147+ final TimeValue snapshotElapsedTime = elapsedTimeValueRef .get ();
138148
139149 // sanity check blobs, bytes and throttling metrics
140150 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_BLOBS_UPLOADED ), greaterThan (0L ));
@@ -143,16 +153,11 @@ public void testSnapshotAPMMetrics() throws Exception {
143153 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOT_RESTORE_THROTTLE_DURATION ), equalTo (0L ));
144154
145155 assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_STARTED ), equalTo (1L ));
146- assertThat (getTotalClusterLongCounterValue (SnapshotMetrics .SNAPSHOTS_COMPLETED ), equalTo (1L ));
147156
148157 // Sanity check shard duration observations
149158 assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_SHARDS_DURATION , hasSize (numShards ));
150159 assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_SHARDS_DURATION , everyItem (lessThan (snapshotElapsedTime .secondsFrac ())));
151160
152- // Sanity check snapshot observations
153- assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_DURATION , hasSize (1 ));
154- assertDoubleHistogramMetrics (SnapshotMetrics .SNAPSHOT_DURATION , everyItem (lessThan (snapshotElapsedTime .secondsFrac ())));
155-
156161 // Work out the maximum amount of concurrency per node
157162 final ThreadPool tp = internalCluster ().getDataNodeInstance (ThreadPool .class );
158163 final int snapshotThreadPoolSize = tp .info (ThreadPool .Names .SNAPSHOT ).getMax ();
0 commit comments