Skip to content

Commit 4d71d22

Browse files
committed
add group 3 hall probe emulator
1 parent 3e69553 commit 4d71d22

File tree

5 files changed

+168
-0
lines changed

5 files changed

+168
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .device import SimulatedGroup3HallProbe
2+
3+
__all__ = ["SimulatedGroup3HallProbe"]
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from collections import OrderedDict
2+
from enum import Enum
3+
from typing import Callable, assert_never
4+
5+
from lewis.devices import StateMachineDevice
6+
7+
from .states import DefaultState, State
8+
9+
GAUSS_PER_TESLA = 10_000.0
10+
11+
12+
class Ranges(Enum):
13+
"""
14+
Expresses a measurement range that a Probe can be in.
15+
16+
Values and indices from device manual.
17+
"""
18+
19+
R0 = 0 # 0.3 Tesla range
20+
R1 = 1 # 0.6 Tesla range
21+
R2 = 2 # 1.2 Tesla range
22+
R3 = 3 # 3.0 Tesla range
23+
24+
25+
def range_to_max_gauss(r: Ranges) -> float:
26+
match r:
27+
case Ranges.R0:
28+
return 0.3 * GAUSS_PER_TESLA
29+
case Ranges.R1:
30+
return 0.6 * GAUSS_PER_TESLA
31+
case Ranges.R2:
32+
return 1.2 * GAUSS_PER_TESLA
33+
case Ranges.R3:
34+
return 3.0 * GAUSS_PER_TESLA
35+
36+
assert_never(r)
37+
38+
39+
class Probe:
40+
"""
41+
A single probe.
42+
"""
43+
44+
def __init__(self) -> None:
45+
self.field = 0.0
46+
self.temperature = 0.0
47+
self.sensor_range = Ranges.R3
48+
self.initialized = True
49+
50+
def is_over_range(self) -> bool:
51+
return abs(self.field) > range_to_max_gauss(self.sensor_range)
52+
53+
def initialize(self) -> None:
54+
self.sensor_range = Ranges.R3
55+
self.initialized = True
56+
57+
58+
class SimulatedGroup3HallProbe(StateMachineDevice):
59+
def _initialize_data(self) -> None:
60+
"""
61+
Initialize all of the device's attributes.
62+
"""
63+
self.connected = True
64+
self.probes = {
65+
0: Probe(),
66+
1: Probe(),
67+
2: Probe(),
68+
}
69+
70+
def reset(self) -> None:
71+
self._initialize_data()
72+
73+
def backdoor_set_field(self, probe_id: int, field: float) -> None:
74+
self.probes[probe_id].field = field
75+
76+
def backdoor_set_temperature(self, probe_id: int, temperature: float) -> None:
77+
self.probes[probe_id].temperature = temperature
78+
79+
def backdoor_set_initialized(self, probe_id: int, initialized: bool) -> None:
80+
self.probes[probe_id].initialized = initialized
81+
82+
def _get_state_handlers(self) -> dict[str, State]:
83+
return {
84+
"default": DefaultState(),
85+
}
86+
87+
def _get_initial_state(self) -> str:
88+
return "default"
89+
90+
def _get_transition_handlers(self) -> dict[tuple[str, str], Callable[[], bool]]:
91+
return OrderedDict([])
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .stream_interface import Group3HallProbeStreamInterface
2+
3+
__all__ = ["Group3HallProbeStreamInterface"]
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import logging
2+
3+
from lewis.adapters.stream import StreamInterface
4+
from lewis.core.logging import has_log
5+
from lewis.utils.command_builder import CmdBuilder
6+
from lewis.utils.replies import conditional_reply
7+
from lewis_emulators.group3hallprobe.device import Ranges, SimulatedGroup3HallProbe
8+
9+
if_connected = conditional_reply("connected")
10+
11+
12+
@has_log
13+
class Group3HallProbeStreamInterface(StreamInterface):
14+
in_terminator = "\n\r" # Yes, really LF-CR not CR-LF
15+
out_terminator = "\n\r"
16+
17+
def __init__(self) -> None:
18+
super(Group3HallProbeStreamInterface, self).__init__()
19+
20+
self.log: logging.Logger
21+
self.device: SimulatedGroup3HallProbe
22+
23+
# Commands that we expect via serial during normal operation
24+
self.commands = {
25+
CmdBuilder(self.initialize).escape("A").int().escape(" SE0GDR3GCNNUFG").eos().build(),
26+
CmdBuilder(self.get_field).escape("A").int().escape(" F").eos().build(),
27+
CmdBuilder(self.get_temperature).escape("A").int().escape(" T").eos().build(),
28+
CmdBuilder(self.set_range).escape("A").int().escape(" R").int().eos().build(),
29+
}
30+
31+
def handle_error(self, request: str, error: str | Exception) -> None:
32+
"""
33+
If command is not recognised print and error
34+
35+
Args:
36+
request: requested string
37+
error: problem
38+
39+
"""
40+
self.log.error("An error occurred at request " + repr(request) + ": " + repr(error))
41+
42+
@if_connected
43+
def initialize(self, probe_id: int) -> str:
44+
self.device.probes[probe_id].initialize()
45+
return f"A{probe_id} SE0GDR3GCNNUFG"
46+
47+
@if_connected
48+
def get_field(self, probe_id: int) -> str:
49+
probe = self.device.probes[probe_id]
50+
if not probe.initialized:
51+
return f"A{probe_id} F\n\runinitialized_bad_data"
52+
if probe.is_over_range():
53+
return f"A{probe_id} F\n\rOVER RANGE"
54+
return f"A{probe_id} F\n\r{probe.field}"
55+
56+
@if_connected
57+
def get_temperature(self, probe_id: int) -> str:
58+
probe = self.device.probes[probe_id]
59+
if not probe.initialized:
60+
return f"A{probe_id} T\n\runinitialized_bad_data"
61+
return f"A{probe_id} T\n\r{probe.temperature}C"
62+
63+
@if_connected
64+
def set_range(self, probe_id: int, range_id: int) -> str:
65+
self.device.probes[probe_id].sensor_range = Ranges(range_id)
66+
return f"A{probe_id} R{range_id}"
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from lewis.core.statemachine import State
2+
3+
4+
class DefaultState(State):
5+
pass

0 commit comments

Comments
 (0)