|
5 | 5 | from datetime import date |
6 | 6 | from concurrent.futures import ProcessPoolExecutor, as_completed, Future |
7 | 7 | import itertools |
| 8 | +from contextlib import contextmanager |
8 | 9 |
|
9 | 10 | import shutil |
10 | 11 | import logging |
|
30 | 31 | from medcat.utils.defaults import avoid_legacy_conversion |
31 | 32 | from medcat.utils.defaults import doing_legacy_conversion_message |
32 | 33 | from medcat.utils.defaults import LegacyConversionDisabledError |
33 | | -from medcat.utils.usage_monitoring import UsageMonitor |
| 34 | +from medcat.utils.usage_monitoring import UsageMonitor, _NoDelUM |
34 | 35 | from medcat.utils.import_utils import MissingDependenciesError |
35 | 36 |
|
36 | 37 |
|
@@ -357,6 +358,28 @@ def get_entities_multi_texts( |
357 | 358 | yield text_index, result |
358 | 359 | return |
359 | 360 |
|
| 361 | + with self._no_usage_monitor_exit_flushing(): |
| 362 | + yield from self._multiprocess(n_process, batch_iter) |
| 363 | + |
| 364 | + @contextmanager |
| 365 | + def _no_usage_monitor_exit_flushing(self): |
| 366 | + # NOTE: the `UsageMonitor.__del__` method can cause |
| 367 | + # multiprocessing to stall while it waits for it to be |
| 368 | + # called. So here we remove the method. |
| 369 | + # However, due to the object being pickled for multiprocessing |
| 370 | + # purposes, the class'es `__del__` method will be used anyway. |
| 371 | + # So we need to trick it into using a different class. |
| 372 | + original_cls = self.usage_monitor.__class__ |
| 373 | + self.usage_monitor.__class__ = _NoDelUM |
| 374 | + try: |
| 375 | + yield |
| 376 | + finally: |
| 377 | + self.usage_monitor.__class__ = original_cls |
| 378 | + |
| 379 | + def _multiprocess( |
| 380 | + self, n_process: int, |
| 381 | + batch_iter: Iterator[list[tuple[str, str, bool]]] |
| 382 | + ) -> Iterator[tuple[str, Union[dict, Entities, OnlyCUIEntities]]]: |
360 | 383 | external_processes = n_process - 1 |
361 | 384 | with ProcessPoolExecutor(max_workers=external_processes) as executor: |
362 | 385 | yield from self._mp_one_batch_per_process( |
|
0 commit comments