Skip to content

Commit a41effe

Browse files
committed
Hytera TMP with tests
1 parent 2dc1035 commit a41effe

File tree

4 files changed

+267
-0
lines changed

4 files changed

+267
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ Transport Format (UDT Format), UDP Port Identifier (SPID/DPID), IP Address Ident
8282
| Radio Registration Service (RRS) ||
8383
| Location Protocol (LP) ||
8484
| Radio Control Protocol (RCP) ||
85+
| Text Message Protocol (TMP) ||
8586

8687
- Not all opcodes in all protocols are implemented, however it will fail with descriptive message, which opcode is
8788
missing in particular operation (decoding, description, encoding)
@@ -92,6 +93,7 @@ Transport Format (UDT Format), UDP Port Identifier (SPID/DPID), IP Address Ident
9293
|-------------------------------------------|:-------------------:|
9394
| Location Request Response Protocol (LRRP) ||
9495
| Automatic Registration Service (ARS) ||
96+
| Text Messaging Service (TMS) ||
9597

9698
- Motorola has MBXML (Motorola Binary XML) which is used to represent LRRP/ARRP documents, ok-dmrlib contains abstract
9799
MBXML implementation with various tools, LRRP implementation tested with both examples and real-world data

okdmr/dmrlib/hytera/pdu/hdap.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def from_bytes(
107107

108108
from okdmr.dmrlib.hytera.pdu.location_protocol import LocationProtocol
109109
from okdmr.dmrlib.hytera.pdu.radio_control_protocol import RadioControlProtocol
110+
from okdmr.dmrlib.hytera.pdu.text_message_protocol import TextMessageProtocol
110111
from okdmr.dmrlib.hytera.pdu.radio_registration_service import (
111112
RadioRegistrationService,
112113
)
@@ -115,4 +116,5 @@ def from_bytes(
115116
HyteraServiceType.LP: LocationProtocol.from_bytes,
116117
HyteraServiceType.RCP: RadioControlProtocol.from_bytes,
117118
HyteraServiceType.RRS: RadioRegistrationService.from_bytes,
119+
HyteraServiceType.TMP: TextMessageProtocol.from_bytes,
118120
}[service_type](data)
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
import enum
2+
from typing import Optional, Literal, Union
3+
4+
from okdmr.dmrlib.hytera.pdu.hdap import HDAP, HyteraServiceType
5+
from okdmr.dmrlib.hytera.pdu.radio_ip import RadioIP
6+
from okdmr.dmrlib.utils.bytes_interface import BytesInterface
7+
8+
9+
@enum.unique
10+
class TMPService(enum.Enum):
11+
SendPrivateMessage = 0xA1
12+
SendPrivateMessageAck = 0xA2
13+
SendGroupMessage = 0xB1
14+
SendGroupMessageAck = 0xB2
15+
16+
17+
@enum.unique
18+
class TMPResultCodes(BytesInterface, enum.Enum):
19+
OK = 0x00
20+
FAIL = 0x01
21+
INVALID_PARAMS = 0x03
22+
CHANNEL_BUSY = 0x04
23+
RX_ONLY = 0x05
24+
LOW_BATTERY = 0x06
25+
PLL_UNLOCK = 0x07
26+
PRIVATE_CALL_NO_ACK = 0x08
27+
REPEATER_WAKEUP_FAIL = 0x09
28+
NOCONTACT = 0x0A
29+
TX_DENY = 0x0B
30+
TX_INTERRUPTED = 0x0C
31+
32+
@staticmethod
33+
def from_bytes(
34+
data: bytes, endian: Literal["big", "little"] = "big"
35+
) -> Optional["TMPResultCodes"]:
36+
return TMPResultCodes(data[0])
37+
38+
def as_bytes(self, endian: Literal["big", "little"] = "big") -> bytes:
39+
return int(self.value).to_bytes(length=1, byteorder=endian)
40+
41+
42+
class TextMessageProtocol(HDAP):
43+
"""
44+
Hytera Text Message Protocol (TMP)
45+
"""
46+
47+
def __init__(
48+
self,
49+
opcode: TMPService,
50+
destination_id: RadioIP,
51+
source_id: RadioIP,
52+
is_reliable: bool = False,
53+
is_confirmed: bool = False,
54+
has_option: bool = False,
55+
request_id: int = 0,
56+
text_data: Union[bytes, str] = b"",
57+
option_data: Optional[bytes] = None,
58+
result_code: Optional[TMPResultCodes] = None,
59+
):
60+
super().__init__(is_reliable=is_reliable)
61+
self.opcode: TMPService = opcode
62+
self.has_option: bool = has_option
63+
self.is_confirmed: bool = is_confirmed
64+
self.request_id: int = request_id
65+
self.destination_id: RadioIP = destination_id
66+
self.source_id: RadioIP = source_id
67+
self.text_data: bytes = (
68+
text_data if isinstance(text_data, bytes) else text_data.encode("utf-16-le")
69+
)
70+
self.option_data: Optional[bytes] = option_data
71+
self.result_code: Optional[TMPResultCodes] = result_code
72+
73+
def get_service_type(self) -> HyteraServiceType:
74+
return HyteraServiceType.TMP
75+
76+
def get_opcode(self) -> bytes:
77+
return bytes(
78+
[
79+
0b0000_0000
80+
| (0b1000_0000 if self.is_confirmed else 0)
81+
| (0b0100_0000 if self.has_option else 0),
82+
self.opcode.value,
83+
]
84+
)
85+
86+
def is_group(self) -> bool:
87+
return self.opcode in (
88+
TMPService.SendGroupMessageAck,
89+
TMPService.SendGroupMessage,
90+
)
91+
92+
@staticmethod
93+
def from_bytes(
94+
data: bytes, endian: Literal["big", "little"] = "big"
95+
) -> Optional["TextMessageProtocol"]:
96+
(is_reliable, service_type) = HDAP.get_reliable_and_service(data[0:1])
97+
assert service_type == HyteraServiceType.TMP, f"Expected TMP got {service_type}"
98+
opcode: TMPService = TMPService(data[2])
99+
is_confirmed: bool = bool(data[1] & 0b1000_0000)
100+
has_option: bool = bool(data[1] & 0b0100_0000)
101+
payload_idx: int = 7 if has_option else 5
102+
payload_len = int.from_bytes(data[3:5], byteorder=endian)
103+
option_data_len: Optional[int] = (
104+
int.from_bytes(data[5:7], byteorder=endian) if has_option else None
105+
)
106+
option_data: Optional[bytes] = (
107+
(
108+
data[
109+
(payload_idx + payload_len - option_data_len - 2) : (
110+
payload_idx + payload_len - 2
111+
)
112+
]
113+
)
114+
if has_option and option_data_len
115+
else None
116+
)
117+
118+
if opcode in (TMPService.SendPrivateMessage, TMPService.SendGroupMessage):
119+
text_data_len = payload_len - (
120+
(12 + 2 + option_data_len) if has_option else 12
121+
)
122+
return TextMessageProtocol(
123+
opcode=opcode,
124+
is_reliable=is_reliable,
125+
is_confirmed=is_confirmed,
126+
has_option=has_option,
127+
option_data=option_data,
128+
request_id=int.from_bytes(
129+
data[payload_idx : payload_idx + 4], byteorder=endian
130+
),
131+
destination_id=RadioIP.from_bytes(
132+
data[payload_idx + 4 : payload_idx + 8], endian=endian
133+
),
134+
source_id=RadioIP.from_bytes(
135+
data[payload_idx + 8 : payload_idx + 12], endian=endian
136+
),
137+
# utf-16-le text data
138+
text_data=data[payload_idx + 12 : payload_idx + 12 + text_data_len],
139+
)
140+
elif opcode in (
141+
TMPService.SendPrivateMessageAck,
142+
TMPService.SendGroupMessageAck,
143+
):
144+
return TextMessageProtocol(
145+
opcode=opcode,
146+
is_reliable=is_reliable,
147+
is_confirmed=is_confirmed,
148+
has_option=has_option,
149+
option_data=option_data,
150+
request_id=int.from_bytes(
151+
data[payload_idx : payload_idx + 4], byteorder=endian
152+
),
153+
destination_id=RadioIP.from_bytes(
154+
data[payload_idx + 4 : payload_idx + 8], endian=endian
155+
),
156+
source_id=RadioIP.from_bytes(
157+
data[payload_idx + 8 : payload_idx + 12], endian=endian
158+
),
159+
result_code=TMPResultCodes(data[payload_idx + 12]),
160+
)
161+
162+
def get_payload(self) -> bytes:
163+
return (
164+
(
165+
len(self.option_data).to_bytes(
166+
length=2, byteorder=self.get_endianness()
167+
)
168+
if self.has_option
169+
else b""
170+
)
171+
+ self.request_id.to_bytes(length=4, byteorder=self.get_endianness())
172+
+ self.destination_id.as_bytes(endian=self.get_endianness())
173+
+ self.source_id.as_bytes(endian=self.get_endianness())
174+
+ (
175+
self.text_data
176+
if self.opcode
177+
in (TMPService.SendPrivateMessage, TMPService.SendGroupMessage)
178+
else self.result_code.as_bytes(endian=self.get_endianness())
179+
)
180+
+ (self.option_data if self.has_option else b"")
181+
)
182+
183+
def __repr__(self) -> str:
184+
repre: str = "[TMP "
185+
repre += "GROUP " if self.is_group() else "PRIVATE "
186+
repre += f"FROM:{repr(self.source_id)} TO:{repr(self.destination_id)}"
187+
repre += "] "
188+
if self.request_id:
189+
repre += f"[REQUEST_ID: {self.request_id}] "
190+
if self.result_code:
191+
repre += f"[RESULT: {self.result_code}] "
192+
if self.has_option and self.option_data:
193+
repre += f"[OPTION: {self.option_data}] "
194+
if self.text_data:
195+
repre += f"[TEXT: {self.text_data.decode('utf-16-le')}] "
196+
return repre
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from okdmr.dmrlib.hytera.pdu.hdap import HDAP
2+
from okdmr.dmrlib.hytera.pdu.radio_ip import RadioIP
3+
from okdmr.dmrlib.hytera.pdu.text_message_protocol import (
4+
TextMessageProtocol,
5+
TMPResultCodes,
6+
TMPService,
7+
)
8+
9+
10+
def test_tmp():
11+
pdu_bytes: bytes = bytes.fromhex(
12+
"0980a10022000000010a01b2070a03640e4f004c004900560045005200200054004500530054007a03"
13+
)
14+
pdu: HDAP = HDAP.from_bytes(pdu_bytes)
15+
assert isinstance(pdu, TextMessageProtocol)
16+
assert not pdu.is_group()
17+
assert not pdu.has_option
18+
assert len(repr(pdu))
19+
assert pdu_bytes == pdu.as_bytes()
20+
21+
22+
def test_result_codes():
23+
rc_bytes: bytes = b"\x0C"
24+
rc: TMPResultCodes = TMPResultCodes.from_bytes(rc_bytes)
25+
assert rc.as_bytes() == rc_bytes
26+
assert rc == TMPResultCodes.TX_INTERRUPTED
27+
28+
29+
def test_ack():
30+
pdu_bytes: bytes = bytes.fromhex("0980a2000D000000010a01b2070a030000003103")
31+
pdu: HDAP = HDAP.from_bytes(pdu_bytes)
32+
assert isinstance(pdu, TextMessageProtocol)
33+
assert not pdu.is_group()
34+
assert not pdu.has_option
35+
assert pdu.result_code == TMPResultCodes.OK
36+
assert len(repr(pdu))
37+
assert pdu_bytes == pdu.as_bytes()
38+
39+
40+
def test_ack_with_option_field():
41+
pdu0: TextMessageProtocol = TextMessageProtocol(
42+
is_reliable=False,
43+
is_confirmed=True,
44+
has_option=True,
45+
option_data=b"\x01\x02\x03",
46+
opcode=TMPService.SendPrivateMessageAck,
47+
source_id=RadioIP(radio_id=196608, subnet=10),
48+
destination_id=RadioIP(radio_id=111111, subnet=10),
49+
request_id=2,
50+
result_code=TMPResultCodes.OK,
51+
)
52+
pdu0_bytes: bytes = pdu0.as_bytes()
53+
pdu1: TextMessageProtocol = TextMessageProtocol.from_bytes(pdu0_bytes)
54+
assert pdu0_bytes == pdu1.as_bytes()
55+
56+
pdu_bytes: bytes = bytes.fromhex(
57+
"09c0a200120003000000020a01b2070a03000000010203e203"
58+
)
59+
pdu: TextMessageProtocol = TextMessageProtocol.from_bytes(pdu_bytes)
60+
assert pdu_bytes == pdu0.as_bytes()
61+
assert pdu.as_bytes()
62+
assert isinstance(pdu, TextMessageProtocol)
63+
assert not pdu.is_group()
64+
assert pdu.has_option
65+
assert len(repr(pdu))
66+
assert len(pdu.option_data)
67+
assert pdu_bytes == pdu.as_bytes()

0 commit comments

Comments
 (0)