3535import org .apache .flink .kubernetes .operator .config .FlinkConfigManager ;
3636import org .apache .flink .kubernetes .operator .exception .ReconciliationException ;
3737import org .apache .flink .kubernetes .operator .metrics .MetricManager ;
38+ import org .apache .flink .kubernetes .operator .metrics .TestingMetricListener ;
3839import org .apache .flink .kubernetes .operator .observer .snapshot .StateSnapshotObserver ;
3940import org .apache .flink .kubernetes .operator .reconciler .ReconciliationUtils ;
4041import org .apache .flink .kubernetes .operator .reconciler .snapshot .StateSnapshotReconciler ;
5657import javax .annotation .Nullable ;
5758
5859import java .time .Instant ;
60+ import java .util .Map ;
5961import java .util .Optional ;
6062import java .util .function .BiConsumer ;
6163
6466import static org .apache .flink .kubernetes .operator .api .status .FlinkStateSnapshotStatus .State .FAILED ;
6567import static org .apache .flink .kubernetes .operator .api .status .FlinkStateSnapshotStatus .State .IN_PROGRESS ;
6668import static org .apache .flink .kubernetes .operator .api .status .FlinkStateSnapshotStatus .State .TRIGGER_PENDING ;
69+ import static org .apache .flink .kubernetes .operator .metrics .FlinkStateSnapshotMetricsUtils .assertSnapshotMetrics ;
6770import static org .assertj .core .api .Assertions .assertThat ;
6871import static org .junit .jupiter .api .Assertions .assertThrows ;
6972
7073/** Test class for {@link FlinkStateSnapshotController}. */
7174@ EnableKubernetesMockClient (crud = true )
7275public class FlinkStateSnapshotControllerTest {
73- private static final String SNAPSHOT_NAME = "snapshot-test" ;
76+ private static final String SAVEPOINT_NAME = "savepoint-test" ;
77+ private static final String CHECKPOINT_NAME = "checkpoint-test" ;
7478 private static final String SAVEPOINT_PATH = "/tmp/asd" ;
7579 private static final String JOB_ID = "fd72014d4c864993a2e5a9287b4a9c5d" ;
7680
@@ -81,6 +85,8 @@ public class FlinkStateSnapshotControllerTest {
8185 private FlinkStateSnapshotEventCollector flinkStateSnapshotEventCollector ;
8286 private EventRecorder eventRecorder ;
8387 private TestingFlinkResourceContextFactory ctxFactory ;
88+ private TestingMetricListener listener ;
89+ private MetricManager <FlinkStateSnapshot > metricManager ;
8490 private StatusRecorder <FlinkStateSnapshot , FlinkStateSnapshotStatus > statusRecorder ;
8591 private FlinkStateSnapshotController controller ;
8692 private Context <FlinkStateSnapshot > context ;
@@ -97,14 +103,20 @@ public void beforeEach() {
97103 TestUtils .createTestMetricGroup (new Configuration ()),
98104 flinkService ,
99105 eventRecorder );
100- statusRecorder = new StatusRecorder <>(new MetricManager <>(), statusUpdateCounter );
106+
107+ listener = new TestingMetricListener (new Configuration ());
108+ metricManager =
109+ MetricManager .createFlinkStateSnapshotMetricManager (
110+ new Configuration (), listener .getMetricGroup ());
111+ statusRecorder = new StatusRecorder <>(metricManager , statusUpdateCounter );
101112 controller =
102113 new FlinkStateSnapshotController (
103114 ValidatorUtils .discoverValidators (configManager ),
104115 ctxFactory ,
105116 new StateSnapshotReconciler (ctxFactory , eventRecorder ),
106117 new StateSnapshotObserver (ctxFactory , eventRecorder ),
107118 eventRecorder ,
119+ metricManager ,
108120 statusRecorder );
109121 }
110122
@@ -514,8 +526,10 @@ public void testReconcileJobNotFound() {
514526 var snapshot = createSavepoint (deployment );
515527 var errorMessage =
516528 String .format (
517- "Secondary resource %s (%s) for savepoint snapshot-test was not found" ,
518- deployment .getMetadata ().getName (), CrdConstants .KIND_FLINK_DEPLOYMENT );
529+ "Secondary resource %s (%s) for savepoint %s was not found" ,
530+ deployment .getMetadata ().getName (),
531+ CrdConstants .KIND_FLINK_DEPLOYMENT ,
532+ SAVEPOINT_NAME );
519533
520534 // First reconcile will trigger the snapshot.
521535 controller .reconcile (snapshot , TestUtils .createSnapshotContext (client , deployment ));
@@ -556,8 +570,10 @@ public void testReconcileJobNotRunning() {
556570 var snapshot = createSavepoint (deployment );
557571 var errorMessage =
558572 String .format (
559- "Secondary resource %s (%s) for savepoint snapshot-test is not running" ,
560- deployment .getMetadata ().getName (), CrdConstants .KIND_FLINK_DEPLOYMENT );
573+ "Secondary resource %s (%s) for savepoint %s is not running" ,
574+ deployment .getMetadata ().getName (),
575+ CrdConstants .KIND_FLINK_DEPLOYMENT ,
576+ SAVEPOINT_NAME );
561577
562578 controller .reconcile (snapshot , context );
563579
@@ -579,6 +595,37 @@ public void testReconcileJobNotRunning() {
579595 });
580596 }
581597
598+ @ Test
599+ public void testMetrics () {
600+ var deployment = createDeployment ();
601+ var savepoint = createSavepoint (deployment );
602+ savepoint .getSpec ().getSavepoint ().setDisposeOnDelete (false );
603+ var checkpoint = createCheckpoint (deployment , CheckpointType .FULL , 1 );
604+
605+ context = TestUtils .createSnapshotContext (client , deployment );
606+
607+ controller .reconcile (savepoint , context );
608+ controller .reconcile (savepoint , context );
609+ controller .reconcile (savepoint , context );
610+ assertThat (savepoint .getStatus ().getState ()).isEqualTo (COMPLETED );
611+
612+ controller .reconcile (checkpoint , context );
613+ controller .reconcile (checkpoint , context );
614+ controller .reconcile (checkpoint , context );
615+ assertThat (checkpoint .getStatus ().getState ()).isEqualTo (COMPLETED );
616+
617+ assertSnapshotMetrics (
618+ listener , TestUtils .TEST_NAMESPACE , Map .of (COMPLETED , 1 ), Map .of (COMPLETED , 1 ));
619+
620+ // Remove savepoint
621+ assertDeleteControl (controller .cleanup (savepoint , context ), true , null );
622+ assertSnapshotMetrics (listener , TestUtils .TEST_NAMESPACE , Map .of (), Map .of (COMPLETED , 1 ));
623+
624+ // Remove checkpoint
625+ assertDeleteControl (controller .cleanup (checkpoint , context ), true , null );
626+ assertSnapshotMetrics (listener , TestUtils .TEST_NAMESPACE , Map .of (), Map .of ());
627+ }
628+
582629 private FlinkStateSnapshot createSavepoint (FlinkDeployment deployment ) {
583630 return createSavepoint (deployment , false , 7 );
584631 }
@@ -591,8 +638,8 @@ private FlinkStateSnapshot createSavepoint(
591638 FlinkDeployment deployment , boolean alreadyExists , int backoffLimit ) {
592639 var snapshot =
593640 TestUtils .buildFlinkStateSnapshotSavepoint (
594- SNAPSHOT_NAME ,
595- "test" ,
641+ SAVEPOINT_NAME ,
642+ TestUtils . TEST_NAMESPACE ,
596643 SAVEPOINT_PATH ,
597644 alreadyExists ,
598645 deployment == null ? null : JobReference .fromFlinkResource (deployment ));
@@ -606,8 +653,8 @@ private FlinkStateSnapshot createCheckpoint(
606653 FlinkDeployment deployment , CheckpointType checkpointType , int backoffLimit ) {
607654 var snapshot =
608655 TestUtils .buildFlinkStateSnapshotCheckpoint (
609- SNAPSHOT_NAME ,
610- "test" ,
656+ CHECKPOINT_NAME ,
657+ TestUtils . TEST_NAMESPACE ,
611658 checkpointType ,
612659 JobReference .fromFlinkResource (deployment ));
613660 snapshot .getSpec ().setBackoffLimit (backoffLimit );
0 commit comments