@@ -164,11 +164,11 @@ async def test_dask_clients_pool_acquisition_creates_client_on_demand(
164164 cluster_type = ClusterTypeInModel .ON_PREMISE ,
165165 )
166166 )
167- async with clients_pool .acquire (cluster ):
167+ async with clients_pool .acquire (cluster , ref = f"test-ref- { cluster . name } " ):
168168 # on start it is created
169169 mocked_dask_client .create .assert_has_calls (mocked_creation_calls )
170170
171- async with clients_pool .acquire (cluster ):
171+ async with clients_pool .acquire (cluster , ref = f"test-ref- { cluster . name } -2" ):
172172 # the connection already exists, so there is no new call to create
173173 mocked_dask_client .create .assert_has_calls (mocked_creation_calls )
174174
@@ -196,7 +196,9 @@ async def test_acquiring_wrong_cluster_raises_exception(
196196
197197 non_existing_cluster = fake_clusters (1 )[0 ]
198198 with pytest .raises (DaskClientAcquisisitonError ):
199- async with clients_pool .acquire (non_existing_cluster ):
199+ async with clients_pool .acquire (
200+ non_existing_cluster , ref = "test-non-existing-ref"
201+ ):
200202 ...
201203
202204
@@ -239,7 +241,9 @@ async def test_acquire_default_cluster(
239241 dask_scheduler_settings = the_app .state .settings .DIRECTOR_V2_COMPUTATIONAL_BACKEND
240242 default_cluster = dask_scheduler_settings .default_cluster
241243 assert default_cluster
242- async with dask_clients_pool .acquire (default_cluster ) as dask_client :
244+ async with dask_clients_pool .acquire (
245+ default_cluster , ref = "test-default-cluster-ref"
246+ ) as dask_client :
243247
244248 def just_a_quick_fct (x , y ):
245249 return x + y
@@ -252,3 +256,56 @@ def just_a_quick_fct(x, y):
252256 assert future
253257 result = await future .result (timeout = 10 )
254258 assert result == 35
259+
260+
261+ async def test_dask_clients_pool_reference_counting (
262+ minimal_dask_config : None ,
263+ mocker : MockerFixture ,
264+ client : TestClient ,
265+ fake_clusters : Callable [[int ], list [BaseCluster ]],
266+ ):
267+ """Test that the reference counting mechanism works correctly."""
268+ assert client .app
269+ the_app = cast (FastAPI , client .app )
270+ mocked_dask_client = mocker .patch (
271+ "simcore_service_director_v2.modules.dask_clients_pool.DaskClient" ,
272+ autospec = True ,
273+ )
274+ mocked_dask_client .create .return_value = mocked_dask_client
275+ clients_pool = DaskClientsPool .instance (the_app )
276+
277+ # Create a cluster
278+ cluster = fake_clusters (1 )[0 ]
279+
280+ # Acquire the client with first reference
281+ ref1 = "test-ref-1"
282+ async with clients_pool .acquire (cluster , ref = ref1 ):
283+ # Client should be created
284+ mocked_dask_client .create .assert_called_once ()
285+ # Reset the mock to check the next call
286+ mocked_dask_client .create .reset_mock ()
287+
288+ mocked_dask_client .delete .assert_not_called ()
289+ # Acquire the same client with second reference
290+ ref2 = "test-ref-2"
291+ async with clients_pool .acquire (cluster , ref = ref2 ):
292+ # No new client should be created
293+ mocked_dask_client .create .assert_not_called ()
294+ mocked_dask_client .delete .assert_not_called ()
295+
296+ # Release first reference, client should still exist
297+ await clients_pool .release_client_ref (ref1 )
298+ mocked_dask_client .delete .assert_not_called ()
299+
300+ # Release second reference, which should delete the client
301+ await clients_pool .release_client_ref (ref2 )
302+ mocked_dask_client .delete .assert_called_once ()
303+
304+ # calling again should not raise and not delete more
305+ await clients_pool .release_client_ref (ref2 )
306+ mocked_dask_client .delete .assert_called_once ()
307+
308+ # Acquire again should create a new client
309+ mocked_dask_client .create .reset_mock ()
310+ async with clients_pool .acquire (cluster , ref = "test-ref-3" ):
311+ mocked_dask_client .create .assert_called_once ()
0 commit comments