Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions Doc/library/logging.config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -753,16 +753,17 @@ The ``queue`` and ``listener`` keys are optional.

If the ``queue`` key is present, the corresponding value can be one of the following:

* An object implementing the :class:`queue.Queue` public API. For instance,
this may be an actual instance of :class:`queue.Queue` or a subclass thereof,
or a proxy obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.
* An object implementing the :meth:`Queue.put_nowait <queue.Queue.put_nowait>`
and :meth:`Queue.get <queue.Queue.get>` public API. For instance, this may be
an actual instance of :class:`queue.Queue` or a subclass thereof, or a proxy
obtained by :meth:`multiprocessing.managers.SyncManager.Queue`.

This is of course only possible if you are constructing or modifying
the configuration dictionary in code.

* A string that resolves to a callable which, when called with no arguments, returns
the :class:`queue.Queue` instance to use. That callable could be a
:class:`queue.Queue` subclass or a function which returns a suitable queue instance,
the queue instance to use. That callable could be a :class:`queue.Queue` subclass
or a function which returns a suitable queue instance,
such as ``my.module.queue_factory()``.

* A dict with a ``'()'`` key which is constructed in the usual way as discussed in
Expand Down
14 changes: 7 additions & 7 deletions Lib/logging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ def as_tuple(self, value):

def _is_queue_like_object(obj):
"""Check that *obj* implements the Queue API."""
if isinstance(obj, queue.Queue):
if isinstance(obj, (queue.Queue, queue.SimpleQueue)):
return True
# defer importing multiprocessing as much as possible
from multiprocessing.queues import Queue as MPQueue
Expand All @@ -516,13 +516,13 @@ def _is_queue_like_object(obj):
# Ideally, we would have wanted to simply use strict type checking
# instead of a protocol-based type checking since the latter does
# not check the method signatures.
queue_interface = [
'empty', 'full', 'get', 'get_nowait',
'put', 'put_nowait', 'join', 'qsize',
'task_done',
]
#
# Note that only 'put_nowait' and 'get' are required by the logging
# queue handler and queue listener (see gh-124653) and that other
# methods are either optional or unused.
minimal_queue_interface = ['put_nowait', 'get']
return all(callable(getattr(obj, method, None))
for method in queue_interface)
for method in minimal_queue_interface)

class DictConfigurator(BaseConfigurator):
"""
Expand Down
108 changes: 64 additions & 44 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -2376,16 +2376,22 @@ def __getattr__(self, attribute):
return getattr(queue, attribute)

class CustomQueueFakeProtocol(CustomQueueProtocol):
# An object implementing the Queue API (incorrect signatures).
# An object implementing the minimial Queue API for
# the logging module but with incorrect signatures.
#
# The object will be considered a valid queue class since we
# do not check the signatures (only callability of methods)
# but will NOT be usable in production since a TypeError will
# be raised due to a missing argument.
def empty(self, x):
# be raised due to the extra argument in 'put_nowait'.
def put_nowait(self):
pass

class CustomQueueWrongProtocol(CustomQueueProtocol):
empty = None
put_nowait = None

class MinimalQueueProtocol:
def put_nowait(self, x): pass
def get(self): pass

def queueMaker():
return queue.Queue()
Expand Down Expand Up @@ -3945,56 +3951,70 @@ def test_config_queue_handler(self):
msg = str(ctx.exception)
self.assertEqual(msg, "Unable to configure handler 'ah'")

def _apply_simple_queue_listener_configuration(self, qspec):
self.apply_config({
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": qspec,
},
},
})

@threading_helper.requires_working_threading()
@support.requires_subprocess()
@patch("multiprocessing.Manager")
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
# gh-120868, gh-121723

from multiprocessing import Queue as MQ

q1 = {"()": "queue.Queue", "maxsize": -1}
q2 = MQ()
q3 = queue.Queue()
# CustomQueueFakeProtocol passes the checks but will not be usable
# since the signatures are incompatible. Checking the Queue API
# without testing the type of the actual queue is a trade-off
# between usability and the work we need to do in order to safely
# check that the queue object correctly implements the API.
q4 = CustomQueueFakeProtocol()

for qspec in (q1, q2, q3, q4):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": qspec,
},
},
}
)
manager.assert_not_called()
# gh-120868, gh-121723, gh-124653

for qspec in [
{"()": "queue.Queue", "maxsize": -1},
queue.Queue(),
# queue.SimpleQueue does not inherit from queue.Queue
queue.SimpleQueue(),
# CustomQueueFakeProtocol passes the checks but will not be usable
# since the signatures are incompatible. Checking the Queue API
# without testing the type of the actual queue is a trade-off
# between usability and the work we need to do in order to safely
# check that the queue object correctly implements the API.
CustomQueueFakeProtocol(),
MinimalQueueProtocol(),
]:
with self.subTest(qspec=qspec):
self._apply_simple_queue_listener_configuration(qspec)
manager.assert_not_called()

@patch("multiprocessing.Manager")
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
# gh-120868, gh-121723

for qspec in [object(), CustomQueueWrongProtocol()]:
with self.assertRaises(ValueError):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": qspec,
},
},
}
)
manager.assert_not_called()
with self.subTest(qspec=qspec), self.assertRaises(ValueError):
self._apply_simple_queue_listener_configuration(qspec)
manager.assert_not_called()

@skip_if_tsan_fork
@support.requires_subprocess()
@unittest.skipUnless(support.Py_DEBUG, "requires a debug build for testing"
" assertions in multiprocessing")
def test_config_reject_simple_queue_handler_multiprocessing_context(self):
# multiprocessing.SimpleQueue does not implement 'put_nowait'
# and thus cannot be used as a queue-like object (gh-124653)

import multiprocessing

if support.MS_WINDOWS:
start_methods = ['spawn']
else:
start_methods = ['spawn', 'fork', 'forkserver']

for start_method in start_methods:
with self.subTest(start_method=start_method):
ctx = multiprocessing.get_context(start_method)
qspec = ctx.SimpleQueue()
with self.assertRaises(ValueError):
self._apply_simple_queue_listener_configuration(qspec)

@skip_if_tsan_fork
@support.requires_subprocess()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Fix detection of the minimal Queue API needed by the :mod:`logging` module.
Patch by Bénédikt Tran.
Loading