|
9 | 9 | import struct |
10 | 10 | import logging |
11 | 11 |
|
| 12 | +import multiprocessing |
| 13 | +import queue |
| 14 | + |
12 | 15 | class PPK2_Command(): |
13 | 16 | """Serial command opcodes""" |
14 | 17 | NO_OP = 0x00 |
@@ -344,3 +347,119 @@ def get_samples(self, buf): |
344 | 347 | self.remainder["len"] = len(buf)-offset |
345 | 348 |
|
346 | 349 | return samples # return list of samples, handle those lists in PPK2 API wrapper |
| 350 | + |
| 351 | + |
| 352 | +class PPK_Fetch(multiprocessing.Process): |
| 353 | + ''' |
| 354 | + Background process for polling the data in multiprocessing variant |
| 355 | + ''' |
| 356 | + def __init__(self, ppk2, quit_evt, buffer_len_s=10, buffer_chunk_s=0.5): |
| 357 | + super().__init__() |
| 358 | + self._ppk2 = ppk2 |
| 359 | + self._quit = quit_evt |
| 360 | + |
| 361 | + self.print_stats = False |
| 362 | + self._stats = (None, None) |
| 363 | + self._last_timestamp = 0 |
| 364 | + |
| 365 | + self._buffer_max_len = int(buffer_len_s * 100000 * 4) # 100k 4-byte samples per second |
| 366 | + self._buffer_chunk = int(buffer_chunk_s * 100000 * 4) # put in the queue in chunks of 0.5s |
| 367 | + |
| 368 | + # round buffers to a whole sample |
| 369 | + if self._buffer_max_len % 4 != 0: |
| 370 | + self._buffer_max_len = (self._buffer_max_len // 4) * 4 |
| 371 | + if self._buffer_chunk % 4 != 0: |
| 372 | + self._buffer_chunk = (self._buffer_chunk // 4) * 4 |
| 373 | + |
| 374 | + self._buffer_q = multiprocessing.Queue() |
| 375 | + |
| 376 | + def run(self): |
| 377 | + s = 0 |
| 378 | + t = time.time() |
| 379 | + local_buffer = b'' |
| 380 | + while not self._quit.is_set(): |
| 381 | + d = PPK2_API.get_data(self._ppk2) |
| 382 | + tm_now = time.time() |
| 383 | + local_buffer += d |
| 384 | + while len(local_buffer) >= self._buffer_chunk: |
| 385 | + # FIXME: check if lock might be needed when discarding old data |
| 386 | + self._buffer_q.put(local_buffer[:self._buffer_chunk]) |
| 387 | + while self._buffer_q.qsize()>self._buffer_max_len/self._buffer_chunk: |
| 388 | + self._buffer_q.get() |
| 389 | + local_buffer = local_buffer[self._buffer_chunk:] |
| 390 | + self._last_timestamp = tm_now |
| 391 | + #print(len(d), len(local_buffer), self._buffer_q.qsize()) |
| 392 | + |
| 393 | + # calculate stats |
| 394 | + s += len(d) |
| 395 | + dt = tm_now - t |
| 396 | + if dt >= 1.0: |
| 397 | + if self.print_stats: |
| 398 | + print(s, dt) |
| 399 | + self._stats = (s, dt) |
| 400 | + s = 0 |
| 401 | + t = tm_now |
| 402 | + time.sleep(0.002) |
| 403 | + |
| 404 | + # process would hang on join() if there's data in the buffer after the measurement is done |
| 405 | + while True: |
| 406 | + try: |
| 407 | + self._buffer_q.get(block=False) |
| 408 | + except queue.Empty: |
| 409 | + break |
| 410 | + |
| 411 | + def get_data(self): |
| 412 | + ret = b'' |
| 413 | + count = 0 |
| 414 | + while True: |
| 415 | + try: |
| 416 | + ret += self._buffer_q.get(timeout=0.2) # get_nowait sometimes skips a chunk for some reason |
| 417 | + count += 1 |
| 418 | + except queue.Empty: |
| 419 | + break |
| 420 | + return ret |
| 421 | + |
| 422 | + |
| 423 | +class PPK2_MP(PPK2_API): |
| 424 | + ''' |
| 425 | + Multiprocessing variant of the object. The interface is the same as for the regular one except it spawns |
| 426 | + a background process on start_measuring() |
| 427 | + ''' |
| 428 | + def __init__(self, port, buffer_seconds=10): |
| 429 | + ''' |
| 430 | + port - port where PPK2 is connected |
| 431 | + buffer_seconds - how many seconds of data to keep in the buffer |
| 432 | + ''' |
| 433 | + super().__init__(port) |
| 434 | + self._fetcher = None |
| 435 | + self._quit_evt = multiprocessing.Event() |
| 436 | + self._buffer_seconds = buffer_seconds |
| 437 | + |
| 438 | + # stop measurement in case it was already started |
| 439 | + PPK2_API.stop_measuring(self) |
| 440 | + |
| 441 | + def start_measuring(self): |
| 442 | + # discard the data in the buffer |
| 443 | + while self.get_data()!=b'': |
| 444 | + pass |
| 445 | + |
| 446 | + PPK2_API.start_measuring(self) |
| 447 | + if self._fetcher is not None: |
| 448 | + # fetcher already started |
| 449 | + return |
| 450 | + self._quit_evt.clear() |
| 451 | + self._fetcher = PPK_Fetch(self, self._quit_evt, self._buffer_seconds) |
| 452 | + self._fetcher.start() |
| 453 | + |
| 454 | + def stop_measuring(self): |
| 455 | + PPK2_API.stop_measuring(self) |
| 456 | + PPK2_API.get_data(self) # flush the serial buffer (to prevent unicode error on next command) |
| 457 | + self._quit_evt.set() |
| 458 | + self._fetcher.join() # join() will block if the queue isn't empty |
| 459 | + |
| 460 | + def get_data(self): |
| 461 | + try: |
| 462 | + return self._fetcher.get_data() |
| 463 | + except (TypeError, AttributeError): |
| 464 | + return b'' |
| 465 | + |
0 commit comments