|
| 1 | +from typing import Union |
| 2 | + |
| 3 | +from bitarray import bitarray |
| 4 | +from bitarray.util import int2ba |
| 5 | + |
| 6 | +from okdmr.dmrlib.etsi.crc.crc9 import CRC9 |
| 7 | +from okdmr.dmrlib.etsi.layer2.elements.crc_masks import CrcMasks |
| 8 | +from okdmr.dmrlib.utils.bits_bytes import bits_to_bytes, bytes_to_bits |
| 9 | +from okdmr.dmrlib.utils.bits_interface import BitsInterface |
| 10 | + |
| 11 | + |
| 12 | +class Rate12Data(BitsInterface): |
| 13 | + """ |
| 14 | + ETSI TS 102 361-1 V2.5.1 (2017-10) - 9.2.7 Rate 1/2 coded packet Data (R_1_2_DATA) PDU |
| 15 | + ETSI TS 102 361-1 V2.5.1 (2017-10) - 9.2.8 Rate 1/2 coded Last Data block (R_1_2_LDATA) PDU |
| 16 | + """ |
| 17 | + |
| 18 | + def __init__( |
| 19 | + self, |
| 20 | + data: bytes, |
| 21 | + dbsn: int = 0, |
| 22 | + crc9: int = 0, |
| 23 | + crc32: Union[int, bytes] = 0, |
| 24 | + ): |
| 25 | + self.data: bytes = data |
| 26 | + self.dbsn: int = dbsn |
| 27 | + self.crc32: int = ( |
| 28 | + crc32 if isinstance(crc32, int) else int.from_bytes(crc32, byteorder="big") |
| 29 | + ) |
| 30 | + |
| 31 | + self.crc9: int = self.calculate_crc9() |
| 32 | + self.crc9_ok: bool = self.crc9 == crc9 if crc9 > 0 else True |
| 33 | + |
| 34 | + def calculate_crc9(self) -> int: |
| 35 | + return CRC9.calculate_from_parts( |
| 36 | + data=self.data, |
| 37 | + serial_number=self.dbsn, |
| 38 | + crc32=self.crc32, |
| 39 | + mask=CrcMasks.Rate12DataContinuation, |
| 40 | + ) |
| 41 | + |
| 42 | + def __repr__(self) -> str: |
| 43 | + if len(self.data) == 12: |
| 44 | + return f"[RATE 1/2 DATA] [DATA(12) {self.data.hex()}]" |
| 45 | + elif len(self.data) == 10: |
| 46 | + return ( |
| 47 | + f"[RATE 1/2 DATA CONFIRMED] [DATA(10) {self.data.hex()}]" |
| 48 | + + f" [CRC9: {self.crc9}]" |
| 49 | + + (" [CRC9 INVALID]" if not self.crc9_ok else "") |
| 50 | + ) |
| 51 | + elif len(self.data) == 8: |
| 52 | + return ( |
| 53 | + f"[RATE 1/2 DATA - LAST BLOCK UNCONFIRMED] [DATA(8) {self.data.hex()}]" |
| 54 | + + f" [CRC32 int({self.crc32}) hex({self.crc32.to_bytes(4, byteorder='big').hex()})]" |
| 55 | + ) |
| 56 | + elif len(self.data) == 6: |
| 57 | + return ( |
| 58 | + f"[RATE 1/2 DATA - LAST BLOCK CONFIRMED] [DATA(6) {self.data.hex()}]" |
| 59 | + + f" [CRC9: {self.crc9}]" |
| 60 | + + (" [CRC9 INVALID]" if not self.crc9_ok else "") |
| 61 | + + f" [CRC32 int({self.crc32}) hex({self.crc32.to_bytes(4, byteorder='big').hex()})]" |
| 62 | + ) |
| 63 | + raise ValueError(f"__repr__ not implemented for data len {len(self.data)}") |
| 64 | + |
| 65 | + @staticmethod |
| 66 | + def from_bits(bits: bitarray) -> "Rate12Data": |
| 67 | + assert ( |
| 68 | + len(bits) == 96 |
| 69 | + ), f"Rate 1/2 Data packet must be 96 bits (12 bytes) long, got {len(bits)} bits" |
| 70 | + return Rate12Data(data=bits_to_bytes(bits)) |
| 71 | + |
| 72 | + def as_bits(self): |
| 73 | + if len(self.data) == 12: |
| 74 | + # R_1_2_DATA PDU content for unconfirmed data |
| 75 | + return bytes_to_bits(self.data) |
| 76 | + elif len(self.data) == 10: |
| 77 | + # R_1_2_DATA PDU content for confirmed data |
| 78 | + return ( |
| 79 | + int2ba(self.dbsn, length=7) |
| 80 | + + int2ba(self.crc9, length=9) |
| 81 | + + bytes_to_bits(self.data) |
| 82 | + ) |
| 83 | + elif len(self.data) == 8: |
| 84 | + # R_1_2_LDATA PDU content for confirmed data |
| 85 | + return bytes_to_bits(self.data) + int2ba(self.crc32, length=32) |
| 86 | + elif len(self.data) == 6: |
| 87 | + # R_3_4_LDATA PDU content for confirmed data |
| 88 | + return ( |
| 89 | + int2ba(self.dbsn, length=7) |
| 90 | + + int2ba(self.calculate_crc9(), length=9) |
| 91 | + + bytes_to_bits(self.data) |
| 92 | + + int2ba(self.crc32, length=32) |
| 93 | + ) |
0 commit comments