1919
2020import static org .apache .beam .sdk .metrics .MetricResultsMatchers .attemptedMetricsResult ;
2121import static org .hamcrest .MatcherAssert .assertThat ;
22- import static org .hamcrest .Matchers .anyOf ;
2322import static org .hamcrest .Matchers .hasItem ;
2423import static org .hamcrest .Matchers .is ;
24+ import static org .junit .Assert .assertEquals ;
25+ import static org .junit .Assert .assertTrue ;
2526
2627import java .io .IOException ;
2728import java .io .Serializable ;
@@ -250,15 +251,15 @@ public void testFlattenPCollResumeFromCheckpoint() {
250251 "BoundedAssert" ,
251252 DistributionResult .create (45 , 10 , 0L , 9L ))));
252253
253- // Verify metrics for Flattened result after first run
254- assertThat (
255- res . metrics (). queryMetrics ( metricsFilter ). getDistributions (),
256- hasItem (
257- attemptedMetricsResult (
258- PAssertFn . class . getName (),
259- "distribution" ,
260- "FlattenedAssert" ,
261- DistributionResult . create ( 45 , 10 , 0L , 9L ))));
254+ // Fetch metrics for Flattened result after first run
255+ long firstMax = 0 ;
256+ for ( MetricResult < DistributionResult > dists :
257+ res . metrics (). queryMetrics ( metricsFilter ). getDistributions ()) {
258+ long currentMax = dists . getAttempted (). getMax ();
259+ if ( currentMax > firstMax ) {
260+ firstMax = currentMax ;
261+ }
262+ }
262263
263264 // Clean up state
264265 clean ();
@@ -276,29 +277,19 @@ public void testFlattenPCollResumeFromCheckpoint() {
276277 "BoundedAssert" ,
277278 DistributionResult .create (45 , 10 , 0L , 9L ))));
278279
279- // Verify Flattened results show accumulated values from both runs
280- // We use anyOf matcher because the unbounded source may emit either 2 or 3 elements during the
281- // test window:
282- // Case 1 (3 elements): sum=78 (45 from bounded + 33 from unbounded), count=13 (10 bounded + 3
283- // unbounded)
284- // Case 2 (2 elements): sum=66 (45 from bounded + 21 from unbounded), count=12 (10 bounded + 2
285- // unbounded)
286- // This variation occurs because the unbounded source's withRate(3, Duration.standardSeconds(1))
287- // timing may be affected by test environment conditions
288- assertThat (
289- res .metrics ().queryMetrics (metricsFilter ).getDistributions (),
290- hasItem (
291- anyOf (
292- attemptedMetricsResult (
293- PAssertFn .class .getName (),
294- "distribution" ,
295- "FlattenedAssert" ,
296- DistributionResult .create (78 , 13 , 0 , 12 )),
297- attemptedMetricsResult (
298- PAssertFn .class .getName (),
299- "distribution" ,
300- "FlattenedAssert" ,
301- DistributionResult .create (66 , 12 , 0 , 11 )))));
280+ long secondMax = 0 ;
281+ long secondSum = 0 ;
282+ for (MetricResult <DistributionResult > dists :
283+ res .metrics ().queryMetrics (metricsFilter ).getDistributions ()) {
284+ long currentMax = dists .getAttempted ().getMax ();
285+ if (currentMax > secondMax ) {
286+ secondMax = currentMax ;
287+ secondSum = dists .getAttempted ().getSum ();
288+ }
289+ }
290+
291+ assertTrue (secondMax > firstMax );
292+ assertEquals ((1L + secondMax ) * secondMax / 2 , secondSum );
302293 }
303294
304295 /** Restarts the pipeline from checkpoint. Sets pipeline to stop after 1 second. */
0 commit comments