|
| 1 | +from machine import UART, Pin |
| 2 | +from network import PPP |
| 3 | +from micropython import const |
| 4 | +import time |
| 5 | + |
| 6 | + |
| 7 | +DEFAULT_PIN_RST = 35 |
| 8 | +DEFAULT_PIN_NETLIGHT = 34 |
| 9 | +DEFAULT_PIN_RX = 33 |
| 10 | +DEFAULT_PIN_TX = 32 |
| 11 | +DEFAULT_UART_ID = 0 |
| 12 | + |
| 13 | +DEFAULT_UART_TIMEOUT = const(1) |
| 14 | +DEFAULT_UART_TIMEOUT_CHAR = const(1) |
| 15 | +DEFAULT_UART_RXBUF = const(1024) |
| 16 | +DEFAULT_UART_STARTUP_BAUD = const(115200) |
| 17 | +DEFAULT_UART_BAUD = const(460800) |
| 18 | + |
| 19 | + |
| 20 | +class CellularError(Exception): |
| 21 | + def __init__(self, message=None): |
| 22 | + self.message = "CellularError: " + message |
| 23 | + |
| 24 | + |
| 25 | +class LTE(): |
| 26 | + def __init__(self, apn, uart=None, reset_pin=None, netlight_pin=None, netlight_led=None, skip_reset=False): |
| 27 | + self._apn = apn |
| 28 | + self._reset = reset_pin or Pin(DEFAULT_PIN_RST, Pin.OUT) |
| 29 | + self._uart = uart or UART( |
| 30 | + DEFAULT_UART_ID, |
| 31 | + tx=Pin(DEFAULT_PIN_TX, Pin.OUT), |
| 32 | + rx=Pin(DEFAULT_PIN_RX, Pin.OUT)) |
| 33 | + |
| 34 | + # Set PPP timeouts and rxbuf |
| 35 | + self._uart.init( |
| 36 | + timeout=DEFAULT_UART_TIMEOUT, |
| 37 | + timeout_char=DEFAULT_UART_TIMEOUT_CHAR, |
| 38 | + rxbuf=DEFAULT_UART_RXBUF) |
| 39 | + |
| 40 | + if not skip_reset: |
| 41 | + self._reset.value(0) |
| 42 | + time.sleep(1.0) |
| 43 | + self._reset.value(1) |
| 44 | + |
| 45 | + if netlight_led: |
| 46 | + self._led = netlight_led |
| 47 | + self._netlight = netlight_pin or Pin(DEFAULT_PIN_NETLIGHT, Pin.IN) |
| 48 | + self._netlight.irq(self._netlight_irq) |
| 49 | + |
| 50 | + def _netlight_irq(self, pin): |
| 51 | + self._led.value(pin.value()) |
| 52 | + |
| 53 | + def ipconfig(self, *args, **kwargs): |
| 54 | + if len(args): |
| 55 | + return self._ppp.ipconfig(*args) |
| 56 | + else: |
| 57 | + return self._ppp.ipconfig(**kwargs) |
| 58 | + |
| 59 | + def iccid(self): |
| 60 | + try: |
| 61 | + return self._send_at_command("AT+CICCID", 1) |
| 62 | + except CellularError: |
| 63 | + return None |
| 64 | + |
| 65 | + def status(self): |
| 66 | + lte_status = self._send_at_command("AT+CEREG?", 1) |
| 67 | + gsm_status = self._send_at_command("AT+CGREG?", 1) |
| 68 | + return lte_status, gsm_status |
| 69 | + |
| 70 | + def signal_quality(self): |
| 71 | + try: |
| 72 | + response = self._send_at_command("AT+CSQ", 1) |
| 73 | + quality = int(response.split(":")[1].split(",")[0]) |
| 74 | + db = -113 + (2 * quality) # conversion as per AT command set datasheet |
| 75 | + return db |
| 76 | + except CellularError: |
| 77 | + pass |
| 78 | + return None |
| 79 | + |
| 80 | + def stop_ppp(self): |
| 81 | + self._ppp.disconnect() |
| 82 | + self._send_at_command(f"AT+IPR={DEFAULT_UART_STARTUP_BAUD}") |
| 83 | + self._flush_uart() |
| 84 | + |
| 85 | + def start_ppp(self, baudrate=DEFAULT_UART_BAUD, connect=True): |
| 86 | + self._wait_ready(poll_time=1.0, timeout=30) |
| 87 | + |
| 88 | + # Switch to a faster baudrate |
| 89 | + self._send_at_command(f"AT+IPR={baudrate}") |
| 90 | + self._flush_uart() |
| 91 | + self._uart.init( |
| 92 | + baudrate=baudrate, |
| 93 | + timeout=DEFAULT_UART_TIMEOUT, |
| 94 | + timeout_char=DEFAULT_UART_TIMEOUT_CHAR, |
| 95 | + rxbuf=DEFAULT_UART_RXBUF) |
| 96 | + self._wait_ready(poll_time=1.0) |
| 97 | + |
| 98 | + # Connect! |
| 99 | + if connect: |
| 100 | + self.connect() |
| 101 | + |
| 102 | + # This will just always time out!? |
| 103 | + # try: |
| 104 | + # self._send_at_command("ATD*99#", timeout=300) |
| 105 | + # except CellularError as e: |
| 106 | + # print(e) |
| 107 | + |
| 108 | + # Force PPP to use modem's default settings... |
| 109 | + #time.sleep(2.0) |
| 110 | + self._flush_uart() |
| 111 | + self._uart.write("ATD*99#\r") |
| 112 | + self._uart.flush() |
| 113 | + #time.sleep(2.0) |
| 114 | + |
| 115 | + self._ppp = PPP(self._uart) |
| 116 | + self._ppp.connect() |
| 117 | + while self._ppp.status() != 4: |
| 118 | + time.sleep(1.0) |
| 119 | + |
| 120 | + return self._ppp.ifconfig() |
| 121 | + |
| 122 | + def connect(self, timeout=60): |
| 123 | + print(" - setting up cellular uart") |
| 124 | + # connect to and flush the uart |
| 125 | + # consume any unsolicited messages first, we don't need those |
| 126 | + self._flush_uart() |
| 127 | + |
| 128 | + print(" - waiting for cellular module to be ready") |
| 129 | + |
| 130 | + # wait for the cellular module to respond to AT commands |
| 131 | + self._wait_ready() |
| 132 | + |
| 133 | + self._send_at_command("ATE0") # disable local echo |
| 134 | + self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context |
| 135 | + |
| 136 | + # wait for roaming lte connection to be established |
| 137 | + giveup = time.time() + timeout |
| 138 | + status = None |
| 139 | + while status != "+CEREG: 0,5" and status != "+CEREG: 0,1": |
| 140 | + status = self._send_at_command("AT+CEREG?", 1) |
| 141 | + time.sleep(0.25) |
| 142 | + if time.time() > giveup: |
| 143 | + raise CellularError("timed out getting network registration") |
| 144 | + |
| 145 | + # disable server and client certification validation |
| 146 | + self._send_at_command("AT+CSSLCFG=\"authmode\",0,0") |
| 147 | + self._send_at_command("AT+CSSLCFG=\"enableSNI\",0,1") |
| 148 | + |
| 149 | + print(f" - SIM ICCID is {self.iccid()}") |
| 150 | + |
| 151 | + def _wait_ready(self, poll_time=0.25, timeout=10): |
| 152 | + giveup = time.time() + timeout |
| 153 | + while time.time() <= giveup: |
| 154 | + try: |
| 155 | + self._send_at_command("AT") |
| 156 | + return # if __send_at_command doesn't throw an exception then we're good! |
| 157 | + except CellularError as e: |
| 158 | + print(e) |
| 159 | + time.sleep(poll_time) |
| 160 | + |
| 161 | + raise CellularError("timed out waiting for AT response") |
| 162 | + |
| 163 | + def _flush_uart(self): |
| 164 | + self._uart.flush() |
| 165 | + time.sleep(0.25) |
| 166 | + while self._uart.any(): |
| 167 | + self._uart.read(self._uart.any()) |
| 168 | + time.sleep(0.25) |
| 169 | + |
| 170 | + def _send_at_command(self, command, result_lines=0, timeout=5.0): |
| 171 | + # consume any unsolicited messages first, we don't need those |
| 172 | + self._flush_uart() |
| 173 | + |
| 174 | + self._uart.write(command + "\r") |
| 175 | + #print(f" - tx: {command}") |
| 176 | + self._uart.flush() |
| 177 | + status, data = self._read_result(result_lines, timeout=timeout) |
| 178 | + |
| 179 | + print(" -", command, status, data) |
| 180 | + |
| 181 | + if status == "TIMEOUT": |
| 182 | + #print.error(" !", command, status, data) |
| 183 | + raise CellularError(f"cellular module timed out for command {command}") |
| 184 | + |
| 185 | + if status not in ["OK", "DOWNLOAD"]: |
| 186 | + #print(" !", command, status, data) |
| 187 | + raise CellularError(f"non 'OK' or 'DOWNLOAD' result for command {command}") |
| 188 | + |
| 189 | + if result_lines == 1: |
| 190 | + return data[0] |
| 191 | + if result_lines > 1: |
| 192 | + return data |
| 193 | + return None |
| 194 | + |
| 195 | + def _read_result(self, result_lines, timeout=1.0): |
| 196 | + status = None |
| 197 | + result = [] |
| 198 | + start = time.ticks_ms() |
| 199 | + timeout *= 1000 |
| 200 | + while len(result) < result_lines or status is None: |
| 201 | + if (time.ticks_ms() - start) > timeout: |
| 202 | + return "TIMEOUT", [] |
| 203 | + |
| 204 | + line = self._uart.readline() |
| 205 | + |
| 206 | + if line: |
| 207 | + line = line.strip() |
| 208 | + if line in [b"OK", b"ERROR", b"DOWNLOAD"]: |
| 209 | + status = line.decode("ascii") |
| 210 | + elif line != b"": |
| 211 | + result.append(str(line, "ascii")) |
| 212 | + start = time.ticks_ms() |
| 213 | + |
| 214 | + return status, result |
| 215 | + |
0 commit comments