Skip to content

Commit 30fe619

Browse files
authored
Create custom_space_data_link.py
this is for a patch for the gds accepting da packages
1 parent 868fc66 commit 30fe619

File tree

1 file changed

+198
-0
lines changed

1 file changed

+198
-0
lines changed

custom_space_data_link.py

Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
"""F Prime Framer/Deframer Implementation of the CCSDS Space Data Link (TC/TM) Protocols"""
2+
3+
import copy
4+
import struct
5+
import sys
6+
7+
import crc
8+
from fprime_gds.common.communication.framing import FramerDeframer
9+
from fprime_gds.plugin.definitions import gds_plugin_implementation
10+
11+
12+
class SpaceDataLinkFramerDeframer(FramerDeframer):
13+
"""CCSDS Framer/Deframer Implementation for the TC (uplink / framing) and TM (downlink / deframing)
14+
protocols. This FramerDeframer is used for framing TC data for uplink and deframing TM data for downlink.
15+
"""
16+
17+
SEQUENCE_NUMBER_MAXIMUM = 256
18+
TC_HEADER_SIZE = 5
19+
TM_HEADER_SIZE = 6
20+
TM_FIXED_FRAME_SIZE = 248
21+
TM_TRAILER_SIZE = 2
22+
TC_TRAILER_SIZE = 2
23+
24+
# As per CCSDS standard, use CRC-16 CCITT config with init value
25+
# all 1s and final XOR value of 0x0000
26+
CRC_CCITT_CONFIG = crc.Configuration(
27+
width=16,
28+
polynomial=0x1021,
29+
init_value=0xFFFF,
30+
final_xor_value=0x0000,
31+
)
32+
CRC_CALCULATOR = crc.Calculator(CRC_CCITT_CONFIG)
33+
34+
def __init__(self, scid, vcid):
35+
""" """
36+
self.scid = scid
37+
self.vcid = vcid
38+
self.sequence_number = 0
39+
40+
def frame(self, data):
41+
"""Frame the supplied data in a TC frame"""
42+
space_packet_bytes = data
43+
# CCSDS TC protocol defines the length token as number of bytes in full frame, minus 1
44+
# so we add to packet size the size of the header and trailer and subtract 1
45+
length = (
46+
len(space_packet_bytes) + self.TC_HEADER_SIZE + self.TC_TRAILER_SIZE - 1
47+
)
48+
assert length < (pow(2, 10) - 1), "Length too-large for CCSDS format"
49+
50+
# CCSDS TC Header:
51+
# 2b - 00 - TF version number
52+
# 1b - 0/1 - 0 enable FARM checks, 1 bypass FARM
53+
# 1b - 0/1 - 0 = data (Type-D), 1 = control information (Type-C)
54+
# 2b - 00 - Reserved
55+
# 10b - XX - Spacecraft id
56+
# 6b - XX - Virtual Channel ID
57+
# 10b - XX - Frame length
58+
# 8b - XX - Frame sequence number
59+
60+
# First 16 bits:
61+
header_val1_u16 = (
62+
(0 << 14) # TF version number (2 bits)
63+
| (1 << 13) # Bypass FARM (1 bit)
64+
| (0 << 12) # Type-D (1 bit)
65+
| (0 << 10) # Reserved (2 bits)
66+
| (self.scid & 0x3FF) # SCID (10 bits)
67+
)
68+
# Second 16 bits:
69+
header_val2_u16 = (
70+
((self.vcid & 0x3F) << 10) # VCID (6 bits)
71+
| (length & 0x3FF) # Frame length (10 bits)
72+
)
73+
# 8 bit sequence number - always 0 in bypass FARM mode
74+
header_val3_u8 = 0
75+
header_bytes = struct.pack(
76+
">HHB", header_val1_u16, header_val2_u16, header_val3_u8
77+
)
78+
full_bytes_no_crc = header_bytes + space_packet_bytes
79+
assert len(header_bytes) == self.TC_HEADER_SIZE, (
80+
"CCSDS primary header must be 5 octets long"
81+
)
82+
assert len(full_bytes_no_crc) == self.TC_HEADER_SIZE + len(data), (
83+
"Malformed packet generated"
84+
)
85+
86+
full_bytes = full_bytes_no_crc + struct.pack(
87+
">H", self.CRC_CALCULATOR.checksum(full_bytes_no_crc)
88+
)
89+
return full_bytes
90+
91+
def get_sequence_number(self):
92+
"""Get the sequence number and increment - used for TM deframing
93+
94+
This function will return the current sequence number and then increment the sequence number for the next round.
95+
96+
Return:
97+
current sequence number
98+
"""
99+
sequence = self.sequence_number
100+
self.sequence_number = (self.sequence_number + 1) % self.SEQUENCE_NUMBER_MAXIMUM
101+
return sequence
102+
103+
def deframe(self, data, no_copy=False):
104+
"""Deframe TM frames"""
105+
discarded = b""
106+
if not no_copy:
107+
data = copy.copy(data)
108+
# Continue until there is not enough data for the header, or until a packet is found (return)
109+
while len(data) >= self.TM_FIXED_FRAME_SIZE:
110+
# Read header information
111+
sc_and_channel_ids = struct.unpack_from(">H", data)
112+
spacecraft_id = (sc_and_channel_ids[0] & 0x3FF0) >> 4
113+
virtual_channel_id = (sc_and_channel_ids[0] & 0x000E) >> 1
114+
# Check if the header is correct with regards to expected spacecraft and VC IDs
115+
if spacecraft_id != self.scid or virtual_channel_id != self.vcid:
116+
# If the header is invalid, rotate away a Byte and keep processing
117+
discarded += data[0:1]
118+
data = data[1:]
119+
continue
120+
# Spacecraft ID and Virtual Channel ID match, so we look at end of frame for CRC
121+
crc_offset = self.TM_FIXED_FRAME_SIZE - self.TM_TRAILER_SIZE
122+
transmitted_crc = struct.unpack_from(">H", data, crc_offset)[0]
123+
if transmitted_crc == self.CRC_CALCULATOR.checksum(data[:crc_offset]):
124+
# CRC is valid, so we return the deframed data
125+
deframed_data_len = (
126+
self.TM_FIXED_FRAME_SIZE
127+
- self.TM_TRAILER_SIZE
128+
- self.TM_HEADER_SIZE
129+
)
130+
deframed = struct.unpack_from(
131+
f">{deframed_data_len}s", data, self.TM_HEADER_SIZE
132+
)[0]
133+
# Consume the fixed size frame
134+
data = data[self.TM_FIXED_FRAME_SIZE :]
135+
return deframed, data, discarded
136+
137+
print(
138+
"[WARNING] Checksum validation failed.",
139+
file=sys.stderr,
140+
)
141+
# Bad checksum, rotate 1 and keep looking for non-garbage
142+
discarded += data[0:1]
143+
data = data[1:]
144+
continue
145+
return None, data, discarded
146+
147+
@classmethod
148+
def get_arguments(cls):
149+
"""Arguments to request from the CLI"""
150+
return {
151+
("--scid",): {
152+
"type": lambda input_arg: int(input_arg, 0),
153+
"help": "Spacecraft ID",
154+
"default": 0x44,
155+
"required": False,
156+
},
157+
("--vcid",): {
158+
"type": lambda input_arg: int(input_arg, 0),
159+
"help": "Virtual channel ID",
160+
"default": 1,
161+
"required": False,
162+
},
163+
}
164+
165+
@classmethod
166+
def check_arguments(cls, scid, vcid):
167+
"""Check arguments from the CLI
168+
169+
Confirms that the input arguments are valid for this framer/deframer.
170+
171+
Args:
172+
scid: spacecraft id
173+
vcid: virtual channel id
174+
"""
175+
if scid is None:
176+
raise TypeError("Spacecraft ID not specified")
177+
if scid < 0:
178+
raise TypeError(f"Spacecraft ID {scid} is negative")
179+
if scid > 0x3FF:
180+
raise TypeError(f"Spacecraft ID {scid} is larger than {0x3FF}")
181+
182+
if vcid is None:
183+
raise TypeError("Virtual Channel ID not specified")
184+
if vcid < 0:
185+
raise TypeError(f"Virtual Channel ID {vcid} is negative")
186+
if vcid > 0x3F:
187+
raise TypeError(f"Virtual Channel ID {vcid} is larger than {0x3FF}")
188+
189+
@classmethod
190+
def get_name(cls):
191+
"""Name of this implementation provided to CLI"""
192+
return "raw-space-data-link"
193+
194+
@classmethod
195+
@gds_plugin_implementation
196+
def register_framing_plugin(cls):
197+
"""Register the MyPlugin plugin"""
198+
return cls

0 commit comments

Comments
 (0)