-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpayloadTelemetrySubscale.py
More file actions
176 lines (143 loc) · 6.74 KB
/
payloadTelemetrySubscale.py
File metadata and controls
176 lines (143 loc) · 6.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# Import necessary modules
import socket # For establishing a TCP socket connection
import time # For time delays
import subprocess # For running external processes (i.e., Direwolf)
import RPi.GPIO as GPIO # For controlling Raspberry Pi GPIO pins
import board
import busio
from adafruit_bmp3xx import BMP3XX_I2C
# Locations of the Direwolf binary and the configuration file it should load.
direwolf_path = r'/usr/local/bin/direwolf'  # Path to Direwolf binary
config_path = r'/home/soar/Direwolf/direwolf/build/direwolf.conf'  # Path to Direwolf config file

# Launch Direwolf as a child process with the configuration file above.
# The argument list is passed directly (no shell): with shell=True and a list
# on POSIX, only the first item is run as the command and the remaining items
# become arguments to the shell itself, so "-c <config>" would never reach
# Direwolf.
direwolf_process = subprocess.Popen([direwolf_path, '-c', config_path])
# --- KISS framing bytes ------------------------------------------------------
# FEND delimits a frame; FESC introduces a two-byte escape sequence that stands
# in for a literal FEND or FESC occurring inside the frame data.
KISS_FEND = 0xC0   # frame end / frame start marker
KISS_FESC = 0xDB   # escape introducer
KISS_TFEND = 0xDC  # follows FESC to represent a literal FEND byte
KISS_TFESC = 0xDD  # follows FESC to represent a literal FESC byte

# Command byte placed right after the opening FEND: "data frame".
KISS_CMD_DATA = 0x00
# Barometric sensor set-up: open the I2C bus on the board's SCL/SDA pins and
# attach the BMP3XX driver at I2C address 0x77.
i2c = busio.I2C(board.SCL, board.SDA)
bmp = BMP3XX_I2C(i2c, address=0x77)
# Oversampling settings are applied before the first altitude read below.
bmp.pressure_oversampling = 8
bmp.temperature_oversampling = 2
# Reference pressure used by the driver to derive altitude.
# NOTE(review): presumably hPa and specific to launch-day weather - confirm
# and update before each flight.
bmp.sea_level_pressure = 1015.4
# Module-level apogee flag, initialised to False at start-up.
apogee_reached = False
# Altitude reading taken once at start-up; get_apogee() takes its own
# starting reading when it is called.
last_altitude = bmp.altitude
def kiss_encode(data):
    """
    Wrap a payload in a KISS data frame.

    The result starts with FEND and the data command byte, continues with the
    payload (any FEND or FESC byte replaced by its two-byte escape sequence),
    and is terminated by a closing FEND.

    Parameters:
    - data (bytes): The data payload to be encoded.

    Returns:
    - bytearray: The KISS-encoded frame.
    """
    # Bytes that must not appear literally inside a frame, and their escapes.
    escape_sequences = {
        KISS_FEND: (KISS_FESC, KISS_TFEND),
        KISS_FESC: (KISS_FESC, KISS_TFESC),
    }

    frame = bytearray((KISS_FEND, KISS_CMD_DATA))
    for octet in data:
        # Ordinary bytes pass through unchanged as a one-element sequence.
        frame.extend(escape_sequences.get(octet, (octet,)))
    frame.append(KISS_FEND)
    return frame
def ax25_encode_call(call, ssid, last=False):
    """
    Encodes a callsign with SSID as a 7-byte AX.25 address field.

    The callsign is upper-cased and space-padded to 6 characters, and each
    character is shifted left by 1 bit as required by the AX.25 address
    format. The seventh byte carries the SSID in bits 1-4, the two reserved
    bits (0x60) set, and the address-extension ("last") bit in bit 0.

    Parameters:
    - call (str): The callsign (max 6 characters).
    - ssid (int): The SSID number (0-15); only the low 4 bits are used.
    - last (bool): Indicates if this is the last address field (set the "last" bit).

    Returns:
    - bytearray: Encoded callsign and SSID (always 7 bytes).

    Raises:
    - ValueError: If the callsign is longer than 6 characters. An over-long
      callsign would otherwise produce an address field of the wrong size and
      shift every later field of the frame.
    """
    call = call.upper()  # AX.25 callsigns are upper-case
    if len(call) > 6:
        raise ValueError(
            f"AX.25 callsign must be at most 6 characters, got {call!r}"
        )

    padded = call.ljust(6)  # Pad with spaces to exactly 6 characters
    encoded_call = bytearray(ord(c) << 1 for c in padded)  # Shift each character left by 1 bit

    ssid_byte = ((ssid & 0x0F) << 1) | 0x60  # SSID in bits 1-4, reserved bits set
    if last:
        ssid_byte |= 0x01  # Set "last" bit if this is the final address field
    encoded_call.append(ssid_byte)
    return encoded_call
def create_ax25_frame(source, source_ssid, destination, destination_ssid, data):
    """
    Build an AX.25 UI frame: address fields, control, PID, then the payload.

    Parameters:
    - source (str): Source callsign.
    - source_ssid (int): SSID for the source callsign.
    - destination (str): Destination callsign.
    - destination_ssid (int): SSID for the destination callsign.
    - data (str): The payload (APRS message); must be ASCII-encodable.

    Returns:
    - bytearray: Complete AX.25 frame with headers and data.
    """
    control_ui = 0x03  # Control field: UI frame (Unnumbered Information)
    pid_no_l3 = 0xF0   # PID field: No Layer 3 protocol

    # The destination address comes first; the source is the final address
    # field, so it carries the "last" bit.
    destination_field = ax25_encode_call(destination, destination_ssid)
    source_field = ax25_encode_call(source, source_ssid, last=True)

    frame = bytearray()
    for part in (
        destination_field,
        source_field,
        (control_ui, pid_no_l3),
        data.encode("ascii"),
    ):
        frame.extend(part)
    return frame
def send_aprs_packet(aprs_data, source, source_ssid, destination, destination_ssid, tcp_ip="127.0.0.1", tcp_port=8001):
    """
    Encodes APRS data in AX.25, wraps in KISS format, and sends it to Direwolf via TCP.

    GPIO 17 (BCM numbering) is driven HIGH before the frame is sent and the
    GPIO state is cleaned up afterwards, including when connecting or sending
    fails, so the pin is never left asserted by an error.

    Parameters:
    - aprs_data (str): The APRS message to be sent.
    - source (str): Source callsign.
    - source_ssid (int): SSID for the source callsign.
    - destination (str): Destination callsign.
    - destination_ssid (int): SSID for the destination callsign.
    - tcp_ip (str): IP address for the Direwolf server (default: 127.0.0.1).
    - tcp_port (int): TCP port for the Direwolf server (default: 8001).

    Raises:
    - OSError: If the TCP connection to Direwolf cannot be made or the send fails.
    """
    # Build the frame before touching any hardware so an encoding error
    # cannot leave the pin asserted.
    ax25_frame = create_ax25_frame(source, source_ssid, destination, destination_ssid, aprs_data)
    kiss_frame = kiss_encode(ax25_frame)

    # Configure Raspberry Pi GPIO
    pin = 17  # Pin number to control (GPIO 17)
    GPIO.setmode(GPIO.BCM)  # Set GPIO mode to Broadcom pin numbering
    GPIO.setup(pin, GPIO.OUT)  # Set GPIO pin as an output
    try:
        # Activate GPIO pin to signal packet transmission
        GPIO.output(pin, GPIO.HIGH)

        # Send the KISS frame to Direwolf over TCP
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((tcp_ip, tcp_port))
            s.sendall(kiss_frame)  # Send the KISS frame
            # Keep the pin asserted (and the connection open) for 15 seconds
            # to ensure packet transmission completes.
            time.sleep(15)
    finally:
        # Always release the GPIO pins, even if the connection or send failed.
        GPIO.cleanup()
# AX.25 addressing used for every packet this script transmits.
destination, destination_ssid = "APRS", 0  # Destination callsign and SSID
source, source_ssid = "KQ4FYU", 1          # Source (station) callsign and SSID
# Apogee detection: poll the barometric altimeter until the altitude starts to fall
def get_apogee(sensor=None, *, descent_margin=2.0, confirm_samples=3):
    """
    Blocks until the altitude has clearly started to fall, then returns the peak.

    The highest altitude seen so far is tracked. Apogee is declared once
    `confirm_samples` consecutive readings are each at least `descent_margin`
    below that peak, so a single noisy sample cannot end detection early.
    The module-level `apogee_reached` flag is set to True before returning.

    Parameters:
    - sensor: Object exposing an `altitude` attribute that yields a fresh
      reading on each access. Defaults to the module-level `bmp` sensor.
    - descent_margin (float): How far below the peak a reading must be to
      count as descending, in the same units as `sensor.altitude`
      (presumably metres for the BMP3XX driver - confirm).
    - confirm_samples (int): Consecutive descending readings required.

    Returns:
    - float: The highest altitude recorded (the apogee), not the later,
      lower reading that triggered detection.
    """
    global apogee_reached  # Update the module-level flag, not a local

    if sensor is None:
        sensor = bmp

    peak_altitude = sensor.altitude
    descending_count = 0
    while True:
        altitude = sensor.altitude
        if altitude > peak_altitude:
            # Still climbing: record the new peak and discard any pending
            # descent evidence.
            peak_altitude = altitude
            descending_count = 0
        elif peak_altitude - altitude >= descent_margin:
            descending_count += 1
            if descending_count >= confirm_samples:
                apogee_reached = True
                return peak_altitude
        else:
            # Within the noise band around the peak: not a confirmed descent.
            descending_count = 0
# Wait for apogee detection, then build the report text once.
apogee = get_apogee()
aprs_data = f"!Highest apogee = {apogee}"

# Transmit the same report ten times in a row.
for _ in range(10):
    send_aprs_packet(
        aprs_data,
        source,
        source_ssid,
        destination,
        destination_ssid,
    )

# Keep the script alive until the Direwolf child process exits
# (blocks indefinitely while Direwolf keeps running).
direwolf_process.wait()