Skip to content

Commit 7da85e2

Browse files
committed
libs/module: Add COMX LTE Module support.
Signed-off-by: lbuque <[email protected]>
1 parent 701fa2c commit 7da85e2

File tree

3 files changed

+175
-0
lines changed

3 files changed

+175
-0
lines changed

m5stack/libs/module/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"LoRaSx1262Module": "lora_sx1262",
2525
"LoRaWANModule": "lorawan",
2626
"LoRaWAN868Module": "lorawan868",
27+
"LTEModule": "lte",
2728
"Module4In8Out": "module_4in8out",
2829
"NBIOTModule": "nbiot",
2930
"ODriveModule": "odrive",

m5stack/libs/module/lte.py

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# SPDX-FileCopyrightText: 2025 M5Stack Technology CO LTD
2+
#
3+
# SPDX-License-Identifier: MIT
4+
import network
5+
import machine
6+
import sys
7+
import time
8+
from collections import namedtuple
9+
10+
ATCommand = namedtuple("ATCommand", ["cmd", "rsp1", "rsp2", "timeout"])
11+
12+
if sys.platform != "esp32":
13+
from typing import Literal
14+
15+
16+
class LTEModule:
17+
ERR_NONE = 0
18+
ERR_GENERIC = 1
19+
ERR_TIMEOUT = 2
20+
21+
def __init__(self, id: Literal[0, 1, 2], tx: int, rx: int, verbose: bool = False):
22+
self._uart = machine.UART(
23+
id,
24+
baudrate=115200,
25+
bits=8,
26+
parity=None,
27+
stop=1,
28+
tx=tx,
29+
rx=rx,
30+
txbuf=256,
31+
rxbuf=256,
32+
timeout=0,
33+
timeout_char=0,
34+
invert=0,
35+
flow=0,
36+
)
37+
self._verbose = verbose
38+
self._ppp = network.PPP(self._uart)
39+
40+
def execute_at_command2(
41+
self, cmd: ATCommand, repeat=False, clean_output=True, line_end="\r\n"
42+
):
43+
# clear the uart buffer
44+
self._uart.write(b"\r\n")
45+
46+
# execute the AT command
47+
cmdstr = "{}\r\n".format(cmd.cmd)
48+
self._uart.write(cmdstr)
49+
self._verbose and print("TE -> TA:", repr(cmdstr))
50+
51+
# wait for response
52+
return self.response_at_command2(cmd, repeat, clean_output, line_end)
53+
54+
# @utils.measure_time
55+
def response_at_command2(
56+
self, command: ATCommand, repeat=False, clean_output=True, line_end="\r\n"
57+
):
58+
find_keyword = False
59+
output = bytearray()
60+
error = self.ERR_NONE
61+
rsp1 = command.rsp1.encode("utf-8")
62+
rsp2 = command.rsp2.encode("utf-8")
63+
line_end = line_end.encode("utf-8")
64+
65+
ticks = time.ticks_ms()
66+
while time.ticks_diff(time.ticks_ms(), ticks) < command.timeout:
67+
if self._uart.any() == 0:
68+
time.sleep_ms(10)
69+
continue
70+
71+
line = self._uart.read(self._uart.any())
72+
self._verbose and print("TE <- TA:", repr(line))
73+
output.extend(line)
74+
75+
# Do we have an error?
76+
if output.rfind(rsp2) != -1:
77+
if output.endswith(line_end):
78+
print("Get AT command error response:", repr(output))
79+
error = self.ERR_GENERIC
80+
find_keyword = True
81+
82+
# If we had a pre-end, do we have the expected end?
83+
if output.rfind(rsp1) != -1:
84+
if output.endswith(line_end):
85+
find_keyword = True
86+
87+
if find_keyword:
88+
break
89+
90+
if time.ticks_diff(time.ticks_ms(), ticks) > command.timeout:
91+
print("Timeout for command:", repr(command.cmd))
92+
error = self.ERR_TIMEOUT
93+
94+
return (output, error)
95+
96+
def chat(self, script: tuple):
97+
output = bytearray()
98+
99+
for command, value in script:
100+
self._verbose and print("chat cmd: {}, value: {}".format(repr(command), repr(value)))
101+
102+
if command == "ABORT":
103+
rsp = self._uart.read(len(value))
104+
if rsp:
105+
output.extend(rsp)
106+
self._verbose and print("chat response:", repr(rsp))
107+
if output.find(value.encode("utf-8")) != -1:
108+
break
109+
continue
110+
111+
if command == "SAY":
112+
print(value)
113+
continue
114+
115+
if r"\d" in value:
116+
while True:
117+
time.sleep(1)
118+
rsp = self._uart.readline()
119+
self._verbose and print("chat response:", repr(rsp))
120+
if rsp and rsp.find(b"CONNECT") != -1:
121+
break
122+
continue
123+
124+
while True:
125+
cmd = ATCommand(value, command, "ERROR", 0 if command == "" else 5000)
126+
rsp, error = self.execute_at_command2(cmd)
127+
if command == "":
128+
break
129+
if command != "" and error is self.ERR_NONE:
130+
self._verbose and print("chat response:", repr(rsp))
131+
rsp = rsp.decode("utf-8")
132+
if rsp.find(command) != -1:
133+
break
134+
135+
def chat2(self, pdp_type: str, apn: str):
136+
self.chat(
137+
(
138+
("ABORT", "BUSY"),
139+
("ABORT", "NO ANSWER"),
140+
("ABORT", "NO CARRIER"),
141+
("ABORT", "NO DIALTONE"),
142+
("ABORT", "\nRINGING\r\n\r\nRINGING\r"),
143+
("SAY", "modem init: press <ctrl>-C to disconnect\n"),
144+
("", "+++ATH"),
145+
("SAY", "Before Connecting\n"),
146+
("OK", 'AT+CGDCONT=1,"{}","{}"'.format(pdp_type, apn)),
147+
("SAY", "\n + defining PDP context\n"),
148+
("", "ATD*99#"),
149+
("SAY", "Number Dialled\n"),
150+
("SAY", "\n + attaching"),
151+
("SAY", "\n + requesting data connection"),
152+
("CONNECT", r"\d\c"),
153+
("SAY", "\n + connected"),
154+
)
155+
)
156+
157+
def __getattr__(self, attr):
158+
return getattr(self._ppp, attr)
159+
160+
def deinit(self):
161+
actived = self._ppp.active()
162+
self._ppp.active(False)
163+
self.chat(
164+
(
165+
("DISCONNECTED" if actived else "", "+++ATH" if actived else ""),
166+
# ("OK", "AT")
167+
)
168+
)
169+
# FIXME: For unknown reasons, if this command is used, UART cannot receive data.
170+
self._uart.write(b"AT\r\n")
171+
time.sleep(0.5)
172+
self._uart.read()
173+
self._ppp.deinit()

m5stack/libs/module/manifest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"lora_sx1262.py",
2626
"lorawan.py",
2727
"lorawan868.py",
28+
"lte.py",
2829
"mbus.py",
2930
"module_4in8out.py",
3031
"module_helper.py",

0 commit comments

Comments
 (0)