| 
4 | 4 | import logging  | 
5 | 5 | import multiprocessing  | 
6 | 6 | import multiprocessing.queues  | 
7 |  | -import platform  | 
8 | 7 | from multiprocessing.context import ForkProcess as ForkProcessType  | 
9 | 8 | from multiprocessing.context import SpawnProcess as SpawnProcessType  | 
10 | 9 | from typing import Callable  | 
 | 
21 | 20 | SpawnQueue = SpawnContext.Queue  | 
22 | 21 | 
 
  | 
23 | 22 | 
 
  | 
24 |  | -class MacSafeQueue(multiprocessing.queues.Queue):  | 
25 |  | -    """ Multiprocessing queues do not have qsize attributes on MacOS.  | 
26 |  | -    This is slower but more portable version of the multiprocessing Queue  | 
27 |  | -    that adds a explicit counter  | 
28 |  | -
  | 
29 |  | -    Reference : https://github.com/keras-team/autokeras/commit/4ddd568b06b4045ace777bc0fb7bc18573b85a75  | 
30 |  | -    """  | 
31 |  | - | 
32 |  | -    def __init__(self, *args, **kwargs):  | 
33 |  | -        if 'ctx' not in kwargs:  | 
34 |  | -            kwargs['ctx'] = multiprocessing.get_context('spawn')  | 
35 |  | -        super().__init__(*args, **kwargs)  | 
36 |  | -        self._counter = multiprocessing.Value('i', 0)  | 
37 |  | - | 
38 |  | -    def put(self, *args, **kwargs):  | 
39 |  | -        # logger.critical("Putting item {}".format(args))  | 
40 |  | -        x = super().put(*args, **kwargs)  | 
41 |  | -        with self._counter.get_lock():  | 
42 |  | -            self._counter.value += 1  | 
43 |  | -        return x  | 
44 |  | - | 
45 |  | -    def get(self, *args, **kwargs):  | 
46 |  | -        x = super().get(*args, **kwargs)  | 
47 |  | -        with self._counter.get_lock():  | 
48 |  | -            self._counter.value -= 1  | 
49 |  | -        # logger.critical("Getting item {}".format(x))  | 
50 |  | -        return x  | 
51 |  | - | 
52 |  | -    def qsize(self):  | 
53 |  | -        return self._counter.value  | 
54 |  | - | 
55 |  | -    def empty(self):  | 
56 |  | -        return not self._counter.value  | 
57 |  | - | 
58 |  | - | 
59 |  | -# SizedQueue should be constructable using the same calling  | 
60 |  | -# convention as multiprocessing.Queue but that entire signature  | 
61 |  | -# isn't expressible in mypy 0.790  | 
62 |  | -SizedQueue: Callable[..., multiprocessing.Queue]  | 
63 |  | - | 
64 |  | - | 
65 |  | -if platform.system() != 'Darwin':  | 
66 |  | -    import multiprocessing  | 
67 |  | -    SizedQueue = SpawnQueue  | 
68 |  | -else:  | 
69 |  | -    SizedQueue = MacSafeQueue  | 
70 |  | - | 
71 |  | - | 
72 | 23 | def join_terminate_close_proc(process: SpawnProcessType, *, timeout: int = 30) -> None:  | 
73 | 24 |     """Increasingly aggressively terminate a process.  | 
74 | 25 | 
  | 
 | 
0 commit comments