Skip to content

Commit 5231029

Browse files
authored
Add generic interactive shell (#189)
1 parent 64a7828 commit 5231029

File tree

9 files changed

+92
-25
lines changed

9 files changed

+92
-25
lines changed

docs/snippets/static02.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,4 @@ class TemperatureController(Controller):
77

88

99
fastcs = FastCS(TemperatureController(), [])
10-
fastcs.run()
10+
# fastcs.run() # Commented as this will block

docs/snippets/static03.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,4 @@ class TemperatureController(Controller):
99

1010

1111
fastcs = FastCS(TemperatureController(), [])
12-
fastcs.run()
12+
# fastcs.run() # Commented as this will block

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ dependencies = [
2020
"pytango",
2121
"softioc>=4.5.0",
2222
"strawberry-graphql",
23-
"p4p"
23+
"p4p",
24+
"IPython",
2425
]
2526
dynamic = ["version"]
2627
license.file = "LICENSE"

src/fastcs/launch.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,13 @@
22
import inspect
33
import json
44
import signal
5+
from collections.abc import Coroutine
6+
from functools import partial
57
from pathlib import Path
68
from typing import Annotated, Any, Optional, TypeAlias, get_type_hints
79

810
import typer
11+
from IPython.terminal.embed import InteractiveShellEmbed
912
from pydantic import BaseModel, create_model
1013
from ruamel.yaml import YAML
1114

@@ -36,6 +39,7 @@ def __init__(
3639
transport_options: TransportOptions,
3740
):
3841
self._loop = asyncio.get_event_loop()
42+
self._controller = controller
3943
self._backend = Backend(controller, self._loop)
4044
transport: TransportAdapter
4145
self._transports: list[TransportAdapter] = []
@@ -100,12 +104,59 @@ def run(self):
100104

101105
async def serve(self) -> None:
102106
coros = [self._backend.serve()]
103-
coros.extend([transport.serve() for transport in self._transports])
107+
context = {
108+
"controller": self._controller,
109+
"controller_api": self._backend.controller_api,
110+
"transports": [
111+
transport.__class__.__name__ for transport in self._transports
112+
],
113+
}
114+
115+
for transport in self._transports:
116+
coros.append(transport.serve())
117+
common_context = context.keys() & transport.context.keys()
118+
if common_context:
119+
raise RuntimeError(
120+
"Duplicate context keys found between "
121+
f"current context { ({k: context[k] for k in common_context}) } "
122+
f"and {transport.__class__.__name__} context: "
123+
f"{ ({k: transport.context[k] for k in common_context}) }"
124+
)
125+
context.update(transport.context)
126+
127+
coros.append(self._interactive_shell(context))
128+
104129
try:
105130
await asyncio.gather(*coros)
106131
except asyncio.CancelledError:
107132
pass
108133

134+
async def _interactive_shell(self, context: dict[str, Any]):
135+
"""Spawn interactive shell in another thread and wait for it to complete."""
136+
137+
def run(coro: Coroutine[None, None, None]):
138+
"""Run coroutine on FastCS event loop from IPython thread."""
139+
140+
def wrapper():
141+
asyncio.create_task(coro)
142+
143+
self._loop.call_soon_threadsafe(wrapper)
144+
145+
async def interactive_shell(
146+
context: dict[str, object], stop_event: asyncio.Event
147+
):
148+
"""Run interactive shell in a new thread."""
149+
shell = InteractiveShellEmbed()
150+
await asyncio.to_thread(partial(shell.mainloop, local_ns=context))
151+
152+
stop_event.set()
153+
154+
context["run"] = run
155+
156+
stop_event = asyncio.Event()
157+
self._loop.create_task(interactive_shell(context, stop_event))
158+
await stop_event.wait()
159+
109160

110161
def launch(
111162
controller_class: type[Controller],

src/fastcs/transport/adapter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ def create_docs(self) -> None:
2222
@abstractmethod
2323
def create_gui(self) -> None:
2424
pass
25+
26+
@property
27+
def context(self) -> dict[str, Any]:
28+
return {}

src/fastcs/transport/epics/ca/adapter.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,3 @@ def create_gui(self) -> None:
4040
async def serve(self) -> None:
4141
print(f"Running FastCS IOC: {self._pv_prefix}")
4242
self._ioc.run(self._loop)
43-
while True:
44-
await asyncio.sleep(1)

tests/example_p4p_ioc.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ async def d(self):
4646
print("D: RUNNING")
4747
await asyncio.sleep(0.1)
4848
print("D: FINISHED")
49+
await self.j.set(self.j.get() + 1)
4950

5051
e: AttrR = AttrR(Bool())
5152

@@ -67,6 +68,9 @@ async def i(self):
6768
else:
6869
self.fail_on_next_e = True
6970
print("I: FINISHED")
71+
await self.j.set(self.j.get() + 1)
72+
73+
j: AttrR = AttrR(Int())
7074

7175

7276
def run(pv_prefix="P4P_TEST_DEVICE"):

tests/test_launch.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,3 +145,17 @@ def test_get_schema(data):
145145

146146
ref_schema = YAML(typ="safe").load(data / "schema.json")
147147
assert target_schema == ref_schema
148+
149+
150+
def test_error_if_identical_context_in_transports(mocker: MockerFixture, data):
151+
mocker.patch("fastcs.launch.FastCS.create_gui")
152+
mocker.patch("fastcs.launch.FastCS.create_docs")
153+
mocker.patch(
154+
"fastcs.transport.adapter.TransportAdapter.context",
155+
new_callable=mocker.PropertyMock,
156+
return_value={"controller": "test"},
157+
)
158+
app = _launch(IsHinted)
159+
result = runner.invoke(app, ["run", str(data / "config.yaml")])
160+
assert isinstance(result.exception, RuntimeError)
161+
assert "Duplicate context keys found" in result.exception.args[0]

tests/transport/epics/pva/test_p4p.py

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ async def test_ioc(p4p_subprocess: tuple[str, Queue]):
6060
"g": {"rw": f"{pv_prefix}:Child1:G"},
6161
"h": {"rw": f"{pv_prefix}:Child1:H"},
6262
"i": {"x": f"{pv_prefix}:Child1:I"},
63+
"j": {"r": f"{pv_prefix}:Child1:J"},
6364
}
6465

6566

@@ -104,31 +105,29 @@ async def test_scan_method(p4p_subprocess: tuple[str, Queue]):
104105

105106
@pytest.mark.asyncio
106107
async def test_command_method(p4p_subprocess: tuple[str, Queue]):
107-
QUEUE_TIMEOUT = 1
108-
pv_prefix, stdout_queue = p4p_subprocess
108+
pv_prefix, _ = p4p_subprocess
109109
d_values = asyncio.Queue()
110110
i_values = asyncio.Queue()
111+
j_values = asyncio.Queue()
111112
ctxt = Context("pva")
112113

113114
d_monitor = ctxt.monitor(f"{pv_prefix}:Child1:D", d_values.put)
114115
i_monitor = ctxt.monitor(f"{pv_prefix}:Child1:I", i_values.put)
116+
j_monitor = ctxt.monitor(f"{pv_prefix}:Child1:J", j_values.put)
115117

116118
try:
117-
if not stdout_queue.empty():
118-
raise RuntimeError("stdout_queue not empty", stdout_queue.get())
119+
j_initial_value = await j_values.get()
119120
assert (await d_values.get()).raw.value is False
120121
await ctxt.put(f"{pv_prefix}:Child1:D", True)
121122
assert (await d_values.get()).raw.value is True
123+
# D process hangs for 0.1s, so we wait slightly longer
122124
await asyncio.sleep(0.2)
125+
# Value returns to False, signifying completed process
123126
assert (await d_values.get()).raw.value is False
124-
125-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: RUNNING"
126-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
127-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: FINISHED"
128-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
127+
# D process increments J by 1
128+
assert (await j_values.get()).raw.value == j_initial_value + 1
129129

130130
# First run fails
131-
assert stdout_queue.empty()
132131
before_command_value = (await i_values.get()).raw
133132
assert before_command_value["value"] is False
134133
assert before_command_value["alarm"]["severity"] == 0
@@ -143,30 +142,26 @@ async def test_command_method(p4p_subprocess: tuple[str, Queue]):
143142
assert (
144143
after_command_value["alarm"]["message"] == "I: FAILED WITH THIS WEIRD ERROR"
145144
)
146-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING"
147-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
145+
# Failed I process does not increment J
146+
assert j_values.empty()
148147

149148
# Second run succeeds
150-
assert stdout_queue.empty()
151149
await ctxt.put(f"{pv_prefix}:Child1:I", True)
152150
assert (await i_values.get()).raw.value is True
153151
await asyncio.sleep(0.2)
154152
after_command_value = (await i_values.get()).raw
153+
# Successful I process increments J by 1
154+
assert (await j_values.get()).raw.value == j_initial_value + 2
155155

156156
# On the second run the command succeeded so we left the error state
157157
assert after_command_value["value"] is False
158158
assert after_command_value["alarm"]["severity"] == 0
159159
assert after_command_value["alarm"]["message"] == ""
160160

161-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING"
162-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
163-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: FINISHED"
164-
assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n"
165-
assert stdout_queue.empty()
166-
167161
finally:
168162
d_monitor.close()
169163
i_monitor.close()
164+
j_monitor.close()
170165

171166

172167
@pytest.mark.asyncio

0 commit comments

Comments
 (0)