subinterpreters.Queue interface is incompatible with logging.handlers.QueueListener #138253

@j2cry

Bug report

Bug description:

The common interface for getting an item from a queue (queue.Queue, multiprocessing.Queue, etc.) is

def get(self, block=True, timeout=None): ...

but interpreters.Queue has a different one:

def get(self, timeout=None, *, _delay=10 / 1000): ...
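The block-first signature of the standard queues can be checked with a short script. This is a minimal sketch using only the standard library; it inspects multiprocessing.queues.Queue, the class behind multiprocessing.Queue, because multiprocessing.Queue itself is a factory rather than a class.

```python
import inspect
import multiprocessing.queues
import queue

# Collect the parameter names of get() for each standard queue class.
get_params = {
    f"{cls.__module__}.{cls.__qualname__}": list(inspect.signature(cls.get).parameters)
    for cls in (queue.Queue, multiprocessing.queues.Queue)
}

for name, params in get_params.items():
    print(name, params)
```

In both cases the first parameter after self is block, so code that calls get(True) positionally expects a blocking flag there.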

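This leads to incorrect behavior when the queue is used with logging.handlers.QueueListener. QueueListener passes its blocking flag positionally, so interpreters.Queue receives True as the timeout and treats it as 1 second. As a result, the QueueListener thread exits about 1 second after the queue becomes empty.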

class QueueListener(object):
    def dequeue(self, block):
        return self.queue.get(block)

    def _monitor(self):
        ...
                record = self.dequeue(True)  # True is meant as the blocking flag, but interpreters.Queue takes it as the timeout
        ...
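The effect on the listener thread can be reproduced without subinterpreters. The sketch below assumes a hypothetical stand-in class, TimeoutFirstQueue, that only mirrors the timeout-first get() signature quoted above; it is not the real interpreters.Queue. It also reads the private QueueListener._thread attribute, purely for inspection.

```python
import logging
import queue
import time
from collections import deque
from logging.handlers import QueueListener


class TimeoutFirstQueue:
    """Hypothetical stand-in whose get() takes timeout first,
    mirroring the interpreters.Queue signature quoted above."""

    def __init__(self):
        self._items = deque()

    def put_nowait(self, item):
        self._items.append(item)

    def get(self, timeout=None, *, _delay=10 / 1000):
        # A positional True lands in `timeout`; True == 1, so this waits 1 second.
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self._items:
            if deadline is not None and time.monotonic() >= deadline:
                raise queue.Empty
            time.sleep(_delay)
        return self._items.popleft()


listener = QueueListener(TimeoutFirstQueue(), logging.NullHandler())
listener.start()
monitor = listener._thread  # private attribute, read here only for inspection

time.sleep(0.2)
alive_early = monitor.is_alive()  # still waiting inside get(True)

monitor.join(timeout=3)
alive_late = monitor.is_alive()  # queue.Empty ended the monitor loop

print(alive_early, alive_late)
listener.stop()
```

The monitor loop stops as soon as dequeue() raises queue.Empty, which a standard queue never does for a blocking get() without a timeout.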

Here's the test script:

import sys
import time
import logging
from logging.handlers import QueueHandler, QueueListener
from concurrent.futures import InterpreterPoolExecutor, wait
from concurrent.interpreters import create_queue


def initworker(queue):
    loghandler = QueueHandler(queue)
    loghandler.setLevel(logging.DEBUG)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(loghandler)


def worker(num: int, delay: float):
    time.sleep(delay)  # emulate high load
    logging.debug('finish %s', num)


class CustomQueueListener(QueueListener):
    def dequeue(self, block: bool) -> logging.LogRecord:
        # return self.queue.get(block)  # original
        return self.queue.get()


def check(listener: QueueListener, bounds: tuple[int, int], delay: float):
    # Use the listener's own queue instead of relying on the module-level `queue`.
    with (
        InterpreterPoolExecutor(initializer=initworker, initargs=(listener.queue,)) as pool,
        listener,
    ):
        tasks = [pool.submit(worker, i, delay) for i in range(*bounds)]
        wait(tasks)



if __name__ == '__main__':
    loghandler = logging.StreamHandler(sys.stdout)
    queue = create_queue()
    check(QueueListener(queue, loghandler), (0, 10), delay=0.5)  # everything is ok

    queue = create_queue()
    check(QueueListener(queue, loghandler), (10, 20), delay=1.25)  # no output, because listener thread is stopped

    queue = create_queue()
    check(CustomQueueListener(queue, loghandler), (20, 30), delay=1.25)  # everything is ok
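An alternative to subclassing QueueListener would be to wrap the queue on the listener side so that it exposes the usual get(block=True, timeout=None) interface. The following is only a sketch under stated assumptions: BlockFirstAdapter and TimeoutFirstQueue are hypothetical names, the stand-in queue merely imitates the timeout-first signature, and the wrapped queue is assumed to offer get_nowait() and put_nowait().

```python
import queue
import time
from collections import deque


class TimeoutFirstQueue:
    """Hypothetical stand-in for a queue whose get() takes timeout first."""

    def __init__(self):
        self._items = deque()

    def put_nowait(self, item):
        self._items.append(item)

    def get_nowait(self):
        if not self._items:
            raise queue.Empty
        return self._items.popleft()

    def get(self, timeout=None, *, _delay=10 / 1000):
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self._items:
            if deadline is not None and time.monotonic() >= deadline:
                raise queue.Empty
            time.sleep(_delay)
        return self._items.popleft()


class BlockFirstAdapter:
    """Hypothetical wrapper exposing get(block=True, timeout=None)."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def get(self, block=True, timeout=None):
        if not block:
            return self._wrapped.get_nowait()
        # Pass timeout by keyword so it can never be confused with the flag.
        return self._wrapped.get(timeout=timeout)

    def put_nowait(self, item):
        # QueueListener.stop() uses put_nowait() to enqueue its sentinel.
        self._wrapped.put_nowait(item)


raw = TimeoutFirstQueue()
adapted = BlockFirstAdapter(raw)

raw.put_nowait("record")
first = adapted.get(True)  # True is a blocking flag here, not a timeout

try:
    adapted.get(False)
    nonblocking_raised = False
except queue.Empty:
    nonblocking_raised = True

start = time.monotonic()
try:
    adapted.get(True, 0.2)
    waited = None
except queue.Empty:
    waited = time.monotonic() - start

print(first, nonblocking_raised, waited)
```

In the test script, the raw queue would still be passed to the workers through initargs, and only the listener would receive the wrapper, for example QueueListener(BlockFirstAdapter(queue), loghandler). This has not been verified against concurrent.interpreters.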

CPython versions tested on:

3.14

Operating systems tested on:

Windows, Linux

Labels

3.14: bugs and security fixes
3.15: new features, bugs and security fixes
release-blocker
stdlib: Standard Library Python modules in the Lib/ directory
topic-subinterpreters
type-bug: An unexpected behavior, bug, or error
