|
| 1 | +import time |
| 2 | +import os |
| 3 | +import json |
| 4 | +import board |
| 5 | +import terminalio |
| 6 | +from adafruit_display_text import label |
| 7 | +from displayio import Group |
| 8 | +import displayio |
| 9 | +import adafruit_displayio_ssd1306 |
| 10 | +import adafruit_lps35hw |
| 11 | + |
| 12 | +CONSOLE = False |
| 13 | +DEBUG = True |
| 14 | + |
| 15 | +MIN_PRESSURE = 8 |
| 16 | +HIGH_PRESSURE = 40 |
| 17 | +WAITING = 0 |
| 18 | +STARTED = 1 |
| 19 | +DETECTED = 2 |
| 20 | + |
| 21 | +SOFT_SIP = 0 |
| 22 | +HARD_SIP = 1 |
| 23 | +SOFT_PUFF = 2 |
| 24 | +HARD_PUFF = 3 |
| 25 | + |
| 26 | +SOFT = 1 |
| 27 | +STRONG = 2 |
| 28 | + |
| 29 | +COLOR = 0xFFFFFF |
| 30 | +FONT = terminalio.FONT |
| 31 | + |
| 32 | +DISPLAY_WIDTH = 128 |
| 33 | +DISPLAY_HEIGHT = 64 |
| 34 | +Y_OFFSET = 3 |
| 35 | +TEXT_HEIGHT = 8 |
| 36 | +BOTTOM_ROW = DISPLAY_HEIGHT - TEXT_HEIGHT |
| 37 | +BANNER_STRING = "PUFF-O-TRON-9000" |
| 38 | +pressure_string = " " |
| 39 | +input_type_string = " " |
| 40 | +# pylint:disable=too-many-locals,exec-used,eval-used |
| 41 | + |
| 42 | +class PuffDetector: |
| 43 | + def __init__( |
| 44 | + self, |
| 45 | + min_pressure=MIN_PRESSURE, |
| 46 | + high_pressure=HIGH_PRESSURE, |
| 47 | + config_filename="settings.json", |
| 48 | + display_timeout=1, |
| 49 | + ): |
| 50 | + # misc detection state |
| 51 | + self.current_pressure = 0 |
| 52 | + self.current_polarity = 0 |
| 53 | + self.current_time = time.monotonic() |
| 54 | + self.start_polarity = 0 |
| 55 | + self.peak_level = 0 |
| 56 | + self.puff_start = 0 |
| 57 | + self.duration = 0 |
| 58 | + self.state = WAITING |
| 59 | + self.prev_state = self.state |
| 60 | + |
| 61 | + # settings |
| 62 | + self.settings_dict = {} |
| 63 | + self.high_pressure = high_pressure |
| 64 | + self.min_pressure = min_pressure |
| 65 | + self._config_filename = config_filename |
| 66 | + self._load_config() |
| 67 | + |
| 68 | + # callbacks |
| 69 | + self._on_sip_callbacks = [] |
| 70 | + self._on_puff_callbacks = [] |
| 71 | + |
| 72 | + # display and display state |
| 73 | + self.display = None |
| 74 | + self.state_display_start = self.current_time |
| 75 | + self.detection_result_str = " " |
| 76 | + self.duration_str = " " |
| 77 | + self.min_press_str = " " |
| 78 | + self.high_press_str = " " |
| 79 | + self.state_str = " " |
| 80 | + self.press_str = " " |
| 81 | + self.display_timeout = display_timeout |
| 82 | + self._init_stuff() |
| 83 | + |
| 84 | + def _init_stuff(self): |
| 85 | + |
| 86 | + # decouple display |
| 87 | + self.state_display_timeout = 1.0 |
| 88 | + self.state_display_start = 0 |
| 89 | + displayio.release_displays() |
| 90 | + i2c = board.I2C() |
| 91 | + |
| 92 | + display_bus = displayio.I2CDisplay(i2c, device_address=0x3D) |
| 93 | + self.display = adafruit_displayio_ssd1306.SSD1306( |
| 94 | + display_bus, width=DISPLAY_WIDTH, height=DISPLAY_HEIGHT |
| 95 | + ) |
| 96 | + |
| 97 | + self.min_press_str = "min: %d" % self.min_pressure |
| 98 | + self.high_press_str = "hi: %d" % self.high_pressure |
| 99 | + |
| 100 | + self.pressure_sensor = adafruit_lps35hw.LPS35HW(i2c) |
| 101 | + self.pressure_sensor.zero_pressure() |
| 102 | + self.pressure_sensor.data_rate = adafruit_lps35hw.DataRate.RATE_75_HZ |
| 103 | + |
| 104 | + self.pressure_sensor.filter_enabled = True |
| 105 | + self.pressure_sensor.filter_config = True |
| 106 | + |
| 107 | + |
| 108 | + def _load_config(self): |
| 109 | + if not self._config_filename in os.listdir("/"): |
| 110 | + return |
| 111 | + try: |
| 112 | + with open(self._config_filename, "r") as file: |
| 113 | + self.settings_dict = json.load(file) |
| 114 | + except (ValueError, OSError) as error: |
| 115 | + print("Error loading config file") |
| 116 | + print(type(error)) |
| 117 | + |
| 118 | + if self.settings_dict: |
| 119 | + if "MIN_PRESSURE" in self.settings_dict.keys(): |
| 120 | + self.min_pressure = self.settings_dict["MIN_PRESSURE"] |
| 121 | + if "HIGH_PRESSURE" in self.settings_dict.keys(): |
| 122 | + self.high_pressure = self.settings_dict["HIGH_PRESSURE"] |
| 123 | + if "DISPLAY_TIMEOUT" in self.settings_dict.keys(): |
| 124 | + self.display_timeout = self.settings_dict["DISPLAY_TIMEOUT"] |
| 125 | + |
| 126 | + def check_for_events(self): |
| 127 | + self.current_time = time.monotonic() |
| 128 | + self.current_pressure = self.pressure_sensor.pressure |
| 129 | + self._update_state() |
| 130 | + self._notify_callbacks() |
| 131 | + self._update_display() |
| 132 | + |
| 133 | + def run(self): |
| 134 | + while True: |
| 135 | + self.check_for_events() |
| 136 | + |
| 137 | + def _catagorize_pressure(self, pressure): |
| 138 | + """determine the strength and polarity of the pressure reading""" |
| 139 | + level = 0 |
| 140 | + polarity = 0 |
| 141 | + abs_pressure = abs(pressure) |
| 142 | + |
| 143 | + if abs_pressure > self.min_pressure: |
| 144 | + level = 1 |
| 145 | + if abs_pressure > self.high_pressure: |
| 146 | + level = 2 |
| 147 | + |
| 148 | + if level != 0: |
| 149 | + if pressure > 0: |
| 150 | + polarity = 1 |
| 151 | + else: |
| 152 | + polarity = -1 |
| 153 | + |
| 154 | + return (polarity, level) |
| 155 | + |
| 156 | + def on_sip(self, func): |
| 157 | + self.add_on_sip(func) |
| 158 | + return func |
| 159 | + |
| 160 | + def on_puff(self, func): |
| 161 | + self.add_on_puff(func) |
| 162 | + return func |
| 163 | + |
| 164 | + def add_on_sip(self, new_callback): |
| 165 | + self._on_sip_callbacks.append(new_callback) |
| 166 | + |
| 167 | + def add_on_puff(self, new_callback): |
| 168 | + self._on_puff_callbacks.append(new_callback) |
| 169 | + |
| 170 | + def _update_state(self): |
| 171 | + """Updates the internal state to detect if a sip/puff has been started or stopped""" |
| 172 | + |
| 173 | + self.current_polarity, level = self._catagorize_pressure(self.current_pressure) |
| 174 | + |
| 175 | + if self.state == DETECTED: |
| 176 | + self.state = WAITING |
| 177 | + |
| 178 | + self.start_polarity = 0 |
| 179 | + self.peak_level = 0 |
| 180 | + self.duration = 0 |
| 181 | + |
| 182 | + if (self.state == WAITING) and level != 0 and (self.start_polarity == 0): |
| 183 | + self.state = STARTED |
| 184 | + self.start_polarity = self.current_polarity |
| 185 | + self.puff_start = time.monotonic() |
| 186 | + |
| 187 | + if self.state == STARTED: |
| 188 | + if level > self.peak_level: |
| 189 | + self.peak_level = level |
| 190 | + |
| 191 | + if level == 0: |
| 192 | + self.state = DETECTED |
| 193 | + self.duration = time.monotonic() - self.puff_start |
| 194 | + |
| 195 | + def _notify_callbacks(self): |
| 196 | + state_changed = self.prev_state != self.state |
| 197 | + self.prev_state = self.state |
| 198 | + if not state_changed: |
| 199 | + return |
| 200 | + |
| 201 | + if self.state == DETECTED: |
| 202 | + |
| 203 | + # if this is a sip |
| 204 | + if self.start_polarity == -1: |
| 205 | + for on_sip_callback in self._on_sip_callbacks: |
| 206 | + on_sip_callback(self.peak_level, self.duration) |
| 207 | + |
| 208 | + # if this is a sip |
| 209 | + if self.start_polarity == 1: |
| 210 | + for on_puff_callback in self._on_puff_callbacks: |
| 211 | + on_puff_callback(self.peak_level, self.duration) |
| 212 | + |
| 213 | + def _update_display_strings(self): |
| 214 | + |
| 215 | + self.press_str = "Press: %0.3f" % self.current_pressure |
| 216 | + |
| 217 | + if self.state == DETECTED: |
| 218 | + self.duration_str = "Duration: %0.2f" % self.duration |
| 219 | + |
| 220 | + self.state_str = "DETECTED:" |
| 221 | + if self.start_polarity == -1: |
| 222 | + if self.peak_level == STRONG: |
| 223 | + self.detection_result_str = "STRONG SIP" |
| 224 | + if self.peak_level == SOFT: |
| 225 | + self.detection_result_str = "SOFT SIP" |
| 226 | + |
| 227 | + if self.start_polarity == 1: |
| 228 | + if self.peak_level == STRONG: |
| 229 | + self.detection_result_str = "STRONG PUFF" |
| 230 | + if self.peak_level == SOFT: |
| 231 | + self.detection_result_str = "SOFT PUFF" |
| 232 | + |
| 233 | + self.state_display_start = self.current_time |
| 234 | + |
| 235 | + elif self.state == WAITING: |
| 236 | + display_elapsed = self.current_time - self.state_display_start |
| 237 | + if display_elapsed > self.display_timeout: |
| 238 | + self.detection_result_str = " " |
| 239 | + self.duration_str = " " |
| 240 | + self.detection_result_str = " " |
| 241 | + self.state_str = "WAITING FOR INPUT" |
| 242 | + elif self.state == STARTED: |
| 243 | + if self.start_polarity == -1: |
| 244 | + self.state_str = "SIP STARTED..." |
| 245 | + |
| 246 | + if self.start_polarity == 1: |
| 247 | + self.state_str = "PUFF STARTED..." |
| 248 | + def _update_display(self): |
| 249 | + self._update_display_strings() |
| 250 | + banner = label.Label(FONT, text=BANNER_STRING, color=COLOR) |
| 251 | + state = label.Label(FONT, text=self.state_str, color=COLOR) |
| 252 | + detector_result = label.Label(FONT, text=self.detection_result_str, color=COLOR) |
| 253 | + duration = label.Label(FONT, text=self.duration_str, color=COLOR) |
| 254 | + min_pressure_label = label.Label(FONT, text=self.min_press_str, color=COLOR) |
| 255 | + high_pressure_label = label.Label(FONT, text=self.high_press_str, color=COLOR) |
| 256 | + pressure_label = label.Label(FONT, text=self.press_str, color=COLOR) |
| 257 | + |
| 258 | + banner.x = 0 |
| 259 | + banner.y = 0 + Y_OFFSET |
| 260 | + |
| 261 | + state.x = 10 |
| 262 | + state.y = 10 + Y_OFFSET |
| 263 | + |
| 264 | + detector_result.x = 10 |
| 265 | + detector_result.y = 20 + Y_OFFSET |
| 266 | + |
| 267 | + duration.x = 10 |
| 268 | + duration.y = 30 + Y_OFFSET |
| 269 | + |
| 270 | + min_pressure_label.x = 0 |
| 271 | + min_pressure_label.y = BOTTOM_ROW - 10 |
| 272 | + |
| 273 | + pressure_label.x = DISPLAY_WIDTH - pressure_label.bounding_box[2] |
| 274 | + pressure_label.y = BOTTOM_ROW |
| 275 | + |
| 276 | + high_pressure_label.x = 0 |
| 277 | + high_pressure_label.y = BOTTOM_ROW |
| 278 | + |
| 279 | + splash = Group(max_size=10) |
| 280 | + splash.append(banner) |
| 281 | + splash.append(state) |
| 282 | + splash.append(detector_result) |
| 283 | + splash.append(duration) |
| 284 | + splash.append(min_pressure_label) |
| 285 | + splash.append(high_pressure_label) |
| 286 | + splash.append(pressure_label) |
| 287 | + |
| 288 | + self.display.show(splash) |
0 commit comments