Skip to content

Commit ea92977

Browse files
Create common fixtures which can be reused by other repos (#1657)
* Create common fixtures which can be reused by other repos * Fixed tests, made run_engine only module * Move event_loop_fuzzing to dodal.testing.fixtures.utils * omit fixtures from code coverage report
1 parent dff7317 commit ea92977

File tree

6 files changed

+112
-89
lines changed

6 files changed

+112
-89
lines changed

conftest.py

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
import asyncio
21
import logging
32
import sys
4-
import time
5-
from collections.abc import Mapping
63
from os import environ
74
from pathlib import Path
85
from unittest.mock import MagicMock, patch
96

107
import pytest
11-
from bluesky.run_engine import RunEngine
128
from ophyd.status import Status
139
from ophyd_async.core import (
1410
PathInfo,
@@ -49,6 +45,10 @@
4945
environ["DODAL_TEST_MODE"] = "true"
5046

5147

48+
# Add run_engine fixtures to be used in tests
49+
pytest_plugins = ["dodal.testing.fixtures.run_engine"]
50+
51+
5252
@pytest.fixture(autouse=True)
5353
def patch_open_to_prevent_dls_reads_in_tests():
5454
unpatched_open = open
@@ -105,37 +105,7 @@ async def static_path_provider(
105105
return svpp
106106

107107

108-
@pytest.fixture
109-
def run_engine_documents(run_engine: RunEngine) -> Mapping[str, list[dict]]:
110-
docs: dict[str, list[dict]] = {}
111-
112-
def append_and_print(name, doc):
113-
if name not in docs:
114-
docs[name] = []
115-
docs[name] += [doc]
116-
117-
run_engine.subscribe(append_and_print)
118-
return docs
119-
120-
121108
def failed_status(failure: Exception) -> Status:
122109
status = Status()
123110
status.set_exception(failure)
124111
return status
125-
126-
127-
@pytest.fixture(scope="session", autouse=True)
128-
async def _ensure_running_bluesky_event_loop():
129-
run_engine = RunEngine()
130-
# make sure the event loop is thoroughly up and running before we try to create
131-
# any ophyd_async devices which might need it
132-
timeout = time.monotonic() + 1
133-
while not run_engine.loop.is_running():
134-
await asyncio.sleep(0)
135-
if time.monotonic() > timeout:
136-
raise TimeoutError("This really shouldn't happen but just in case...")
137-
138-
139-
@pytest.fixture()
140-
async def run_engine():
141-
yield RunEngine()

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ testpaths = "docs src tests system_tests"
136136
exclude_also = [
137137
'^"""', # Ignore the start/end of a file-level triple quoted docstring
138138
]
139+
# Exclude pytest.fixtures from code coverage report
140+
omit = ["*/dodal/testing/fixtures/*"]
139141

140142
[tool.coverage.run]
141143
data_file = "/tmp/dodal.coverage"

src/dodal/testing/fixtures/__init__.py

Whitespace-only changes.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
"""
2+
Allow external repos to reuse these fixtures so defined in single place.
3+
"""
4+
5+
import asyncio
6+
import time
7+
from collections.abc import Mapping
8+
9+
import pytest
10+
from bluesky.run_engine import RunEngine
11+
from bluesky.simulators import RunEngineSimulator
12+
13+
14+
@pytest.fixture(scope="session", autouse=True)
15+
async def _ensure_running_bluesky_event_loop():
16+
run_engine = RunEngine()
17+
# make sure the event loop is thoroughly up and running before we try to create
18+
# any ophyd_async devices which might need it
19+
timeout = time.monotonic() + 1
20+
while not run_engine.loop.is_running():
21+
await asyncio.sleep(0)
22+
if time.monotonic() > timeout:
23+
raise TimeoutError("This really shouldn't happen but just in case...")
24+
25+
26+
@pytest.fixture()
27+
async def run_engine():
28+
yield RunEngine()
29+
30+
31+
@pytest.fixture
32+
def sim_run_engine() -> RunEngineSimulator:
33+
return RunEngineSimulator()
34+
35+
36+
@pytest.fixture
37+
def run_engine_documents(run_engine: RunEngine) -> Mapping[str, list[dict]]:
38+
docs: dict[str, list[dict]] = {}
39+
40+
def append_and_print(name, doc):
41+
if name not in docs:
42+
docs[name] = []
43+
docs[name] += [doc]
44+
45+
run_engine.subscribe(append_and_print)
46+
return docs
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import asyncio
2+
import threading
3+
import time
4+
from random import random
5+
from threading import Thread
6+
7+
import pytest
8+
9+
10+
@pytest.fixture
11+
async def event_loop_fuzzing():
12+
"""
13+
This fixture can be used to try and detect / reproduce intermittent test failures
14+
caused by race conditions and timing issues, which are often difficult to replicate
15+
due to caching etc. causing timing to be different on a development machine compared
16+
to when the test runs in CI.
17+
18+
It works by attaching a fuzzer to the current event loop which randomly schedules
19+
a fixed delay into the event loop thread every few milliseconds. The idea is that
20+
over a number of iterations, there should be sufficient timing variation introduced
21+
that the failure can be reproduced.
22+
23+
Examples:
24+
Example usage:
25+
>>> import pytest
26+
>>> # repeat the test a number of times
27+
>>> @pytest.mark.parametrize("i", range(0, 100))
28+
... async def my_unreliable_test(i, event_loop_fuzzing):
29+
... # Do some stuff in here
30+
... ...
31+
"""
32+
fuzz_probability = 0.05
33+
fuzz_delay_s = 0.05
34+
fuzz_period_s = 0.001
35+
stop_running = threading.Event()
36+
event_loop = asyncio.get_running_loop()
37+
38+
def delay(finished_event: threading.Event):
39+
time.sleep(fuzz_delay_s) # noqa: TID251
40+
finished_event.set()
41+
42+
def fuzz():
43+
while not stop_running.is_set():
44+
if random() < fuzz_probability:
45+
delay_is_finished = threading.Event()
46+
event_loop.call_soon_threadsafe(delay, delay_is_finished)
47+
delay_is_finished.wait()
48+
49+
time.sleep(fuzz_period_s) # noqa: TID251
50+
51+
fuzzer_thread = Thread(group=None, target=fuzz, name="Event loop fuzzer")
52+
fuzzer_thread.start()
53+
try:
54+
yield None
55+
finally:
56+
stop_running.set()
57+
fuzzer_thread.join()

tests/conftest.py

Lines changed: 3 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,7 @@
1-
import asyncio
21
import importlib
32
import os
4-
import threading
5-
import time
63
from collections.abc import AsyncGenerator
74
from pathlib import Path
8-
from random import random
9-
from threading import Thread
105
from types import ModuleType
116
from unittest.mock import patch
127

@@ -28,6 +23,9 @@
2823
from tests.devices.test_data import TEST_LUT_TXT
2924
from tests.test_data import I04_BEAMLINE_PARAMETERS
3025

26+
# Add utils fixtures to be used in tests
27+
pytest_plugins = ["dodal.testing.fixtures.utils"]
28+
3129

3230
@pytest.fixture(scope="function")
3331
def module_and_devices_for_beamline(request: pytest.FixtureRequest):
@@ -81,53 +79,3 @@ async def baton_in_commissioning_mode() -> AsyncGenerator[Baton]:
8179
set_mock_value(baton.commissioning, True)
8280
yield baton
8381
set_commissioning_signal(None)
84-
85-
86-
@pytest.fixture
87-
async def event_loop_fuzzing():
88-
"""
89-
This fixture can be used to try and detect / reproduce intermittent test failures
90-
caused by race conditions and timing issues, which are often difficult to replicate
91-
due to caching etc. causing timing to be different on a development machine compared
92-
to when the test runs in CI.
93-
94-
It works by attaching a fuzzer to the current event loop which randomly schedules
95-
a fixed delay into the event loop thread every few milliseconds. The idea is that
96-
over a number of iterations, there should be sufficient timing variation introduced
97-
that the failure can be reproduced.
98-
99-
Examples:
100-
Example usage:
101-
>>> import pytest
102-
>>> # repeat the test a number of times
103-
>>> @pytest.mark.parametrize("i", range(0, 100))
104-
... async def my_unreliable_test(i, event_loop_fuzzing):
105-
... # Do some stuff in here
106-
... ...
107-
"""
108-
fuzz_probability = 0.05
109-
fuzz_delay_s = 0.05
110-
fuzz_period_s = 0.001
111-
stop_running = threading.Event()
112-
event_loop = asyncio.get_running_loop()
113-
114-
def delay(finished_event: threading.Event):
115-
time.sleep(fuzz_delay_s)
116-
finished_event.set()
117-
118-
def fuzz():
119-
while not stop_running.is_set():
120-
if random() < fuzz_probability:
121-
delay_is_finished = threading.Event()
122-
event_loop.call_soon_threadsafe(delay, delay_is_finished)
123-
delay_is_finished.wait()
124-
125-
time.sleep(fuzz_period_s)
126-
127-
fuzzer_thread = Thread(group=None, target=fuzz, name="Event loop fuzzer")
128-
fuzzer_thread.start()
129-
try:
130-
yield None
131-
finally:
132-
stop_running.set()
133-
fuzzer_thread.join()

0 commit comments

Comments
 (0)