99from logging .handlers import QueueHandler
1010from queue import Empty as QueueEmpty
1111from queue import Queue
12- from typing import Any , List
12+ from typing import Any , Callable , List , Tuple
1313
1414# Inspired by https://stackoverflow.com/a/894284
1515
@@ -33,12 +33,14 @@ def __init__(self, name: str, wrapped_handler: logging.Handler) -> None:
3333 self .setFormatter (self .wrapped_handler .formatter )
3434 self .filters = self .wrapped_handler .filters
3535
36- self .queue = multiprocessing .Manager ().Queue (- 1 )
36+ self ._manager = multiprocessing .Manager ()
37+ self .queue = self ._manager .Queue (- 1 )
3738 self ._is_closed = False
3839 # Use thread to asynchronously receive messages from the queue
3940 self ._queue_thread = threading .Thread (target = self ._receive , name = name )
4041 self ._queue_thread .daemon = True
4142 self ._queue_thread .start ()
43+ self ._usage_counter = 1
4244
4345 def _receive (self ) -> None :
4446 while True :
@@ -72,11 +74,27 @@ def _receive(self) -> None:
7274 def emit (self , record : logging .LogRecord ) -> None :
7375 self .wrapped_handler .emit (record )
7476
77+ def increment_usage (self ) -> None :
78+ self ._usage_counter += 1
79+
80+ def decrement_usage (self ) -> None :
81+ self ._usage_counter -= 1
82+ if self ._usage_counter == 0 :
83+ # unwrap inner handler:
84+ root_logger = getLogger ()
85+ root_logger .removeHandler (self )
86+ root_logger .addHandler (self .wrapped_handler )
87+
88+ self ._is_closed = True
89+ self ._queue_thread .join ()
90+ self ._manager .shutdown ()
91+ super ().close ()
92+
7593 def close (self ) -> None :
7694 if not self ._is_closed :
7795 self ._is_closed = True
7896 self ._queue_thread .join ()
79-
97+ self . _manager . shutdown ()
8098 self .wrapped_handler .close ()
8199 super ().close ()
82100
@@ -101,33 +119,36 @@ def _setup_logging_multiprocessing(
101119 root_logger .addHandler (handler )
102120
103121
104- def _get_multiprocessing_logging_setup_fn () -> Any :
105- root_logger = getLogger ()
122+ class _MultiprocessingLoggingHandlerPool :
123+ def __init__ (self ) -> None :
124+ root_logger = getLogger ()
125+
126+ self .handlers = []
127+ for i , handler in enumerate (list (root_logger .handlers )):
128+ # Wrap logging handlers in _MultiprocessingLoggingHandlers to make them work in a multiprocessing setup
129+ # when using start_methods other than fork, for example, spawn or forkserver
130+ if not isinstance (handler , _MultiprocessingLoggingHandler ):
131+ mp_handler = _MultiprocessingLoggingHandler (
132+ f"multi-processing-handler-{ i } " , handler
133+ )
134+ root_logger .removeHandler (handler )
135+ root_logger .addHandler (mp_handler )
136+ self .handlers .append (mp_handler )
137+ else :
138+ handler .increment_usage ()
139+ self .handlers .append (handler )
140+
141+ def get_multiprocessing_logging_setup_fn (self ) -> Callable [[], None ]:
142+ # Return a logging setup function that when called will setup QueueHandler loggers
143+ # using the queues of the _MultiprocessingLoggingHandlers. This way all log messages
144+ # are forwarded to the main process.
145+ return functools .partial (
146+ _setup_logging_multiprocessing ,
147+ queues = [handler .queue for handler in self .handlers ],
148+ levels = [handler .level for handler in self .handlers ],
149+ filters = warnings .filters ,
150+ )
106151
107- queues = []
108- levels = []
109- for i , handler in enumerate (list (root_logger .handlers )):
110- # Wrap logging handlers in _MultiprocessingLoggingHandlers to make them work in a multiprocessing setup
111- # when using start_methods other than fork, for example, spawn or forkserver
112- if not isinstance (handler , _MultiprocessingLoggingHandler ):
113- mp_handler = _MultiprocessingLoggingHandler (
114- f"multi-processing-handler-{ i } " , handler
115- )
116-
117- root_logger .removeHandler (handler )
118- root_logger .addHandler (mp_handler )
119- else :
120- mp_handler = handler
121-
122- queues .append (mp_handler .queue )
123- levels .append (mp_handler .level )
124-
125- # Return a logging setup function that when called will setup QueueHandler loggers
126- # reusing the queues of each wrapped _MultiprocessingLoggingHandler. This way all log messages
127- # are forwarded to the main process.
128- return functools .partial (
129- _setup_logging_multiprocessing ,
130- queues ,
131- levels ,
132- filters = warnings .filters ,
133- )
152+ def close (self ) -> None :
153+ for handler in self .handlers :
154+ handler .decrement_usage ()
0 commit comments