Skip to content

Commit 1a9925d

Browse files
committed
Add concurrency guard for sweeps and update docs
Introduces a concurrency guard to ensure only one non-queued sweep runs at a time, with exceptions for related (parent/child) sweeps and those launched via SweepQueue. Adds `start_force()` to forcibly kill unrelated active sweeps before starting, and `list_active_sweeps()` for debugging. Updates documentation and tutorial notebooks to reflect new concurrency behavior and error handling.
1 parent 98a17e7 commit 1a9925d

File tree

13 files changed

+998
-64
lines changed

13 files changed

+998
-64
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ See related projects:
2222

2323
### Key Components
2424
- **Base Classes**: `BaseSweep` provides the foundation with parameter following, measurement creation, and thread management
25+
- **Sweep Concurrency Guard**: Only one non-queued sweep can run at a time; `SweepQueue` bypasses this guard, and internal ramp/inner sweeps are allowed via parent relationships. Use `start_force()` to kill unrelated active sweeps before starting.
2526
- **Measurement Types**:
2627
- 0D (time-based measurements)
2728
- 1D (single parameter sweep)

docs/source/start/index.rst

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,22 @@ Sweep Logging
6868
The helper marshals log messages from background Qt threads back onto the
6969
IPython event loop so they show up alongside your notebook output.
7070

71+
Sweep Concurrency
72+
-----------------
73+
74+
- Only one non-queued sweep may run at a time. Calling ``start()`` on a new
75+
sweep while another is running raises ``RuntimeError``.
76+
- Resuming a paused sweep while another is running also raises ``RuntimeError``.
77+
- Sweeps launched by :class:`~measureit.tools.sweep_queue.SweepQueue` bypass
78+
this guard (``is_queued=True``).
79+
- Internal sweeps (inner sweeps and ramp sweeps) are allowed via the parent
80+
relationship between sweeps.
81+
- To force-start a sweep (killing unrelated active sweeps first), use
82+
``start_force()``.
83+
- For debugging or recovery (e.g., you lost a Python reference), use
84+
``BaseSweep.list_active_sweeps()`` to inspect running/paused sweeps and
85+
regain access to them.
86+
7187
Example notebooks
7288
-----------------
7389

examples/Tutorial.ipynb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"name": "python",
3636
"nbconvert_exporter": "python",
3737
"pygments_lexer": "ipython3",
38-
"version": "3.13.9"
38+
"version": "3.11.14"
3939
}
4040
},
4141
"nbformat": 4,

examples/content/quick start.ipynb

Lines changed: 458 additions & 56 deletions
Large diffs are not rendered by default.

src/measureit/sweep/base_sweep.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
import threading
77
import warnings
8+
import weakref
89
from decimal import ROUND_HALF_EVEN, Decimal, localcontext
910
from functools import partial
1011
from typing import Optional, Tuple
@@ -20,6 +21,72 @@
2021
from ..tools.util import _autorange_srs, is_numeric_parameter, safe_get, safe_set
2122
from .progress import ProgressState, SweepState
2223

24+
_ACTIVE_SWEEPS = weakref.WeakSet()
25+
_ACTIVE_SWEEPS_LOCK = threading.Lock()
26+
27+
28+
def _register_active_sweep(sweep: "BaseSweep") -> None:
29+
with _ACTIVE_SWEEPS_LOCK:
30+
_ACTIVE_SWEEPS.add(sweep)
31+
32+
33+
def _deregister_active_sweep(sweep: "BaseSweep") -> None:
34+
with _ACTIVE_SWEEPS_LOCK:
35+
_ACTIVE_SWEEPS.discard(sweep)
36+
37+
38+
def _iter_parent_chain(sweep: "BaseSweep"):
39+
seen = set()
40+
current = getattr(sweep, "parent", None)
41+
while current is not None and current not in seen:
42+
yield current
43+
seen.add(current)
44+
current = getattr(current, "parent", None)
45+
46+
47+
def _is_related_sweep(a: "BaseSweep", b: "BaseSweep") -> bool:
48+
if a is b:
49+
return True
50+
for parent in _iter_parent_chain(a):
51+
if parent is b:
52+
return True
53+
for parent in _iter_parent_chain(b):
54+
if parent is a:
55+
return True
56+
return False
57+
58+
59+
def _has_other_active_sweep(sweep: "BaseSweep") -> bool:
60+
with _ACTIVE_SWEEPS_LOCK:
61+
active = list(_ACTIVE_SWEEPS)
62+
for other in active:
63+
if other is sweep:
64+
continue
65+
state = getattr(getattr(other, "progressState", None), "state", None)
66+
if state in (SweepState.RUNNING, SweepState.RAMPING):
67+
if _is_related_sweep(sweep, other):
68+
continue
69+
return True
70+
return False
71+
72+
73+
def _kill_other_active_sweeps(sweep: "BaseSweep") -> int:
74+
"""Kill all unrelated active sweeps and return how many were killed."""
75+
with _ACTIVE_SWEEPS_LOCK:
76+
active = list(_ACTIVE_SWEEPS)
77+
killed = 0
78+
for other in active:
79+
if other is sweep:
80+
continue
81+
if _is_related_sweep(sweep, other):
82+
continue
83+
try:
84+
other.kill()
85+
killed += 1
86+
except Exception:
87+
pass
88+
return killed
89+
2390

2491
class BaseSweep(QObject):
2592
"""The parent class for the 0D, 1D and 2D sweep classes.
@@ -376,6 +443,7 @@ def _enter_running_state(self, *, reset_elapsed: bool) -> float:
376443
self._mark_done_deferred = False
377444
self._run_started_at = now
378445
self.progressState.state = SweepState.RUNNING
446+
_register_active_sweep(self)
379447
return now
380448

381449
def pause(self):
@@ -388,6 +456,15 @@ def pause(self):
388456
self._add_runtime_since_last_resume()
389457
self.progressState.state = SweepState.PAUSED
390458
self.send_updates()
459+
# If this is an inner/child sweep, pause the parent as well to keep states consistent
460+
parent = getattr(self, "parent", None)
461+
if parent is not None and parent is not self:
462+
try:
463+
parent_state = getattr(getattr(parent, "progressState", None), "state", None)
464+
if parent_state in (SweepState.RUNNING, SweepState.RAMPING):
465+
parent.pause()
466+
except Exception:
467+
pass
391468

392469
def stop(self):
393470
"""Stop/pause the sweep. Alias for pause() for backward compatibility.
@@ -411,6 +488,7 @@ def kill(self):
411488
# ERROR state transitions to KILLED since user explicitly called kill()
412489
if progress_state is not None and progress_state.state not in (SweepState.DONE, SweepState.KILLED):
413490
self.progressState.state = SweepState.KILLED
491+
_deregister_active_sweep(self)
414492
if hasattr(self, "_error_completion_pending"):
415493
self._error_completion_pending = False # Clear to prevent stale flag
416494

@@ -454,6 +532,29 @@ def check_running(self):
454532
"""Returns the status of the sweep."""
455533
return self.progressState.state in (SweepState.RUNNING, SweepState.RAMPING)
456534

535+
def start_force(self, *args, **kwargs):
536+
"""Kill other unrelated active sweeps, then start this sweep."""
537+
if not self.progressState.is_queued:
538+
_kill_other_active_sweeps(self)
539+
return self.start(*args, **kwargs)
540+
541+
@staticmethod
542+
def list_active_sweeps():
543+
"""Return a snapshot of active sweeps for debugging."""
544+
with _ACTIVE_SWEEPS_LOCK:
545+
active = list(_ACTIVE_SWEEPS)
546+
info = []
547+
for sweep in active:
548+
state = getattr(getattr(sweep, "progressState", None), "state", None)
549+
info.append(
550+
{
551+
"type": sweep.__class__.__name__,
552+
"state": state.name if state is not None else None,
553+
"sweep": sweep,
554+
}
555+
)
556+
return info
557+
457558
def start(self, persist_data=None, ramp_to_start=False):
458559
"""Starts the sweep by creating and running the worker threads.
459560
@@ -465,6 +566,10 @@ def start(self, persist_data=None, ramp_to_start=False):
465566
Optional argument which gradually ramps each parameter to the starting
466567
point of its sweep. Default is true for Sweep1D and Sweep2D.
467568
"""
569+
if not self.progressState.is_queued and _has_other_active_sweep(self):
570+
raise RuntimeError(
571+
"Another sweep is already running. Stop or kill it before starting a new sweep, use start_force() to kill others, or use SweepQueue to stack sweeps."
572+
)
468573
if self.progressState.state in (SweepState.RUNNING, SweepState.RAMPING):
469574
self.print_main.emit("We are already running, can't start while running.")
470575
return
@@ -540,6 +645,10 @@ def start(self, persist_data=None, ramp_to_start=False):
540645
def resume(self):
541646
"""Restarts the sweep after it has been paused."""
542647
if self.progressState.state == SweepState.PAUSED:
648+
if not self.progressState.is_queued and _has_other_active_sweep(self):
649+
raise RuntimeError(
650+
"Another sweep is already running. Stop or kill it before resuming this sweep, use start_force() to kill others, or use SweepQueue to stack sweeps."
651+
)
543652
self._enter_running_state(reset_elapsed=False)
544653
self.send_updates(no_sp=True)
545654
else:
@@ -614,6 +723,7 @@ def mark_done(self) -> None:
614723
if self.progressState.state == SweepState.RUNNING:
615724
self._add_runtime_since_last_resume()
616725
self.progressState.state = SweepState.DONE
726+
_deregister_active_sweep(self)
617727
self.send_updates()
618728
self.completed.emit()
619729

@@ -639,6 +749,7 @@ def mark_error(self, error_message: str, _from_runner: bool = False) -> None:
639749
self._add_runtime_since_last_resume()
640750
self.progressState.state = SweepState.ERROR
641751
self.progressState.error_message = error_message
752+
_deregister_active_sweep(self)
642753

643754
# Propagate error to parent sweep (e.g., Sweep2D when inner Sweep1D fails)
644755
parent = getattr(self, "parent", None)

src/measureit/sweep/simul_sweep.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ def ramp_to(self, vals_dict, start_on_finish=False, persist=None, multiplier=1):
405405
),
406406
suppress_output=self.suppress_output,
407407
)
408+
self.ramp_sweep.parent = self
408409
# Only follow parameters that are not being ramped to avoid circular dependencies
409410
follow_params = [p for p in self._params if p not in ramp_params_dict.keys()]
410411
if follow_params:

src/measureit/sweep/sweep1d.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,7 @@ def ramp_to(self, value, start_on_finish=False, persist=None, multiplier=1):
506506
save_data=False,
507507
plot_data=self.plot_data,
508508
)
509+
self.ramp_sweep.parent = self
509510
self.ramp_sweep.follow_param(self._params)
510511

511512
self.progressState.state = SweepState.RAMPING

src/measureit/sweep/sweep1d_listening.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ def ramp_to(self, value, start_on_finish=False, persist=None, multiplier=1):
401401
save_data=False,
402402
plot_data=self.plot_data,
403403
)
404+
self.ramp_sweep.parent = self
404405

405406
self.progressState.state = SweepState.RAMPING
406407
self.ramp_sweep.start(ramp_to_start=False)

src/measureit/sweep/sweep2d.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,7 @@ def ramp_to(self, value, start_on_finish=False, multiplier=1):
673673
save_data=False,
674674
plot_data=True,
675675
)
676+
self.ramp_sweep.parent = self
676677
for p in self._params:
677678
if p is not self.set_param:
678679
self.ramp_sweep.follow_param(p)
@@ -705,6 +706,7 @@ def ramp_to_zero(self):
705706
inter_delay=self.inter_delay,
706707
complete_func=self.done_ramping,
707708
)
709+
self.ramp_sweep.parent = self
708710
self.ramp_sweep.follow_param(self._params)
709711
self.progressState.state = SweepState.RAMPING
710712
self.outer_ramp = True

tests/conftest.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,28 @@ def close_qcodes_instruments_between_tests():
134134
pass
135135

136136

137+
@pytest.fixture(autouse=True, scope="function")
138+
def clear_active_sweeps():
139+
"""Clear the active sweeps registry before and after each test.
140+
141+
This prevents the concurrency guard from blocking tests that create
142+
multiple sweeps across different test functions.
143+
"""
144+
try:
145+
from measureit.sweep.base_sweep import _ACTIVE_SWEEPS, _ACTIVE_SWEEPS_LOCK
146+
with _ACTIVE_SWEEPS_LOCK:
147+
_ACTIVE_SWEEPS.clear()
148+
except ImportError:
149+
pass
150+
yield
151+
try:
152+
from measureit.sweep.base_sweep import _ACTIVE_SWEEPS, _ACTIVE_SWEEPS_LOCK
153+
with _ACTIVE_SWEEPS_LOCK:
154+
_ACTIVE_SWEEPS.clear()
155+
except ImportError:
156+
pass
157+
158+
137159
@pytest.fixture(autouse=True, scope="function")
138160
def cleanup_qt_threads():
139161
"""Clean up any lingering QThreads after each test to prevent segfaults.

0 commit comments

Comments
 (0)