|
3 | 3 | import logging |
4 | 4 |
|
5 | 5 |
|
6 | | -from modules.telemetry.v1.block import ( |
7 | | - PacketHeader, |
8 | | - BlockHeader, |
9 | | - DeviceAddress, |
10 | | - UnsupportedEncodingVersionError, |
11 | | - InvalidHeaderFieldValueError, |
12 | | -) |
13 | | -import modules.telemetry.v1.data_block as v1db |
| 6 | +# from modules.telemetry.v1.block import ( |
| 7 | +# PacketHeader, |
| 8 | +# BlockHeader, |
| 9 | +# DeviceAddress, |
| 10 | +# UnsupportedEncodingVersionError, |
| 11 | +# InvalidHeaderFieldValueError, |
| 12 | +# ) |
| 13 | +# import modules.telemetry.v1.data_block as v1db |
| 14 | + |
| 15 | +from modules.telemetry.v2.block import * |
14 | 16 | from modules.misc.config import Config |
15 | 17 |
|
16 | 18 | MIN_SUPPORTED_VERSION: int = 1 |
@@ -54,48 +56,51 @@ def parse_rn2483_transmission(data: str, config: Config) -> Optional[ParsedTrans |
54 | 56 |
|
55 | 57 | # Catch unsupported encoding versions by skipping packet |
56 | 58 | try: |
57 | | - pkt_hdr = PacketHeader.from_hex(data[:32]) |
58 | | - except UnsupportedEncodingVersionError as e: |
| 59 | + pkt_hdr: PacketHeader = parse_packet_header(data[:PACKET_HEADER_LENGTH]) |
| 60 | + except InvalidHeaderFieldValueError as e: |
59 | 61 | logger.error(f"{e}, skipping packet") |
60 | 62 | return |
| 63 | + |
| 64 | + logger.info(pkt_hdr) |
61 | 65 |
|
62 | 66 | # We can keep unauthorized callsigns but we'll log them as warnings |
63 | 67 | from_approved_callsign(pkt_hdr, config.approved_callsigns) |
64 | 68 |
|
65 | | - if len(pkt_hdr) <= 32: # If this packet nothing more than just the header |
| 69 | + if len(pkt_hdr) <= PACKET_HEADER_LENGTH: # If this packet nothing more than just the header |
66 | 70 | logger.debug(f"{pkt_hdr}") |
67 | 71 |
|
68 | | - blocks = data[32:] # Remove the packet header |
| 72 | + blocks = data[PACKET_HEADER_LENGTH:] # Remove the packet header |
69 | 73 |
|
70 | 74 | # Parse through all blocks |
71 | 75 | while blocks != "": |
72 | 76 | # Parse block header |
73 | 77 | logger.debug(f"Blocks: {blocks}") |
74 | | - logger.debug(f"Block header: {blocks[:8]}") |
| 78 | + logger.debug(f"Block header: {blocks[:BLOCK_HEADER_LENGTH]}") |
75 | 79 |
|
76 | 80 | # Catch invalid block headers field values by skipping packet |
77 | 81 | try: |
78 | | - block_header = BlockHeader.from_hex(blocks[:8]) |
| 82 | + block_header = parse_block_header(blocks[:BLOCK_HEADER_LENGTH]) |
79 | 83 | except InvalidHeaderFieldValueError as e: |
80 | 84 | logger.error(f"{e}, skipping packet") |
81 | 85 | return |
| 86 | + logger.info(block_header) |
82 | 87 |
|
83 | 88 | # Select block contents |
84 | | - block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols |
85 | | - block_contents = blocks[8:block_len] |
86 | | - logger.debug(f"Block info: {block_header}") |
87 | | - |
88 | | - # Check if message is destined for ground station for processing |
89 | | - if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]: |
90 | | - cur_block = parse_radio_block(pkt_hdr.version, block_header, block_contents) |
91 | | - if cur_block: |
92 | | - parsed_blocks.append(cur_block) # Append parsed block to list |
93 | | - else: |
94 | | - logger.warning("Invalid destination address") |
95 | | - |
96 | | - # Remove the data we processed from the whole set, and move onto the next data block |
97 | | - blocks = blocks[block_len:] |
98 | | - return ParsedTransmission(pkt_hdr, parsed_blocks) |
| 89 | + # block_len = len(block_header) * 2 # Convert length in bytes to length in hex symbols |
| 90 | + # block_contents = blocks[8:block_len] |
| 91 | + # logger.debug(f"Block info: {block_header}") |
| 92 | + |
| 93 | + # # Check if message is destined for ground station for processing |
| 94 | + # if block_header.destination in [DeviceAddress.GROUND_STATION, DeviceAddress.MULTICAST]: |
| 95 | + # cur_block = parse_radio_block(pkt_hdr.version, block_header, block_contents) |
| 96 | + # if cur_block: |
| 97 | + # parsed_blocks.append(cur_block) # Append parsed block to list |
| 98 | + # else: |
| 99 | + # logger.warning("Invalid destination address") |
| 100 | + |
| 101 | + # # Remove the data we processed from the whole set, and move onto the next data block |
| 102 | + # blocks = blocks[block_len:] |
| 103 | + # return ParsedTransmission(pkt_hdr, parsed_blocks) |
99 | 104 |
|
100 | 105 |
|
101 | 106 | def from_approved_callsign(pkt_hdr: PacketHeader, approved_callsigns: dict[str, str]) -> bool: |
|
0 commit comments