-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschier.py
More file actions
467 lines (362 loc) · 17.4 KB
/
schier.py
File metadata and controls
467 lines (362 loc) · 17.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
import asyncio
import logging
import math
from enum import Enum, auto
from comm import MountComm
from configuration import MountConfig
from coordinates import MountCoordinates
class MountState(Enum):
    """Operational states the mount controller can report.

    Values are assigned with ``auto()`` in declaration order, so the order of
    the members below must not change if the numeric values are persisted or
    exchanged anywhere.
    """

    IDLE = auto()        # powered and stationary, ready for a new command
    SLEWING = auto()     # moving toward a commanded position
    TRACKING = auto()    # moving at a continuous tracking rate
    PARKING = auto()     # moving toward the park position
    PARKED = auto()      # resting at the park position
    HOMING = auto()      # running the homing sequence
    FAULT = auto()       # a command failed or an axis reported an error
    RECOVERING = auto()  # NOTE(review): not set anywhere in this file
    UNKNOWN = auto()     # state before initialisation or after a failed init
class SchierMount():
    """Asynchronous high-level controller for the Schier telescope mount.

    The class wraps a blocking ``MountComm`` driver. Every driver call goes
    through :meth:`_safe_comm`, which holds ``serial_lock`` and runs the call
    in a worker thread so the event loop is never blocked and two commands
    never overlap on the link. A background task (:meth:`_status_loop`)
    keeps ``current_positions`` up to date and flips ``state`` to
    ``MountState.FAULT`` when either axis reports an error.
    """

    # Sidereal rate in degrees per second, used by track_sidereal().
    SIDEREAL_RATE_DEG_PER_SEC = 0.004178
    # Largest rate (deg/sec) accepted by track_non_sidereal().
    MAX_TRACK_RATE_DEG_PER_SEC = 5.0

    def __init__(self):
        """Create the controller; no hardware communication happens here."""
        self.logger = logging.getLogger("SchierMount")
        self._status_task = None
        self._move_task = None  # Task running the active move, if any
        self.serial_lock = asyncio.Lock()

        # Latest encoder readings, refreshed by _status_loop().
        self.current_positions = {
            "ra_enc": 0,
            "dec_enc": 0,
        }

        # Software offsets added to slew targets (degrees).
        self.ha_offset_deg = 0.0
        self.dec_offset_deg = 0.0

        self.config = MountConfig()
        self.coord = MountCoordinates(config=self.config)
        self.comm = MountComm(config=self.config)
        self.state = MountState.UNKNOWN

    async def init_mount(self) -> None:
        """
        Initializes the mount hardware and starts the background status loop.

        Sends the initialization command, sets the state to PARKED and makes
        sure exactly one status polling task is running.

        Raises:
            Exception: If hardware initialization fails. The state is set to
                UNKNOWN before the exception is re-raised.
        """
        try:
            self.logger.info("Initializing mount hardware...")
            await self._safe_comm(self.comm.init_mount)
            self.state = MountState.PARKED

            # Start the poller once; restart it if a previous one has ended.
            if self._status_task is None or self._status_task.done():
                self._status_task = asyncio.create_task(self._status_loop())

            self.logger.info("Mount initialization complete.")
        except Exception as e:
            self.state = MountState.UNKNOWN
            self.logger.error(f"Failed to initialize mount: {e}")
            raise

    async def home_mount(self) -> None:
        """
        Runs the homing sequence for both axes.

        Steps:
            1. Sets the mount state to HOMING.
            2. Sends the hardware homing command to the controller.
            3. Waits until the encoders stop moving (within tolerance).
            4. Zeroes the encoder counts at the home position.
            5. Sets the mount state to IDLE.

        Failures (timeout, communication or hardware errors) are logged and
        leave the mount in the FAULT state; they are not propagated to the
        caller.
        """
        try:
            self.logger.debug("Starting homing sequence...")
            self.state = MountState.HOMING
            self._move_task = asyncio.current_task()

            await self._safe_comm(self.comm.home_mount)
            self.logger.debug("Homing command sent, waiting for encoders to stabilize...")
            await self._await_encoder_stop(tolerance=100, timeout=120)
            await self._safe_comm(self.comm.zero_mount)

            self.state = MountState.IDLE
            self.logger.info("Homing sequence completed successfully.")
        except Exception as e:
            # Do not leave the state stuck at HOMING after a failure.
            self.state = MountState.FAULT
            self.logger.error(f"Failed to home mount: {e}")
        finally:
            self._move_task = None

    async def stop_mount(self) -> None:
        """
        Immediately stops all mount movement and cancels the active move task.

        Steps:
            1. Sends the idle command to the hardware.
            2. Sets the mount state to IDLE.
            3. Cancels the running movement task (homing, parking, slewing,
               ...) if there is one.
        """
        self.logger.info("Stopping mount...")

        # 1. Stop the hardware first so the motors halt as early as possible.
        await self._safe_comm(self.comm.idle_mount)
        self.state = MountState.IDLE

        # 2. Stop the software task that is waiting on the move.
        if self._move_task and not self._move_task.done():
            self._move_task.cancel()

    async def park_mount(self) -> None:
        """
        Moves the mount to its park position.

        Steps:
            1. Sets the mount state to PARKING.
            2. Sends the hardware park command.
            3. Waits until the encoders reach the target (within tolerance).
            4. Sets the mount state to PARKED.

        Failures (timeout, communication or hardware errors) are logged and
        leave the mount in the FAULT state; they are not propagated to the
        caller.
        """
        try:
            self.logger.info("Parking mount...")
            self.state = MountState.PARKING
            self._move_task = asyncio.current_task()

            await self._safe_comm(self.comm.park_mount)
            self.logger.debug("Parking command sent, waiting for encoders to reach target...")
            await self._await_mount_at_position()

            self.state = MountState.PARKED
            self.logger.info("Parking sequence completed successfully.")
        except Exception as e:
            # Do not leave the state stuck at PARKING after a failure.
            self.state = MountState.FAULT
            self.logger.error(f"Failed to park mount: {e}")
        finally:
            self._move_task = None

    async def standby_mount(self) -> None:
        """
        Moves the mount to the standby (zenith) position.

        Steps:
            1. Sets the mount state to SLEWING.
            2. Sends the hardware standby command.
            3. Waits until the encoders reach the target (within tolerance).
            4. Sets the mount state to IDLE.

        Failures (timeout, communication or hardware errors) are logged and
        leave the mount in the FAULT state; they are not propagated to the
        caller.
        """
        try:
            self.logger.info("Sending mount to standby position (zenith) ...")
            self.state = MountState.SLEWING
            self._move_task = asyncio.current_task()

            await self._safe_comm(self.comm.standby_mount)
            self.logger.debug("Standby command sent, waiting for encoders to reach target...")
            await self._await_mount_at_position()

            self.state = MountState.IDLE
            self.logger.info("Mount moved to standby pos.")
        except Exception as e:
            # Do not leave the state stuck at SLEWING after a failure.
            self.state = MountState.FAULT
            self.logger.error(f"Failed to move mount: {e}")
        finally:
            self._move_task = None

    async def slew_mount(self, ha_deg: float, dec_deg: float) -> None:
        """
        Slews the mount to the specified HA and Dec coordinates.

        Args:
            ha_deg (float): Target Hour Angle in degrees.
            dec_deg (float): Target Declination in degrees.

        Steps:
            1. Adds the software offsets to the target coordinates.
            2. Converts the target HA/Dec to encoder steps.
            3. Commands the hardware to slew to the target encoder positions.
            4. Waits until the target is reached.

        Raises:
            TimeoutError: If the target is not reached within the timeout.
            Exception: For communication or hardware errors. In every failure
                case the state is set to FAULT before re-raising.
        """
        try:
            self.logger.info(f"Slewing to HA: {ha_deg}, Dec: {dec_deg}...")
            self.state = MountState.SLEWING
            self._move_task = asyncio.current_task()

            # 1. Apply software offsets
            target_ha = ha_deg + self.ha_offset_deg
            target_dec = dec_deg + self.dec_offset_deg

            # 2. Convert to encoder steps
            ra_steps, dec_steps = self.coord.hadec_to_enc(target_ha, target_dec)

            # 3. Send hardware command
            self.logger.info(f"Slew Command: target HA={target_ha:.4f} ({int(ra_steps)} enc), target Dec={target_dec:.4f} ({int(dec_steps)} enc)")
            await self._safe_comm(self.comm.slew_mount, int(ra_steps), int(dec_steps))

            # 4. Wait for completion
            await self._await_mount_at_position(timeout=120)

            self.state = MountState.IDLE
            self.logger.info("Slew completed successfully.")
        except Exception as e:
            self.logger.error(f"Slew failed: {e}")
            self.state = MountState.FAULT
            raise
        finally:
            self._move_task = None

    async def track_sidereal(self) -> None:
        """
        Starts sidereal tracking on the RA axis.

        The rate is ``SIDEREAL_RATE_DEG_PER_SEC`` converted to encoder steps
        per second with the configured ``steps_per_deg_ra``. The sign is
        negated because the mount is in the Southern Hemisphere, where the
        RA motor direction is inverted. The Dec rate is zero.

        Sets the mount state to TRACKING.

        Raises:
            Exception: If the tracking command cannot be sent; the state is
                set to FAULT before re-raising.
        """
        try:
            self.logger.info("Starting sidereal tracking...")
            self.state = MountState.TRACKING

            # Southern Hemisphere: flip the RA motor direction.
            sidereal_rate_steps_per_sec = (
                -self.SIDEREAL_RATE_DEG_PER_SEC * self.config.encoder['steps_per_deg_ra']
            )
            await self._safe_comm(self.comm.track_mount, sidereal_rate_steps_per_sec, 0.0)

            self.logger.info("Mount is now tracking at sidereal rate.")
        except Exception as e:
            self.state = MountState.FAULT
            self.logger.error(f"Failed to start sidereal tracking: {e}")
            raise

    async def shift_mount(self, delta_ra: float, delta_dec: float) -> None:
        """
        Shifts the mount by a relative number of degrees in RA and Dec.

        Args:
            delta_ra (float): Relative shift of the RA axis (degrees).
            delta_dec (float): Relative shift of the Dec axis (degrees).

        Steps:
            1. Converts both deltas to whole encoder steps (truncating toward
               zero) using the configured steps-per-degree values.
            2. Commands the hardware to perform a relative move.
            3. Waits for the mount to reach the target position.

        Failures are logged with a traceback and leave the mount in the FAULT
        state; they are not propagated to the caller.

        NOTE(review): no cos(Dec) correction is applied to ``delta_ra`` here,
        so the value is an axis rotation rather than an on-sky distance.
        Confirm whether the caller is expected to apply that projection.
        """
        try:
            self.state = MountState.SLEWING
            self._move_task = asyncio.current_task()

            ra_steps = int(delta_ra * self.config.encoder['steps_per_deg_ra'])
            dec_steps = int(delta_dec * self.config.encoder['steps_per_deg_dec'])

            await self._safe_comm(self.comm.shift_mount, ra_steps, dec_steps)
            await self._await_mount_at_position()

            self.state = MountState.IDLE
            self.logger.info("Shift completed.")
        except Exception as e:
            self.logger.error(f"Failed to shift mount: {e}", exc_info=True)
            self.state = MountState.FAULT
        finally:
            self._move_task = None

    async def track_non_sidereal(self, ha_rate: float, dec_rate: float) -> None:
        """
        Starts tracking at a custom non-sidereal rate.

        Args:
            ha_rate (float): Hour Angle tracking rate in degrees per second.
            dec_rate (float): Declination tracking rate in degrees per second.

        Raises:
            ValueError: If a rate is not finite or its magnitude exceeds
                ``MAX_TRACK_RATE_DEG_PER_SEC``. The mount state is left
                untouched in this case because nothing was sent to hardware.
            Exception: If the tracking command cannot be sent; the state is
                set to FAULT before re-raising.
        """
        # Validate before touching state or hardware so that a bad argument
        # cannot flip a healthy mount into FAULT.
        max_rate = self.MAX_TRACK_RATE_DEG_PER_SEC
        rates_finite = math.isfinite(ha_rate) and math.isfinite(dec_rate)
        if not rates_finite or abs(ha_rate) > max_rate or abs(dec_rate) > max_rate:
            message = f"Tracking rate exceeds maximum limit of {max_rate} deg/sec"
            self.logger.error(f"Failed to start non-sidereal tracking: {message}")
            raise ValueError(message)

        try:
            self.logger.info(f"Starting non-sidereal tracking (HA: {ha_rate}, Dec: {dec_rate})...")
            self.state = MountState.TRACKING

            # Convert deg/sec to steps/sec; the RA direction is inverted, as
            # in track_sidereal().
            ra_steps_per_sec = -1 * ha_rate * self.config.encoder['steps_per_deg_ra']
            dec_steps_per_sec = dec_rate * self.config.encoder['steps_per_deg_dec']

            await self._safe_comm(self.comm.track_mount, ra_steps_per_sec, dec_steps_per_sec)
            self.logger.info("Mount is now tracking at non-sidereal rate.")
        except Exception as e:
            self.state = MountState.FAULT
            self.logger.error(f"Failed to start non-sidereal tracking: {e}")
            raise

    async def update_offsets(self, delta_ha_deg: float, delta_dec_deg: float) -> None:
        """
        Replaces the software-level coordinate offsets.

        The values overwrite the stored offsets; they are not accumulated.

        Args:
            delta_ha_deg (float): Offset to apply to Hour Angle in degrees.
            delta_dec_deg (float): Offset to apply to Declination in degrees.
        """
        self.ha_offset_deg = delta_ha_deg
        self.dec_offset_deg = delta_dec_deg
        self.logger.info(f"Offsets updated to HA: {delta_ha_deg}, Dec: {delta_dec_deg}")

    async def get_offsets(self) -> tuple[float, float]:
        """
        Retrieves the current software-level coordinate offsets.

        Returns:
            tuple[float, float]: ``(ha_offset_deg, dec_offset_deg)``.
        """
        return self.ha_offset_deg, self.dec_offset_deg

    async def get_ha_dec(self):
        """
        Returns the current HA and Dec of the telescope.

        The last polled encoder positions are converted with the coordinate
        module; the software offsets are not applied.

        Returns:
            tuple: ``(ha_deg, dec_deg)`` as returned by ``enc_to_hadec``.
        """
        ra_enc = self.current_positions["ra_enc"]
        dec_enc = self.current_positions["dec_enc"]
        ha_deg, dec_deg = self.coord.enc_to_hadec(ra_enc, dec_enc)
        return ha_deg, dec_deg

    async def _attempt_recovery(self) -> None:
        """
        Tries once to recover the mount by re-sending the init command.

        A failure is logged and swallowed. NOTE(review): only a single
        attempt is made; no retry loop is implemented.
        """
        self.logger.info("Attempting servo and mount recovery...")
        attempts = 1
        try:
            await self._safe_comm(self.comm.init_mount)
        except Exception as e:
            self.logger.error(f"Recovery failed after {attempts} attempts: {e}")

    async def _await_encoder_stop(self, tolerance=100, timeout=60,
                                  stable_duration=5.0, poll_interval=0.1):
        """
        Waits until both encoders have stopped moving.

        "Stopped" means that consecutive samples differ by at most
        ``tolerance`` counts on both axes for ``stable_duration`` seconds.

        Args:
            tolerance: Maximum per-sample change (encoder counts) still
                treated as stationary.
            timeout: Maximum time to wait, in seconds.
            stable_duration: How long the encoders must stay stationary, in
                seconds.
            poll_interval: Delay between samples, in seconds.

        Raises:
            RuntimeError: If the mount enters the FAULT state while waiting.
            TimeoutError: If the encoders do not settle within ``timeout``.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        stable_start_time = None

        last_ra = self.current_positions["ra_enc"]
        last_dec = self.current_positions["dec_enc"]

        while (loop.time() - start_time) < timeout:
            if self.state == MountState.FAULT:
                raise RuntimeError("Mount entered FAULT state during movement.")

            curr_ra = self.current_positions["ra_enc"]
            curr_dec = self.current_positions["dec_enc"]

            if abs(curr_ra - last_ra) <= tolerance and abs(curr_dec - last_dec) <= tolerance:
                if stable_start_time is None:
                    stable_start_time = loop.time()
                elif (loop.time() - stable_start_time) >= stable_duration:
                    return
            else:
                # Movement detected: restart the stability window.
                stable_start_time = None

            last_ra, last_dec = curr_ra, curr_dec
            await asyncio.sleep(poll_interval)

        raise TimeoutError("Mount failed to stop within timeout period.")

    async def _await_mount_at_position(self, timeout=180, tolerance=100,
                                       poll_interval=0.1):
        """
        Waits until the encoders match the commanded target within tolerance.

        The targets are read from ``self.comm.ra_target_enc`` and
        ``self.comm.dec_target_enc``.

        Args:
            timeout: Maximum time to wait, in seconds.
            tolerance: Allowed distance from the target, in encoder counts.
            poll_interval: Delay between checks, in seconds.

        Raises:
            RuntimeError: If the mount enters the FAULT state while waiting.
            TimeoutError: If the target is not reached within ``timeout``.
        """
        loop = asyncio.get_running_loop()
        start_time = loop.time()

        while (loop.time() - start_time) < timeout:
            if self.state == MountState.FAULT:
                raise RuntimeError("Mount entered FAULT state during slew.")

            ra_diff = abs(self.current_positions["ra_enc"] - self.comm.ra_target_enc)
            dec_diff = abs(self.current_positions["dec_enc"] - self.comm.dec_target_enc)

            if ra_diff <= tolerance and dec_diff <= tolerance:
                return

            await asyncio.sleep(poll_interval)

        raise TimeoutError(f"Mount failed to reach target position within {timeout}s ")

    async def _safe_comm(self, func, *args, **kwargs):
        """
        Runs a blocking driver call under the serial lock.

        The lock prevents two commands from colliding on the link, and the
        call runs in a worker thread so the event loop stays responsive.

        Returns:
            Whatever ``func(*args, **kwargs)`` returns.
        """
        async with self.serial_lock:
            return await asyncio.to_thread(func, *args, **kwargs)

    async def _status_loop(self) -> None:
        """
        Polls encoder positions and axis status forever.

        Each cycle reads both axes (0 = RA, 1 = Dec), publishes the readings
        in ``current_positions`` and sets the state to FAULT when either
        axis reports ``any_error``. Errors during polling are logged and the
        loop continues; the task ends only when it is cancelled.
        """
        while True:
            try:
                _, ra_actual = await self._safe_comm(self.comm.get_encoder_position, 0)
                _, dec_actual = await self._safe_comm(self.comm.get_encoder_position, 1)
                ra_axis_status = await self._safe_comm(self.comm.get_axis_status_bits, 0)
                dec_axis_status = await self._safe_comm(self.comm.get_axis_status_bits, 1)

                self.current_positions = {
                    "ra_enc": ra_actual,
                    "dec_enc": dec_actual,
                }

                if ra_axis_status['any_error'] or dec_axis_status['any_error']:
                    self.state = MountState.FAULT
            except Exception as e:
                self.logger.error(f"Status Loop Error: {e}")

            # Always yield, including after an error, so a persistently
            # failing link cannot turn this into a busy loop.
            await asyncio.sleep(0.01)