Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pytest-embedded/pytest_embedded/dut.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import functools
import logging
import multiprocessing
import os.path
import re
from collections.abc import Callable
Expand All @@ -10,7 +9,7 @@
import pexpect

from .app import App
from .log import PexpectProcess
from .log import MessageQueue, PexpectProcess
from .unity import UNITY_SUMMARY_LINE_REGEX, TestSuite
from .utils import Meta, _InjectMixinCls, remove_asci_color_code, to_bytes, to_list

Expand All @@ -29,7 +28,7 @@ class Dut(_InjectMixinCls):
def __init__(
self,
pexpect_proc: PexpectProcess,
msg_queue: multiprocessing.Queue,
msg_queue: MessageQueue,
app: App,
pexpect_logfile: str,
test_case_name: str,
Expand Down
18 changes: 8 additions & 10 deletions pytest-embedded/pytest_embedded/dut_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,7 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]):
return {k: v for k, v in kwargs.items() if v is not None}


if sys.platform == 'darwin':
_ctx = multiprocessing.get_context('fork')
else:
_ctx = multiprocessing.get_context()
_ctx = multiprocessing.get_context('spawn')

_stdout = sys.__stdout__

Expand All @@ -45,10 +42,6 @@ def _drop_none_kwargs(kwargs: dict[t.Any, t.Any]):
PARAMETRIZED_FIXTURES_CACHE = {}


def msg_queue_gn() -> MessageQueue:
return MessageQueue()


def _listen(q: MessageQueue, filepath: str, with_timestamp: bool = True, count: int = 1, total: int = 1) -> None:
shall_add_prefix = True
while True:
Expand Down Expand Up @@ -741,10 +734,15 @@ def create(
"""
layout = []
try:
global PARAMETRIZED_FIXTURES_CACHE
msg_queue = msg_queue_gn()
from .plugin import _MP_MANAGER # avoid circular import

if _MP_MANAGER is None:
raise SystemExit('The _MP_MANAGER is not initialized, please use this function under pytest.')

msg_queue = _MP_MANAGER.MessageQueue()
layout.append(msg_queue)

global PARAMETRIZED_FIXTURES_CACHE
_pexpect_logfile = os.path.join(
PARAMETRIZED_FIXTURES_CACHE['_meta'].logdir, f'custom-dut-{DUT_GLOBAL_INDEX}.txt'
)
Expand Down
32 changes: 18 additions & 14 deletions pytest-embedded/pytest_embedded/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import multiprocessing
import os
import subprocess
import sys
import tempfile
import textwrap
import uuid
from multiprocessing import queues
from multiprocessing.managers import BaseManager
from typing import AnyStr

import pexpect.fdpexpect
Expand All @@ -16,10 +16,11 @@

from .utils import Meta, remove_asci_color_code, to_bytes, to_str, utcnow_str

if sys.platform == 'darwin':
_ctx = multiprocessing.get_context('fork')
else:
_ctx = multiprocessing.get_context()
_ctx = multiprocessing.get_context('spawn')


class MessageQueueManager(BaseManager):
pass


class MessageQueue(queues.Queue):
Expand All @@ -40,7 +41,7 @@ def put(self, obj, **kwargs):
_b = to_bytes(obj)
try:
super().put(_b, **kwargs)
except: # noqa # queue might be closed
except Exception: # queue might be closed
pass

def write(self, s: AnyStr):
Expand All @@ -53,6 +54,9 @@ def isatty(self):
return True


MessageQueueManager.register('MessageQueue', MessageQueue)


class PexpectProcess(pexpect.fdpexpect.fdspawn):
"""
Use a temp file to gather multiple inputs into one output, and do `pexpect.expect()` from one place.
Expand Down Expand Up @@ -146,16 +150,16 @@ def live_print_call(*args, msg_queue: MessageQueue | None = None, expect_returnc

class _PopenRedirectProcess(_ctx.Process):
def __init__(self, msg_queue: MessageQueue, logfile: str):
self._q = msg_queue

self.logfile = logfile

super().__init__(target=self._forward_io, daemon=True) # killed by the main process
super().__init__(target=self._forward_io, args=(msg_queue, logfile), daemon=True)

def _forward_io(self) -> None:
with open(self.logfile) as fr:
@staticmethod
def _forward_io(msg_queue, logfile) -> None:
with open(logfile) as fr:
while True:
self._q.put(fr.read())
try:
msg_queue.put(fr.read()) # msg_queue may be closed
except Exception:
break


class DuplicateStdoutPopen(subprocess.Popen):
Expand Down
21 changes: 17 additions & 4 deletions pytest-embedded/pytest_embedded/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,14 @@
app_fn,
dut_gn,
gdb_gn,
msg_queue_gn,
openocd_gn,
pexpect_proc_fn,
qemu_gn,
serial_gn,
set_parametrized_fixtures_cache,
wokwi_gn,
)
from .log import MessageQueue, PexpectProcess
from .log import MessageQueue, MessageQueueManager, PexpectProcess
from .unity import JunitMerger, UnityTestReportMode, escape_illegal_xml_chars
from .utils import (
SERVICE_LIB_NAMES,
Expand Down Expand Up @@ -300,6 +299,7 @@ def pytest_addoption(parser):
# helpers #
###########
_COUNT = 1
_MP_MANAGER: MessageQueueManager | None = None


def _gte_one_int(v) -> int:
Expand Down Expand Up @@ -630,6 +630,19 @@ def port_app_cache() -> dict[str, str]:
return {}


@pytest.fixture(scope='session', autouse=True)
def _mp_manager():
manager = MessageQueueManager()
manager.start()

global _MP_MANAGER
_MP_MANAGER = manager

yield manager

manager.shutdown()


@pytest.fixture
def test_case_tempdir(test_case_name: str, session_tempdir: str) -> str:
"""Function scoped temp dir for pytest-embedded"""
Expand Down Expand Up @@ -668,8 +681,8 @@ def _pexpect_logfile(test_case_tempdir, logfile_extension, dut_index, dut_total)

@pytest.fixture
@multi_dut_generator_fixture
def msg_queue() -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()`
return msg_queue_gn()
def msg_queue(_mp_manager) -> MessageQueue: # kwargs passed by `multi_dut_generator_fixture()`
return _mp_manager.MessageQueue()


@pytest.fixture
Expand Down
1 change: 0 additions & 1 deletion pytest-embedded/tests/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ def test_expect_all_failed(dut):
result.assert_outcomes(passed=10)


@pytest.mark.xfail(reason='unstable')
def test_expect_from_timeout(testdir):
testdir.makepyfile(r"""
import threading
Expand Down
Loading