|
30 | 30 | import time |
31 | 31 | import board |
32 | 32 | import supervisor |
| 33 | +import usb_cdc |
33 | 34 |
|
34 | 35 | __version__ = "0.0.0-auto.0" |
35 | 36 | __repo__ = "https://github.com/s-light/CircuitPython_nonblocking_serialinput.git" |
|
41 | 42 | class NonBlockingSerialInput(object): |
42 | 43 | """docstring for NonBlockingSerialInput.""" |
43 | 44 |
|
44 | | - def __init__(self, arg): |
| 45 | + def __init__( |
| 46 | + self, |
| 47 | + *, # force keyword arguments |
| 48 | + parse_input_fn=None, |
| 49 | + print_help_fn=None, |
| 50 | + serial=usb_cdc.console, |
| 51 | + encoding="utf-8", |
| 52 | + line_end_custom=None, |
| 53 | + universal_line_end_basic=True, |
| 54 | + universal_line_end_advanced=False, |
| 55 | + ): |
45 | 56 | super(NonBlockingSerialInput, self).__init__() |
46 | | - self.arg = arg |
| 57 | + self.parse_input_fn = parse_input_fn |
| 58 | + self.print_help_fn = print_help_fn |
| 59 | + self.serial = serial |
| 60 | + self.encoding = encoding |
| 61 | + self.line_end_list = [] |
| 62 | + if line_end_custom: |
| 63 | + self.line_end_list.extend(line_end_custom) |
| 64 | + if universal_line_end_basic: |
| 65 | + self.line_end_list.extend(self.universal_line_end_basic) |
| 66 | + if universal_line_end_advanced: |
| 67 | + self.line_end_list.extend(self.universal_line_end_advanced) |
47 | 68 |
|
| 69 | + # no block: |
| 70 | + self.serial.timeout = 0 |
| 71 | + self.input_list = [] |
48 | 72 |
|
49 | | -def memo(self): |
50 | | - if supervisor.runtime.serial_connected: |
| 73 | + def _parse_input_fallback(self, input_string): |
| 74 | + pass |
| 75 | + |
| 76 | + def _print_help_fallback(self): |
| 77 | + pass |
| 78 | + |
| 79 | + def memo(self): |
| 80 | + if self.serial_connected: |
| 81 | + self.print_help() |
| 82 | + # prepare new input |
51 | 83 | self.print_help() |
52 | | - # prepare new input |
53 | | - self.print_help() |
54 | | - print(">> ", end="") |
| 84 | + print(">> ", end="") |
| 85 | + |
| 86 | + def _buffer_count_line_ends(self): |
| 87 | + result = 0 |
| 88 | + for end in self.line_end_list: |
| 89 | + if end in self.input_buffer: |
| 90 | + result += 1 |
| 91 | + return result |
| 92 | + |
| 93 | + def _buffer_endswith_line_end(self): |
| 94 | + # in python3 endswith() supports a tuple as argument |
| 95 | + # in micropython/CircuitPython we have to do it manually.. |
| 96 | + # https://forum.micropython.org/viewtopic.php?t=4906 |
| 97 | + # |
| 98 | + # we use a while loop to finish earlie |
| 99 | + # https://stackoverflow.com/a/59092738/574981 |
| 100 | + result = None |
| 101 | + i = iter(self.line_end_list) |
| 102 | + while result is None and (end := next(i, None)) is not None: |
| 103 | + if self.input_buffer.endswith(end): |
| 104 | + result = end |
| 105 | + return result |
| 106 | + |
| 107 | + def _buffer_check_and_handle_line_ends(self): |
| 108 | + if self._buffer_count_line_ends(): |
| 109 | + # we have at minimum one full command. |
| 110 | + # let us extract this. |
| 111 | + # buffer_endswith = self._buffer_endswith_line_end() |
| 112 | + # if buffer_endswith: |
| 113 | + # # just split the lines and all is fine.. |
| 114 | + # pass |
| 115 | + # else: |
| 116 | + # # we have to leave the last part in place.. |
| 117 | + # pass |
| 118 | + # self.input_list.extend(self.input_buffer.splitlines(self.line_end_list)) |
| 119 | + lines, rest = self.splitlines_advanced(self.input_buffer) |
| 120 | + self.input_list.extend(lines) |
| 121 | + self.input_buffer = rest |
| 122 | + |
| 123 | + def input(self): |
| 124 | + """get oldest input string if there is any available. Otherwise None.""" |
| 125 | + try: |
| 126 | + result = self.input_list.pop(0) |
| 127 | + except IndexError: |
| 128 | + result = None |
| 129 | + return result |
| 130 | + |
| 131 | + def update(self): |
| 132 | + """Main update funciton. please call as often as possible.""" |
| 133 | + if self.serial.connected: |
| 134 | + available = self.serial.in_waiting |
| 135 | + while available: |
| 136 | + self.input_buffer += self.serial.read(available).decode( |
| 137 | + encoding=self.encoding, errors="strict" |
| 138 | + ) |
| 139 | + self._buffer_check_and_handle_line_ends() |
| 140 | + available = self.serial.in_waiting |
| 141 | + parsed_input = False |
| 142 | + while self.input_list: |
| 143 | + if self.parse_input_fn: |
| 144 | + # first in first out |
| 145 | + oldest_input = self.input_list.pop(0) |
| 146 | + self.parse_input_fn(oldest_input) |
| 147 | + parsed_input = True |
| 148 | + if parsed_input and self.print_help_fn: |
| 149 | + self.print_help_fn() |
55 | 150 |
|
56 | 151 |
|
57 | 152 | ########################################## |
58 | 153 | # helper |
59 | 154 |
|
| 155 | +# source for universal_line_end |
| 156 | +# https://docs.python.org/3.8/library/stdtypes.html#str.splitlines |
| 157 | +universal_line_end_basic = [ |
| 158 | + # Line Feed |
| 159 | + "\n", |
| 160 | + # Carriage Return |
| 161 | + "\r", |
| 162 | + # Carriage Return + Line Feed |
| 163 | + "\r\n", |
| 164 | +] |
| 165 | +universal_line_end_advanced = [ |
| 166 | + # Line Tabulation |
| 167 | + "\v", |
| 168 | + "\x0b", |
| 169 | + # Form Feed |
| 170 | + "\f", |
| 171 | + "\x0c", |
| 172 | + # File Separator |
| 173 | + "\x1c", |
| 174 | + # Group Separator |
| 175 | + "\x1d", |
| 176 | + # Record Separator |
| 177 | + "\x1e", |
| 178 | + # Next Line (C1 Control Code) |
| 179 | + "\x85", |
| 180 | + # Line Separator |
| 181 | + "\u2028", |
| 182 | + # Paragraph Seprator |
| 183 | + "\u2029", |
| 184 | +] |
| 185 | + |
| 186 | + |
| 187 | +def find_first_line_end(input_string, line_end_list=universal_line_end_basic, start=0): |
| 188 | + result = -1 |
| 189 | + # print("input_string: {}".format(repr(input_string))) |
| 190 | + i = iter(line_end_list) |
| 191 | + while result is -1: |
| 192 | + try: |
| 193 | + line_end = next(i) |
| 194 | + except StopIteration: |
| 195 | + result = False |
| 196 | + # print("StopIteration") |
| 197 | + else: |
| 198 | + # print("line_end: {}".format(repr(line_end))) |
| 199 | + result = input_string.find(line_end, start) |
| 200 | + # print("result: {}".format(repr(result))) |
| 201 | + if result is False: |
| 202 | + result = -1 |
| 203 | + return result |
| 204 | + |
| 205 | + |
| 206 | +def splitlines_advanced(input_string, line_end_list): |
| 207 | + result = [] |
| 208 | + rest = None |
| 209 | + # we have to do the splitting manually as we have a list of available seperators.. |
| 210 | + pos_last = 0 |
| 211 | + while ( |
| 212 | + pos := find_first_line_end(input_string, line_end_list, start=pos_last) |
| 213 | + ) > -1: |
| 214 | + result.append(input_string[:pos]) |
| 215 | + print("input_string[:pos]: {}".format(repr(input_string[:pos]))) |
| 216 | + pos_last = pos |
| 217 | + if pos_last <= len(input_string): |
| 218 | + print("ping - rest") |
| 219 | + return (result, rest) |
| 220 | + |
| 221 | + |
| 222 | +""" |
| 223 | +debugging: |
| 224 | +nbs.splitlines_advanced("Hallo\n Welt\bEin Wünder Schön€r Tag!", nbs.universal_line_end_basic) |
| 225 | +import nonblocking_serialinput as nbs |
| 226 | +nbs.find_first_line_end("Hallo\nWelt\rTest", start=0) |
| 227 | +nbs.find_first_line_end("Hallo\nWelt\rTest", start=5) |
| 228 | +nbs.find_first_line_end("Hallo\nWelt\rTest", start=6) |
| 229 | +nbs.find_first_line_end("Hallo\nWelt\rTest", start=11) |
| 230 | +""" |
| 231 | + |
60 | 232 |
|
61 | 233 | def parse_value(self, input_string, pre_text): |
62 | 234 | value = None |
|
0 commit comments