11import json
22import time
33
4+ import supervisor
45from pysquared .cdh import CommandDataHandler
56from pysquared .config .config import Config
67from pysquared .hardware .radio .packetizer .packet_manager import PacketManager
@@ -25,11 +26,17 @@ def __init__(
2526 def listen (self ):
2627 try :
2728 while True :
29+ if supervisor .runtime .serial_bytes_available :
30+ typed = input ().strip ()
31+ if typed :
32+ self .handle_input (typed )
33+
2834 b = self ._packet_manager .listen (1 )
2935 if b is not None :
3036 self ._log .info (
3137 message = "Received response" , response = b .decode ("utf-8" )
3238 )
39+
3340 except KeyboardInterrupt :
3441 self ._log .debug ("Keyboard interrupt received, exiting listen mode." )
3542
@@ -45,62 +52,66 @@ def send_receive(self):
4552 ===============================
4653 """
4754 )
48- if cmd_selection not in ["1" , "2" , "3" ]:
49- self ._log .warning ("Invalid command selection. Please try again." )
50- return
51-
52- message : dict [str , object ] = {
53- "name" : self ._config .cubesat_name ,
54- "password" : self ._config .super_secret_code ,
55- }
56-
57- if cmd_selection == "1" :
58- message ["command" ] = self ._cdh .command_reset
59- elif cmd_selection == "2" :
60- message ["command" ] = self ._cdh .command_change_radio_modulation
61- modulation = input ("Enter new radio modulation [FSK | LoRa]: " )
62- message ["args" ] = [modulation ]
63- elif cmd_selection == "3" :
64- message ["command" ] = self ._cdh .command_send_joke
6555
66- while True :
67- # Turn on the radio so that it captures any received packets to buffer
68- self ._packet_manager .listen (1 )
56+ self .handle_input (cmd_selection )
6957
70- # Send the message
71- self ._log .info (
72- "Sending command" ,
73- cmd = message ["command" ],
74- args = message .get ("args" , []),
75- )
76- self ._packet_manager .send (json .dumps (message ).encode ("utf-8" ))
58+ except KeyboardInterrupt :
59+ self ._log .debug ("Keyboard interrupt received, exiting send mode." )
7760
78- # Listen for ACK response
79- b = self ._packet_manager .listen (1 )
80- if b is None :
81- self ._log .info ("No response received, retrying..." )
82- continue
61+ def handle_input (self , cmd_selection ):
62+ if cmd_selection not in ["1" , "2" , "3" ]:
63+ self ._log .warning ("Invalid command selection. Please try again." )
64+ return
65+
66+ message : dict [str , object ] = {
67+ "name" : self ._config .cubesat_name ,
68+ "password" : self ._config .super_secret_code ,
69+ }
70+
71+ if cmd_selection == "1" :
72+ message ["command" ] = self ._cdh .command_reset
73+ elif cmd_selection == "2" :
74+ message ["command" ] = self ._cdh .command_change_radio_modulation
75+ modulation = input ("Enter new radio modulation [FSK | LoRa]: " )
76+ message ["args" ] = [modulation ]
77+ elif cmd_selection == "3" :
78+ message ["command" ] = self ._cdh .command_send_joke
8379
84- if b != b"ACK" :
85- self ._log .info (
86- "No ACK response received, retrying..." ,
87- response = b .decode ("utf-8" ),
88- )
89- continue
80+ while True :
81+ # Turn on the radio so that it captures any received packets to buffer
82+ self ._packet_manager .listen (1 )
83+
84+ # Send the message
85+ self ._log .info (
86+ "Sending command" ,
87+ cmd = message ["command" ],
88+ args = message .get ("args" , []),
89+ )
90+ self ._packet_manager .send (json .dumps (message ).encode ("utf-8" ))
9091
91- self ._log .info ("Received ACK" )
92+ # Listen for ACK response
93+ b = self ._packet_manager .listen (1 )
94+ if b is None :
95+ self ._log .info ("No response received, retrying..." )
96+ continue
9297
93- # Now listen for the actual response
94- b = self ._packet_manager .listen (1 )
95- if b is None :
96- self ._log .info ("No response received, retrying..." )
97- continue
98+ if b != b"ACK" :
99+ self ._log .info (
100+ "No ACK response received, retrying..." ,
101+ response = b .decode ("utf-8" ),
102+ )
103+ continue
98104
99- self ._log .info ("Received response" , response = b .decode ("utf-8" ))
100- break
105+ self ._log .info ("Received ACK" )
101106
102- except KeyboardInterrupt :
103- self ._log .debug ("Keyboard interrupt received, exiting send mode." )
107+ # Now listen for the actual response
108+ b = self ._packet_manager .listen (1 )
109+ if b is None :
110+ self ._log .info ("No response received, retrying..." )
111+ continue
112+
113+ self ._log .info ("Received response" , response = b .decode ("utf-8" ))
114+ break
104115
105116 def run (self ):
106117 while True :
0 commit comments