@@ -2140,3 +2140,78 @@ async def test_pipeline_with_on_demand_cluster_with_no_clusters_keeper_fails(
21402140 expected_progress = 1.0 ,
21412141 run_id = run_in_db .run_id ,
21422142 )
2143+
2144+
2145+ async def test_run_new_pipeline_called_twice_prevents_duplicate_runs (
2146+ with_disabled_auto_scheduling : mock .Mock ,
2147+ with_disabled_scheduler_publisher : mock .Mock ,
2148+ initialized_app : FastAPI ,
2149+ sqlalchemy_async_engine : AsyncEngine ,
2150+ published_project : PublishedProject ,
2151+ run_metadata : RunMetadataDict ,
2152+ computational_pipeline_rabbit_client_parser : mock .AsyncMock ,
2153+ ):
2154+ # Ensure we start with an empty database
2155+ await assert_comp_runs_empty (sqlalchemy_async_engine )
2156+
2157+ # First call to run_new_pipeline - should succeed
2158+ assert published_project .project .prj_owner
2159+ await run_new_pipeline (
2160+ initialized_app ,
2161+ user_id = published_project .project .prj_owner ,
2162+ project_id = published_project .project .uuid ,
2163+ run_metadata = run_metadata ,
2164+ use_on_demand_clusters = False ,
2165+ )
2166+
2167+ # Verify first run was created and published
2168+ runs_after_first_call = await assert_comp_runs (
2169+ sqlalchemy_async_engine ,
2170+ expected_total = 1 ,
2171+ expected_state = RunningState .PUBLISHED ,
2172+ where_statement = and_ (
2173+ comp_runs .c .user_id == published_project .project .prj_owner ,
2174+ comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ,
2175+ ),
2176+ )
2177+ first_run = runs_after_first_call [0 ]
2178+
2179+ # Verify first RabbitMQ message was sent
2180+ await _assert_message_received (
2181+ computational_pipeline_rabbit_client_parser ,
2182+ 1 ,
2183+ ComputationalPipelineStatusMessage .model_validate_json ,
2184+ )
2185+
2186+ # Second call to run_new_pipeline - should be ignored since first run is still running
2187+ await run_new_pipeline (
2188+ initialized_app ,
2189+ user_id = published_project .project .prj_owner ,
2190+ project_id = published_project .project .uuid ,
2191+ run_metadata = run_metadata ,
2192+ use_on_demand_clusters = False ,
2193+ )
2194+
2195+ # Verify still only one run exists with same run_id
2196+ runs_after_second_call = await assert_comp_runs (
2197+ sqlalchemy_async_engine ,
2198+ expected_total = 1 ,
2199+ expected_state = RunningState .PUBLISHED ,
2200+ where_statement = and_ (
2201+ comp_runs .c .user_id == published_project .project .prj_owner ,
2202+ comp_runs .c .project_uuid == f"{ published_project .project .uuid } " ,
2203+ ),
2204+ )
2205+ second_run = runs_after_second_call [0 ]
2206+
2207+ # Verify it's the same run (same run_id, same created timestamp)
2208+ assert first_run .run_id == second_run .run_id
2209+ assert first_run .created == second_run .created
2210+ assert first_run .iteration == second_run .iteration
2211+
2212+ # Verify no additional RabbitMQ message was sent (still only 1 total)
2213+ await _assert_message_received (
2214+ computational_pipeline_rabbit_client_parser ,
2215+ 0 , # No new messages expected
2216+ ComputationalPipelineStatusMessage .model_validate_json ,
2217+ )
0 commit comments