Commit fae6079
1680 fix run engine filehandle leak (#1682)

* Properly clean up the run engine after use
* Make all tests use the shared fixture instead of creating their own run engines
* Additional patches to ensure filehandles are not leaked in tests
* Utility to detect which tests are leaking
* Clean up dangling signal observers
* Fix the event loop scope of _ensure_running_bluesky_event_loop to be consistent with mx-bluesky

1 parent 7f2d3e3 · commit fae6079

8 files changed: +122, -43 lines
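The central change, visible across the test diffs below, is that tests now receive a shared, session-scoped run engine via the `run_engine` fixture instead of constructing their own `RunEngine`, each of which spins up an event loop whose filehandles were never released. A condensed before/after sketch of the migration (names condensed from tests/devices/test_gridscan.py; the device fixture is assumed):

import bluesky.plan_stubs as bps
from bluesky import RunEngine

# Before: a throwaway engine per test; its event loop leaks filehandles.
async def test_expected_images_old(zebra_fast_grid_scan):
    run_engine = RunEngine(call_returns_result=True)
    result = run_engine(bps.rd(zebra_fast_grid_scan.expected_images))

# After: the fixture from src/dodal/testing/fixtures/run_engine.py is
# injected, reused across tests, and reset between them.
async def test_expected_images_new(zebra_fast_grid_scan, run_engine: RunEngine):
    result = run_engine(bps.rd(zebra_fast_grid_scan.expected_images))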

src/dodal/testing/fixtures/run_engine.py

Lines changed: 77 additions & 8 deletions
@@ -3,32 +3,79 @@
 """

 import asyncio
+import os
+import threading
 import time
-from collections.abc import Mapping
+from collections.abc import AsyncGenerator, Mapping

 import pytest
+import pytest_asyncio
+from _pytest.fixtures import FixtureRequest
 from bluesky.run_engine import RunEngine
 from bluesky.simulators import RunEngineSimulator

 _run_engine = RunEngine()

+_ENABLE_FILEHANDLE_LEAK_CHECKS = (
+    os.getenv("DODAL_ENABLE_FILEHANDLE_LEAK_CHECKS", "").lower() == "true"
+)

-@pytest.fixture(scope="session", autouse=True)
-async def _ensure_running_bluesky_event_loop():
+
+@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True)
+async def _ensure_running_bluesky_event_loop(_global_run_engine):
     # make sure the event loop is thoroughly up and running before we try to create
     # any ophyd_async devices which might need it
     timeout = time.monotonic() + 1
-    while not _run_engine.loop.is_running():
+    while not _global_run_engine.loop.is_running():
         await asyncio.sleep(0)
         if time.monotonic() > timeout:
            raise TimeoutError("This really shouldn't happen but just in case...")


 @pytest.fixture()
-def run_engine():
-    global _run_engine
-    _run_engine.reset()
-    return _run_engine
+async def run_engine(_global_run_engine: RunEngine) -> AsyncGenerator[RunEngine, None]:
+    try:
+        yield _global_run_engine
+    finally:
+        _global_run_engine.reset()
+
+
+@pytest_asyncio.fixture(scope="session", loop_scope="session")
+async def _global_run_engine() -> AsyncGenerator[RunEngine, None]:
+    """
+    Obtain a run engine, with its own event loop and thread.
+
+    On closure of the scope, the run engine is stopped and the event loop closed
+    in order to release all resources it consumes.
+    """
+    run_engine = RunEngine({}, call_returns_result=True)
+    yield run_engine
+    try:
+        run_engine.halt()
+    except Exception as e:
+        # Ignore the exception thrown if the run engine is already halted.
+        print(f"Got exception while halting RunEngine {e}")
+    finally:
+
+        async def get_event_loop_thread():
+            """Get the thread which the run engine created for the event loop."""
+            return threading.current_thread()
+
+        fut = asyncio.run_coroutine_threadsafe(get_event_loop_thread(), run_engine.loop)
+        while not fut.done():
+            # It's not clear why this is necessary, given we are
+            # on a completely different thread and event loop,
+            # but without it our future never seems to be populated with a result
+            # despite the coro getting executed.
+            await asyncio.sleep(0)
+        # Terminate the event loop so that we can join() the thread.
+        run_engine.loop.call_soon_threadsafe(run_engine.loop.stop)
+        run_engine_thread = fut.result()
+        run_engine_thread.join()
+        # Close the event loop to release its filehandles; this cannot be
+        # done while the loop is still running.
+        run_engine.loop.close()
+        del run_engine


 @pytest.fixture
@@ -47,3 +94,25 @@ def append_and_print(name, doc):

     run_engine.subscribe(append_and_print)
     return docs
+
+
+@pytest.fixture(autouse=_ENABLE_FILEHANDLE_LEAK_CHECKS)
+def check_for_filehandle_leaks(request: FixtureRequest):
+    """
+    Test fixture that can be enabled in order to check for leaked filehandles
+    (typically caused by a rogue RunEngine instance).
+
+    Note that this fixture is not enabled by default because it imposes
+    significant overhead. When a leak is suspected, usually from seeing a
+    PytestUnraisableExceptionWarning, enable it via autouse and run the full
+    test suite.
+    """
+    pid = os.getpid()
+    _baseline_n_open_files = len(os.listdir(f"/proc/{pid}/fd"))
+    try:
+        yield
+    finally:
+        _n_open_files = len(os.listdir(f"/proc/{pid}/fd"))
+        assert _n_open_files == _baseline_n_open_files, (
+            f"Function {request.function.__name__} leaked some filehandles"
+        )
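The teardown in `_global_run_engine` has to discover the loop's thread from inside the loop itself, because `RunEngine` creates that thread internally and does not expose it. A minimal standalone sketch of the same shutdown sequence, using a plain event loop in place of the engine's:

import asyncio
import threading

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def which_thread() -> threading.Thread:
    # Executes on the loop's own thread, so it returns the thread to join.
    return threading.current_thread()

fut = asyncio.run_coroutine_threadsafe(which_thread(), loop)
loop_thread = fut.result(timeout=1)
loop.call_soon_threadsafe(loop.stop)  # makes run_forever() return
loop_thread.join()
loop.close()  # releases the loop's filehandles; illegal while still running
assert loop_thread is thread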

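The leak check works by counting entries in /proc/<pid>/fd, which on Linux holds one symlink per filehandle the process has open; it is switched on by running the suite with DODAL_ENABLE_FILEHANDLE_LEAK_CHECKS=true. A standalone illustration of the counting technique (Linux only):

import os

def count_open_fds() -> int:
    # One /proc/<pid>/fd entry per filehandle currently held by the process.
    return len(os.listdir(f"/proc/{os.getpid()}/fd"))

baseline = count_open_fds()
f = open("/dev/null")
assert count_open_fds() == baseline + 1  # the open file shows up here
f.close()
assert count_open_fds() == baseline  # and is released on close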
tests/common/test_watcher_utils.py

Lines changed: 2 additions & 0 deletions
@@ -61,6 +61,7 @@ async def test_log_on_percentage_complete_value_error_on_bad_input():
         match="Percent interval on class _LogOnPercentageProgressWatcher must be a positive number, but received 0",
     ):
         log_on_percentage_complete(status, "", 0)
+    test_watchable.complete_event.set()  # Ensure the signal observer exits


 async def test_log_on_percentage_complete_for_already_updating_status():
@@ -75,3 +76,4 @@ async def do_log():
         log_on_percentage_complete(status, "")

     await asyncio.gather(update_signal(), do_log())
+    test_watchable.complete_event.set()  # Ensure the signal observer exits

tests/devices/oav/image_recognition/test_pin_tip_detect.py

Lines changed: 0 additions & 4 deletions
@@ -1,4 +1,3 @@
-import asyncio
 from unittest.mock import MagicMock, patch

 import numpy as np
@@ -10,9 +9,6 @@
 )
 from dodal.devices.oav.pin_image_recognition.utils import NONE_VALUE, SampleLocation

-EVENT_LOOP = asyncio.new_event_loop()
-
-
 DEVICE_NAME = "pin_tip_detection"
 TRIGGERED_TIP_READING = DEVICE_NAME + "-triggered_tip"
 TRIGGERED_TOP_EDGE_READING = DEVICE_NAME + "-triggered_top_edge"
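The deleted module-level EVENT_LOOP is exactly the kind of leak this commit targets: merely constructing a loop allocates filehandles (on Linux, an epoll descriptor plus a self-pipe socket pair) that only close() releases, and a loop created at import time is never closed. A small demonstration (Linux only):

import asyncio
import os

def n_fds() -> int:
    return len(os.listdir("/proc/self/fd"))

before = n_fds()
loop = asyncio.new_event_loop()
assert n_fds() > before  # the idle loop is already holding filehandles
loop.close()
assert n_fds() == before  # released only by an explicit close()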

tests/devices/oav/test_oav_utils.py

Lines changed: 13 additions & 13 deletions
@@ -40,6 +40,13 @@ async def smargon() -> AsyncGenerator[Smargon]:
     yield smargon


+@pytest.fixture
+async def mock_pin_tip_detect() -> PinTipDetection:
+    async with init_devices(mock=True):
+        mock_pin_tip_detect = PinTipDetection("")
+    return mock_pin_tip_detect
+
+
 @pytest.mark.parametrize(
     "h, v, expected_x, expected_y",
     [
@@ -78,10 +85,10 @@ async def test_values_for_move_so_that_beam_is_at_pixel(
     expected_xyz: tuple,
     oav: OAV,
     smargon: Smargon,
+    run_engine: RunEngine,
 ):
     set_mock_value(oav.zoom_controller.level, zoom_level)
     set_mock_value(smargon.omega.user_readback, angle)
-    run_engine = RunEngine(call_returns_result=True)
     pos = run_engine(
         get_move_required_so_that_beam_is_at_pixel(smargon, pixel_to_move_to, oav)
     ).plan_result  # type: ignore
@@ -90,26 +97,20 @@


 async def test_given_tip_found_when_wait_for_tip_to_be_found_called_then_tip_immediately_returned(
-    run_engine,
+    run_engine: RunEngine, mock_pin_tip_detect: PinTipDetection
 ):
-    async with init_devices(mock=True):
-        mock_pin_tip_detect = PinTipDetection("")
-
-    await mock_pin_tip_detect.connect(mock=True)
     mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock(
         return_value=SampleLocation(100, 100, np.array([]), np.array([]))
     )
-    run_engine = RunEngine(call_returns_result=True)
     result = run_engine(wait_for_tip_to_be_found(mock_pin_tip_detect))
     assert result.plan_result == (100, 100)  # type: ignore
     mock_pin_tip_detect._get_tip_and_edge_data.assert_called_once()


-async def test_given_no_tip_when_wait_for_tip_to_be_found_called_then_exception_thrown():
-    async with init_devices(mock=True):
-        mock_pin_tip_detect = PinTipDetection("")
-
-    await mock_pin_tip_detect.connect(mock=True)
+async def test_given_no_tip_when_wait_for_tip_to_be_found_called_then_exception_thrown(
+    run_engine: RunEngine,
+    mock_pin_tip_detect: PinTipDetection,
+):
     await mock_pin_tip_detect.validity_timeout.set(0.2)
     mock_pin_tip_detect._get_tip_and_edge_data = AsyncMock(
         return_value=SampleLocation(
@@ -119,6 +120,5 @@ async def test_given_no_tip_when_wait_for_tip_to_be_found_called_then_exception_
             np.array([]),
         )
     )
-    run_engine = RunEngine(call_returns_result=True)
     with pytest.raises(PinNotFoundError):
         run_engine(wait_for_tip_to_be_found(mock_pin_tip_detect))

tests/devices/test_gridscan.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,15 @@ async def test_waits_for_running_motion(grid_scan: FastGridScanCommon):
139139
],
140140
)
141141
async def test_given_different_step_numbers_then_expected_images_correct(
142-
zebra_fast_grid_scan: ZebraFastGridScanThreeD, steps, expected_images
142+
run_engine: RunEngine,
143+
zebra_fast_grid_scan: ZebraFastGridScanThreeD,
144+
steps: tuple[int, int, int],
145+
expected_images: int,
143146
):
144147
set_mock_value(zebra_fast_grid_scan.x_steps, steps[0])
145148
set_mock_value(zebra_fast_grid_scan.y_steps, steps[1])
146149
set_mock_value(zebra_fast_grid_scan.z_steps, steps[2])
147150

148-
run_engine = RunEngine(call_returns_result=True)
149-
150151
result = run_engine(bps.rd(zebra_fast_grid_scan.expected_images))
151152

152153
assert result.plan_result == expected_images # type: ignore
@@ -161,13 +162,14 @@ async def test_given_different_step_numbers_then_expected_images_correct(
     ],
 )
 async def test_given_different_2d_step_numbers_then_expected_images_correct(
-    zebra_fast_grid_scan_2d: ZebraFastGridScanTwoD, steps, expected_images
+    zebra_fast_grid_scan_2d: ZebraFastGridScanTwoD,
+    steps: tuple[int, int],
+    expected_images: int,
+    run_engine: RunEngine,
 ):
     set_mock_value(zebra_fast_grid_scan_2d.x_steps, steps[0])
     set_mock_value(zebra_fast_grid_scan_2d.y_steps, steps[1])

-    run_engine = RunEngine(call_returns_result=True)
-
     result = run_engine(bps.rd(zebra_fast_grid_scan_2d.expected_images))

     assert result.plan_result == expected_images  # type: ignore

tests/plan_stubs/test_motor_util_plans.py

Lines changed: 7 additions & 9 deletions
@@ -70,10 +70,8 @@ def test_given_types_of_device_when_home_and_reset_wrapper_called_then_motors_an


 def test_given_a_device_when_check_and_cache_values_then_motor_values_returned(
-    my_device,
+    my_device: DeviceWithOnlyMotors, run_engine: RunEngine
 ):
-    run_engine = RunEngine(call_returns_result=True)
-
     for i, motor in enumerate(my_device.motors, start=1):
         set_mock_value(motor.user_readback, i * 100)

@@ -124,10 +122,12 @@
     ],
 )
 def test_given_a_device_where_one_move_too_small_when_check_and_cache_values_then_other_positions_returned(
-    my_device: DeviceWithOnlyMotors, initial, min, new_position: float
+    my_device: DeviceWithOnlyMotors,
+    initial: float,
+    min: float,
+    new_position: float,
+    run_engine: RunEngine,
 ):
-    run_engine = RunEngine(call_returns_result=True)
-
     set_mock_value(my_device.x.user_readback, initial)
     set_mock_value(my_device.y.user_readback, 200)

@@ -144,10 +144,8 @@ def test_given_a_device_where_one_move_too_small_when_check_and_cache_values_the


 def test_given_a_device_where_all_moves_too_small_when_check_and_cache_values_then_no_positions_returned(
-    my_device,
+    my_device: DeviceWithOnlyMotors, run_engine: RunEngine
 ):
-    run_engine = RunEngine(call_returns_result=True)
-
     set_mock_value(my_device.x.user_readback, 10)
     set_mock_value(my_device.y.user_readback, 20)

tests/plans/test_save_panda.py

Lines changed: 8 additions & 3 deletions
@@ -2,13 +2,18 @@
 from unittest.mock import MagicMock, patch

 import pytest
-from bluesky.simulators import RunEngineSimulator
+from bluesky import RunEngine

 from dodal.plans.save_panda import _save_panda, main


-def test_save_panda():
-    sim_run_engine = RunEngineSimulator()
+@pytest.fixture(autouse=True)
+def patch_run_engine_in_save_panda_to_avoid_leaks(run_engine: RunEngine):
+    with patch("dodal.plans.save_panda.RunEngine", return_value=run_engine):
+        yield
+
+
+def test_save_panda(sim_run_engine):
     panda = MagicMock()
     directory = "test"
     filename = "file.yml"

tests/test_cli.py

Lines changed: 7 additions & 0 deletions
@@ -2,6 +2,7 @@
 from unittest.mock import patch

 import pytest
+from bluesky import RunEngine
 from click.testing import CliRunner, Result
 from ophyd.device import DEFAULT_CONNECTION_TIMEOUT
 from ophyd_async.core import (
@@ -19,6 +20,12 @@
 EXAMPLE_BEAMLINE = "i22"


+@pytest.fixture(autouse=True)
+def patch_run_engine_in_cli_to_avoid_leaks(run_engine: RunEngine):
+    with patch("dodal.cli.RunEngine", return_value=run_engine):
+        yield
+
+
 @pytest.fixture
 def runner():
     return CliRunner()
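tests/plans/test_save_panda.py and tests/test_cli.py above share one pattern: code under test that constructs its own `RunEngine` internally is redirected to the shared fixture via `unittest.mock.patch`. A minimal sketch of the pattern against a hypothetical module `mypkg.tool`:

from unittest.mock import patch

import pytest
from bluesky import RunEngine


@pytest.fixture(autouse=True)
def reuse_shared_run_engine(run_engine: RunEngine):
    # Every `RunEngine(...)` call inside mypkg.tool (hypothetical) now hands
    # back the shared session-scoped engine, so the test spawns no extra
    # event loop whose filehandles could leak.
    with patch("mypkg.tool.RunEngine", return_value=run_engine):
        yield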
