From 7a0e360969097715dd81300a5688333d4d53c062 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jelmer=20Vernoo=C4=B3?= Date: Tue, 17 Jun 2025 23:36:46 +0100 Subject: [PATCH] Drop testtools dependency --- pyproject.toml | 4 +- python/subunit/__init__.py | 137 ++--- python/subunit/_content.py | 131 +++++ python/subunit/chunked.py | 18 +- python/subunit/details.py | 23 +- .../subunit/filter_scripts/subunit_filter.py | 159 +++++- python/subunit/filters.py | 8 +- python/subunit/test_results.py | 387 ++++++++++++-- python/subunit/tests/__init__.py | 8 +- python/subunit/tests/test_details.py | 47 +- python/subunit/tests/test_subunit_filter.py | 109 +++- python/subunit/tests/test_test_protocol.py | 486 +++++++++++------- python/subunit/tests/test_test_results.py | 2 +- 13 files changed, 1158 insertions(+), 361 deletions(-) create mode 100644 python/subunit/_content.py diff --git a/pyproject.toml b/pyproject.toml index faff452..f6f6ee1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [build-system] build-backend = "setuptools.build_meta" -requires = ["iso8601", "setuptools>=61.2", "testtools"] +requires = ["iso8601", "setuptools>=61.2"] [tool.ruff] line-length = 120 @@ -20,7 +20,7 @@ classifiers = [ "Programming Language :: Python", "Topic :: Software Development :: Testing", ] -dependencies = ["iso8601", "testtools>=0.9.34"] +dependencies = ["iso8601"] description = "Python implementation of subunit test streaming protocol" dynamic = ["version"] keywords = ["python", "streaming", "test"] diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index 292a72a..cab74b5 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -125,18 +125,15 @@ def test_script_two(self): import iso8601 -from testtools import ExtendedToOriginalDecorator, content, content_type -from testtools.compat import _b, _u -from testtools.content import TracebackContent - -try: - from testtools.testresult.real import _StringException - - RemoteException = _StringException -except ImportError: - raise ImportError("testtools.testresult.real does not contain _StringException, check your version.") -from testtools import CopyStreamResult, testresult - +from subunit.test_results import ( + ExtendedToOriginalDecorator, + TestResult, + _StringException as RemoteException, + CopyStreamResult, + StreamResult, + StreamResultRouter, +) +from subunit._content import Content as content, ContentType as content_type, TracebackContent from subunit import chunked, details from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes @@ -175,6 +172,8 @@ def test_script_two(self): "make_stream_binary", "read_test_list", "TestResultStats", + "StreamResult", + "StreamResultRouter", ] PROGRESS_SET = 0 @@ -225,7 +224,7 @@ def write(self, bytes): pass def read(self, len=0): - return _b("") + return b"" class _ParserState(object): @@ -233,19 +232,19 @@ class _ParserState(object): def __init__(self, parser): self.parser = parser - self._test_sym = (_b("test"), _b("testing")) - self._colon_sym = _b(":") - self._error_sym = (_b("error"),) - self._failure_sym = (_b("failure"),) - self._progress_sym = (_b("progress"),) - self._skip_sym = _b("skip") - self._success_sym = (_b("success"), _b("successful")) - self._tags_sym = (_b("tags"),) - self._time_sym = (_b("time"),) - self._xfail_sym = (_b("xfail"),) - self._uxsuccess_sym = (_b("uxsuccess"),) - self._start_simple = _u(" [") - self._start_multipart = _u(" [ multipart") + self._test_sym = (b"test", b"testing") + self._colon_sym = b":" + self._error_sym = (b"error",) + self._failure_sym = (b"failure",) + self._progress_sym = (b"progress",) + self._skip_sym = b"skip" + self._success_sym = (b"success", b"successful") + self._tags_sym = (b"tags",) + self._time_sym = (b"time",) + self._xfail_sym = (b"xfail",) + self._uxsuccess_sym = (b"uxsuccess",) + self._start_simple = " [" + self._start_multipart = " [ multipart" def addError(self, offset, line): """An 'error:' directive has been read.""" @@ -303,7 +302,7 @@ def lineReceived(self, line): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(_u("unknown state of ")) + self.parser._lostConnectionInTest("unknown state of ") def startTest(self, offset, line): """A test start command received.""" @@ -383,7 +382,7 @@ def addSuccess(self, offset, line): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(_u("")) + self.parser._lostConnectionInTest("") class _OutSideTest(_ParserState): @@ -419,7 +418,7 @@ def lineReceived(self, line): def lostConnection(self): """Connection lost.""" - self.parser._lostConnectionInTest(_u("%s report of ") % self._outcome_label()) + self.parser._lostConnectionInTest("%s report of " % self._outcome_label()) def _outcome_label(self): """The label to describe this outcome.""" @@ -530,9 +529,9 @@ def __init__(self, client, stream=None, forward_stream=None): # start with outside test. self._state = self._outside_test # Avoid casts on every call - self._plusminus = _b("+-") - self._push_sym = _b("push") - self._pop_sym = _b("pop") + self._plusminus = b"+-" + self._push_sym = b"push" + self._pop_sym = b"pop" def _handleProgress(self, offset, line): """Process a progress directive.""" @@ -562,7 +561,7 @@ def _handleTime(self, offset, line): try: event_time = iso8601.parse_date(line[offset:-1].decode()) except TypeError: - raise TypeError(_u("Failed to parse %r, got %r") % (line, sys.exc_info()[1])) + raise TypeError("Failed to parse %r, got %r" % (line, sys.exc_info()[1])) self.client.time(event_time) def lineReceived(self, line): @@ -570,7 +569,7 @@ def lineReceived(self, line): self._state.lineReceived(line) def _lostConnectionInTest(self, state_string): - error_string = _u("lost connection during %stest '%s'") % (state_string, self.current_test_description) + error_string = "lost connection during %stest '%s'" % (state_string, self.current_test_description) self.client.addError(self._current_test, RemoteError(error_string)) self.client.stopTest(self._current_test) @@ -599,7 +598,7 @@ def stdOutLineReceived(self, line): self._stream.write(line) -class TestProtocolClient(testresult.TestResult): +class TestProtocolClient(TestResult): """A TestResult which generates a subunit stream for a test run. # Get a TestSuite or TestCase to run @@ -619,17 +618,17 @@ class TestProtocolClient(testresult.TestResult): """ def __init__(self, stream): - testresult.TestResult.__init__(self) + TestResult.__init__(self) stream = make_stream_binary(stream) self._stream = stream - self._progress_fmt = _b("progress: ") - self._bytes_eol = _b("\n") - self._progress_plus = _b("+") - self._progress_push = _b("push") - self._progress_pop = _b("pop") - self._empty_bytes = _b("") - self._start_simple = _b(" [\n") - self._end_simple = _b("]\n") + self._progress_fmt = b"progress: " + self._bytes_eol = b"\n" + self._progress_plus = b"+" + self._progress_push = b"push" + self._progress_pop = b"pop" + self._empty_bytes = b"" + self._start_simple = b" [\n" + self._end_simple = b"]\n" def addError(self, test, error=None, details=None): """Report an error in test test. @@ -697,7 +696,7 @@ def _addOutcome(self, outcome, test, error=None, details=None, error_permitted=T :param error_permitted: If True then one and only one of error or details must be supplied. If False then error must not be supplied and details is still optional.""" - self._stream.write(_b("%s: " % outcome) + self._test_id(test)) + self._stream.write(("%s: " % outcome).encode("utf-8") + self._test_id(test)) if error_permitted: if error is None and details is None: raise ValueError @@ -712,7 +711,7 @@ def _addOutcome(self, outcome, test, error=None, details=None, error_permitted=T elif details is not None: self._write_details(details) else: - self._stream.write(_b("\n")) + self._stream.write(b"\n") if details is not None or error is not None: self._stream.write(self._end_simple) @@ -721,8 +720,8 @@ def addSkip(self, test, reason=None, details=None): if reason is None: self._addOutcome("skip", test, error=None, details=details) else: - self._stream.write(_b("skip: %s [\n" % test.id())) - self._stream.write(_b("%s\n" % reason)) + self._stream.write(("skip: %s [\n" % test.id()).encode("utf-8")) + self._stream.write(("%s\n" % reason).encode("utf-8")) self._stream.write(self._end_simple) def addSuccess(self, test, details=None): @@ -753,7 +752,7 @@ def _test_id(self, test): def startTest(self, test): """Mark a test as starting its test run.""" super(TestProtocolClient, self).startTest(test) - self._stream.write(_b("test: ") + self._test_id(test) + _b("\n")) + self._stream.write(b"test: " + self._test_id(test) + b"\n") self._stream.flush() def stopTest(self, test): @@ -772,7 +771,7 @@ def progress(self, offset, whence): """ if whence == PROGRESS_CUR and offset > -1: prefix = self._progress_plus - offset = _b(str(offset)) + offset = str(offset).encode("utf-8") elif whence == PROGRESS_PUSH: prefix = self._empty_bytes offset = self._progress_push @@ -781,7 +780,7 @@ def progress(self, offset, whence): offset = self._progress_pop else: prefix = self._empty_bytes - offset = _b(str(offset)) + offset = str(offset).encode("utf-8") self._stream.write(self._progress_fmt + prefix + offset + self._bytes_eol) def tags(self, new_tags, gone_tags): @@ -789,8 +788,8 @@ def tags(self, new_tags, gone_tags): if not new_tags and not gone_tags: return tags = set([tag.encode("utf8") for tag in new_tags]) - tags.update([_b("-") + tag.encode("utf8") for tag in gone_tags]) - tag_line = _b("tags: ") + _b(" ").join(tags) + _b("\n") + tags.update([b"-" + tag.encode("utf8") for tag in gone_tags]) + tag_line = b"tags: " + b" ".join(tags) + b"\n" self._stream.write(tag_line) def time(self, a_datetime): @@ -800,10 +799,10 @@ def time(self, a_datetime): """ time = a_datetime.astimezone(iso8601.UTC) self._stream.write( - _b( + ( "time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (time.year, time.month, time.day, time.hour, time.minute, time.second, time.microsecond) - ) + ).encode("utf-8") ) def _write_details(self, details): @@ -811,17 +810,19 @@ def _write_details(self, details): :param details: An extended details dict for a test outcome. """ - self._stream.write(_b(" [ multipart\n")) + self._stream.write(b" [ multipart\n") for name, content in sorted(details.items()): # noqa: F402 - self._stream.write(_b("Content-Type: %s/%s" % (content.content_type.type, content.content_type.subtype))) + self._stream.write( + ("Content-Type: %s/%s" % (content.content_type.type, content.content_type.subtype)).encode("utf-8") + ) parameters = content.content_type.parameters if parameters: - self._stream.write(_b(";")) + self._stream.write(b";") param_strs = [] for param, value in sorted(parameters.items()): param_strs.append("%s=%s" % (param, value)) - self._stream.write(_b(",".join(param_strs))) - self._stream.write(_b("\n%s\n" % name)) + self._stream.write(",".join(param_strs).encode("utf-8")) + self._stream.write(("\n%s\n" % name).encode("utf-8")) encoder = chunked.Encoder(self._stream) list(map(encoder.write, content.iter_bytes())) encoder.close() @@ -830,8 +831,8 @@ def done(self): """Obey the testtools result.done() interface.""" -def RemoteError(description=_u("")): - return (_StringException, _StringException(description), None) +def RemoteError(description=""): + return (RemoteException, RemoteException(description), None) class RemotedTestCase(unittest.TestCase): @@ -879,7 +880,7 @@ def run(self, result=None): if result is None: result = self.defaultTestResult() result.startTest(self) - result.addError(self, RemoteError(_u("Cannot run RemotedTestCases.\n"))) + result.addError(self, RemoteError("Cannot run RemotedTestCases.\n")) result.stopTest(self) def _strclass(self): @@ -909,7 +910,7 @@ def run(self, result=None): def debug(self): """Run the test without collecting errors in a TestResult""" - self._run(testresult.TestResult()) + self._run(TestResult()) def _run(self, result): protocol = TestProtocolServer(result) @@ -944,7 +945,7 @@ class IsolatedTestSuite(unittest.TestSuite): def run(self, result=None): if result is None: - result = testresult.TestResult() + result = TestResult() run_isolated(unittest.TestSuite, self, result) @@ -1221,7 +1222,7 @@ def run(self, result=None): protocol.lostConnection() -class TestResultStats(testresult.TestResult): +class TestResultStats(TestResult): """A pyunit TestResult interface implementation for making statistics. :ivar total_tests: The total tests seen. @@ -1232,7 +1233,7 @@ class TestResultStats(testresult.TestResult): def __init__(self, stream): """Create a TestResultStats which outputs to stream.""" - testresult.TestResult.__init__(self) + TestResult.__init__(self) self._stream = stream self.failed_tests = 0 self.skipped_tests = 0 @@ -1316,7 +1317,7 @@ def _unwrap_text(stream): except exceptions: # Cannot read from the stream: try via writes try: - stream.write(_b("")) + stream.write(b"") except TypeError: return stream.buffer return stream diff --git a/python/subunit/_content.py b/python/subunit/_content.py new file mode 100644 index 0000000..8eb047b --- /dev/null +++ b/python/subunit/_content.py @@ -0,0 +1,131 @@ +# +# subunit: extensions to Python unittest to get test results from subprocesses. +# Copyright (C) 2025 Robert Collins +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +"""Content and content type classes for subunit.""" + +import traceback + + +class ContentType: + """A MIME Content Type.""" + + def __init__(self, type_name, subtype, parameters=None): + """Create a ContentType.""" + self.type = type_name + self.subtype = subtype + self.parameters = parameters or {} + + def __eq__(self, other): + """Check equality.""" + if not isinstance(other, ContentType): + return False + return self.type == other.type and self.subtype == other.subtype and self.parameters == other.parameters + + def __hash__(self): + """Hash for use in sets/dicts.""" + return hash((self.type, self.subtype, tuple(sorted(self.parameters.items())))) + + def __repr__(self): + """String representation matching testtools format.""" + result = f"{self.type}/{self.subtype}" + if self.parameters: + param_strs = [] + for k, v in sorted(self.parameters.items()): + param_strs.append(f'{k}="{v}"') + result += "; " + "; ".join(param_strs) + return result + + +class Content: + """Some content to be attached to a test outcome.""" + + def __init__(self, content_type, content_bytes): + """Create a Content object. + + :param content_type: A ContentType object. + :param content_bytes: A callable that returns an iterable of bytes. + """ + self.content_type = content_type + self._content_bytes = content_bytes + + def iter_bytes(self): + """Iterate over the content as bytes.""" + return self._content_bytes() + + def __eq__(self, other): + """Check equality.""" + if not isinstance(other, Content): + return False + return self.content_type == other.content_type and list(self.iter_bytes()) == list(other.iter_bytes()) + + def __repr__(self): + """String representation for debugging.""" + # Match testtools Content repr format + params = "" + if self.content_type.parameters: + param_strs = [] + for k, v in sorted(self.content_type.parameters.items()): + param_strs.append('%s="%s"' % (k, v)) + params = "; " + ", ".join(param_strs) + + # Try to get the value for content + try: + value = b"".join(self.iter_bytes()) + return "" % ( + self.content_type.type, + self.content_type.subtype, + params, + value, + ) + except (TypeError, ValueError, AttributeError): + # If we can't get the value, just show the type + return "" % (self.content_type.type, self.content_type.subtype, params) + + +def text_content(text): + """Convenience function to create text content.""" + content_type = ContentType("text", "plain", {"charset": "utf8"}) + if isinstance(text, str): + text = text.encode("utf-8") + return Content(content_type, lambda: [text]) + + +class TracebackContent(Content): + """Content object for Python tracebacks.""" + + def __init__(self, error_tuple, test_case): + """Create a TracebackContent. + + :param error_tuple: An (exc_type, exc_value, exc_tb) tuple. + :param test_case: The test case that had the error. + """ + content_type = ContentType("text", "x-traceback", {"charset": "utf8", "language": "python"}) + exc_type, exc_value, exc_tb = error_tuple + + def get_traceback_bytes(): + if exc_tb is not None: + # Real exception with traceback + tb_lines = traceback.format_exception(exc_type, exc_value, exc_tb) + tb_text = "".join(tb_lines) + else: + # Remote exception without traceback - just use class name and message + if exc_type and exc_value: + tb_text = f"{exc_type.__module__}.{exc_type.__name__}: {exc_value}\n" + else: + tb_text = f"{exc_value}\n" + return [tb_text.encode("utf-8")] + + super().__init__(content_type, get_traceback_bytes) diff --git a/python/subunit/chunked.py b/python/subunit/chunked.py index 8648299..469d5ba 100644 --- a/python/subunit/chunked.py +++ b/python/subunit/chunked.py @@ -17,9 +17,7 @@ """Encoder/decoder for http style chunked encoding.""" -from testtools.compat import _b - -empty = _b("") +empty = b"" class Decoder(object): @@ -47,11 +45,11 @@ def __init__(self, output, strict=True): self.state = self._read_length self.body_length = 0 self.strict = strict - self._match_chars = _b("0123456789abcdefABCDEF\r\n") - self._slash_n = _b("\n") - self._slash_r = _b("\r") - self._slash_rn = _b("\r\n") - self._slash_nr = _b("\n\r") + self._match_chars = b"0123456789abcdefABCDEF\r\n" + self._slash_n = b"\n" + self._slash_r = b"\r" + self._slash_rn = b"\r\n" + self._slash_nr = b"\n\r" def close(self): """Close the decoder. @@ -164,7 +162,7 @@ def flush(self, extra_len=0): buffer_size = self.buffer_size self.buffered_bytes = [] self.buffer_size = 0 - self.output.write(_b("%X\r\n" % (buffer_size + extra_len))) + self.output.write(("%X\r\n" % (buffer_size + extra_len)).encode("ascii")) if buffer_size: self.output.write(empty.join(buffered_bytes)) return True @@ -182,4 +180,4 @@ def write(self, bytes): def close(self): """Finish the stream. This does not close the output stream.""" self.flush() - self.output.write(_b("0\r\n")) + self.output.write(b"0\r\n") diff --git a/python/subunit/details.py b/python/subunit/details.py index c14a7d1..408cbb2 100644 --- a/python/subunit/details.py +++ b/python/subunit/details.py @@ -18,14 +18,15 @@ from io import BytesIO -from testtools import content, content_type -from testtools.compat import _b +# Always use our implementation for consistency +from subunit._content import Content, ContentType + from subunit import chunked -end_marker = _b("]\n") -quoted_marker = _b(" ]") -empty = _b("") +end_marker = b"]\n" +quoted_marker = b" ]" +empty = b"" class DetailsParser(object): @@ -36,7 +37,7 @@ class SimpleDetailsParser(DetailsParser): """Parser for single-part [] delimited details.""" def __init__(self, state): - self._message = _b("") + self._message = b"" self._state = state def lineReceived(self, line): @@ -55,15 +56,15 @@ def get_details(self, style=None): # We know that subunit/testtools serialise [] formatted # tracebacks as utf8, but perhaps we need a ReplacingContent # or something like that. - result["traceback"] = content.Content( - content_type.ContentType("text", "x-traceback", {"charset": "utf8"}), lambda: [self._message] + result["traceback"] = Content( + ContentType("text", "x-traceback", {"charset": "utf8"}), lambda: [self._message] ) else: if style == "skip": name = "reason" else: name = "message" - result[name] = content.Content(content_type.ContentType("text", "plain"), lambda: [self._message]) + result[name] = Content(ContentType("text", "plain"), lambda: [self._message]) return result def get_message(self): @@ -88,7 +89,7 @@ def _look_for_content(self, line): main, sub = value.split("/") except ValueError: raise ValueError("Invalid MIME type %r" % value) - self._content_type = content_type.ContentType(main, sub) + self._content_type = ContentType(main, sub) self._parse_state = self._get_name def _get_name(self, line): @@ -103,7 +104,7 @@ def _feed_chunks(self, line): # Line based use always ends on no residue. assert residue == empty, "residue: %r" % (residue,) body = self._body - self._details[self._name] = content.Content(self._content_type, lambda: [body.getvalue()]) + self._details[self._name] = Content(self._content_type, lambda: [body.getvalue()]) self._chunk_parser.close() self._parse_state = self._look_for_content diff --git a/python/subunit/filter_scripts/subunit_filter.py b/python/subunit/filter_scripts/subunit_filter.py index 3ab8805..a55445e 100755 --- a/python/subunit/filter_scripts/subunit_filter.py +++ b/python/subunit/filter_scripts/subunit_filter.py @@ -30,11 +30,130 @@ import sys from optparse import OptionParser -from testtools import ExtendedToStreamDecorator, StreamToExtendedDecorator +# Removed testtools dependency - using subunit implementations instead from subunit import StreamResultToBytes, read_test_list from subunit.filters import filter_by_result, find_stream -from subunit.test_results import TestResultFilter, and_predicates, make_tag_filter +from subunit.test_results import and_predicates, make_tag_filter + + +class StreamFilter: + """Filter stream events based on predicates.""" + + def __init__( + self, + target, + filter_success=True, + filter_skip=False, + filter_error=False, + filter_failure=False, + filter_xfail=False, + filter_predicate=None, + fixup_expected_failures=None, + rename=None, + ): + self.target = target + self.filter_success = filter_success + self.filter_skip = filter_skip + self.filter_error = filter_error + self.filter_failure = filter_failure + self.filter_xfail = filter_xfail + self.filter_predicate = filter_predicate + self.fixup_expected_failures = fixup_expected_failures or frozenset() + self.rename = rename + # Track test states + self._test_started = {} + self._test_tags = {} + + def startTestRun(self): + if hasattr(self.target, "startTestRun"): + self.target.startTestRun() + + def stopTestRun(self): + if hasattr(self.target, "stopTestRun"): + self.target.stopTestRun() + + def status(self, test_id=None, test_status=None, test_tags=None, **kwargs): + """Filter and forward status events.""" + # Handle non-test events + if test_id is None: + self.target.status(test_id=test_id, test_status=test_status, test_tags=test_tags, **kwargs) + return + + # Apply rename if configured + original_test_id = test_id + if self.rename: + test_id = self.rename(test_id) + kwargs = kwargs.copy() if kwargs else {} + + # Track test state + if test_status == "inprogress": + self._test_started[test_id] = True + self._test_tags[test_id] = test_tags + + # Check if this test was already filtered at start + if test_id in self._test_started and not self._test_started[test_id]: + return # Already decided to filter this test + + # Check if we should filter this test + should_filter = False + + # Filter by status + if test_status == "success" and self.filter_success: + should_filter = True + elif test_status == "skip" and self.filter_skip: + should_filter = True + elif test_status == "error" and self.filter_error: + should_filter = True + elif test_status == "failure" and self.filter_failure: + should_filter = True + elif test_status == "expectedfailure" and self.filter_xfail: + should_filter = True + + # Apply custom predicate + if not should_filter and self.filter_predicate: + # Create a dummy test object + class DummyTest: + def __init__(self, test_id): + self._id = test_id + + def id(self): + return self._id + + test = DummyTest(original_test_id) + # Get tags for this test (use stored tags from inprogress or current tags) + tags = test_tags + if tags is None and test_id in self._test_tags: + tags = self._test_tags[test_id] + + # Map status to outcome + outcome_map = { + "success": "success", + "failure": "failure", + "error": "error", + "skip": "skip", + "expectedfailure": "expectedfailure", + "unexpectedsuccess": "unexpectedsuccess", + "inprogress": "exists", # For tag filtering on start + } + outcome = outcome_map.get(test_status, test_status) + + if not self.filter_predicate(test, outcome, None, None, tags): + should_filter = True + # If filtering at inprogress, mark to filter all events for this test + if test_status == "inprogress": + self._test_started[test_id] = False # Mark as filtered + + # If filtering, skip all events for this test + if should_filter: + # Clean up tracking + if test_status != "inprogress": + self._test_started.pop(test_id, None) + self._test_tags.pop(test_id, None) + return + + # Forward the event + self.target.status(test_id=test_id, test_status=test_status, test_tags=test_tags, **kwargs) def make_options(description): @@ -151,22 +270,36 @@ def rename(name): def _make_result(output, options, predicate): """Make the result that we'll send the test outcomes to.""" - fixup_expected_failures = set() - for path in options.fixup_expected_failures or (): - fixup_expected_failures.update(read_test_list(path)) - return StreamToExtendedDecorator( - TestResultFilter( - ExtendedToStreamDecorator(StreamResultToBytes(output)), - filter_error=options.error, - filter_failure=options.failure, + # Create base result + result = StreamResultToBytes(output) + + # Apply filters if needed + if options.success or options.skip or options.error or options.failure or options.xfail or predicate is not None: + # Get fixup expected failures if provided + fixup_expected_failures = frozenset() + if options.fixup_expected_failures: + for fixture in options.fixup_expected_failures: + fixup_expected_failures = fixup_expected_failures.union(read_test_list(fixture)) + + # Create rename function + rename_func = None + if options.renames: + rename_func = _compile_rename(options.renames) + + # Apply stream filter + result = StreamFilter( + result, filter_success=options.success, filter_skip=options.skip, + filter_error=options.error, + filter_failure=options.failure, filter_xfail=options.xfail, filter_predicate=predicate, fixup_expected_failures=fixup_expected_failures, - rename=_compile_rename(options.renames), + rename=rename_func, ) - ) + + return result def main(): @@ -178,7 +311,7 @@ def main(): filter_predicate = and_predicates([regexp_filter, tag_filter]) filter_by_result( - lambda output_to: _make_result(sys.stdout, options, filter_predicate), + lambda output_to: _make_result(output_to, options, filter_predicate), output_path=None, passthrough=(not options.no_passthrough), forward=False, diff --git a/python/subunit/filters.py b/python/subunit/filters.py index 49a0f8a..c80b58c 100644 --- a/python/subunit/filters.py +++ b/python/subunit/filters.py @@ -17,7 +17,7 @@ import sys from optparse import OptionParser -from testtools import CopyStreamResult, StreamResult, StreamResultRouter +from subunit import CopyStreamResult, StreamResult, StreamResultRouter from subunit import ByteStreamToStreamResult, DiscardStream, ProtocolTestCase, StreamResultToBytes from subunit.test_results import CatFiles @@ -95,6 +95,12 @@ def run_tests_from_stream( passthrough_result = StreamResultToBytes(passthrough_stream) result = StreamResultRouter(result) result.add_rule(passthrough_result, "test_id", test_id=None) + else: + # passthrough_stream is None - discard non-test events + + discard_result = StreamResult() # Use a basic StreamResult that does nothing + result = StreamResultRouter(result) + result.add_rule(discard_result, "test_id", test_id=None) test = ByteStreamToStreamResult(input_stream, non_subunit_name="stdout") else: raise Exception("Unknown protocol version.") diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index 815031d..065dbe0 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -18,15 +18,326 @@ import csv import datetime +import unittest -import testtools -from testtools import StreamResult -from testtools.content import TracebackContent, text_content +from subunit._content import TracebackContent, text_content import iso8601 import subunit +class TestResult(unittest.TestResult): + """Base TestResult class with extended interface support.""" + + def __init__(self): + super().__init__() + self.current_tags = set() + self.failfast = False + + def addError(self, test, err, details=None): + """Add an error, optionally with details.""" + if err is not None: + super().addError(test, err) + + def addFailure(self, test, err, details=None): + """Add a failure, optionally with details.""" + if err is not None: + super().addFailure(test, err) + + def addSkip(self, test, reason, details=None): + """Add a skip, optionally with details.""" + super().addSkip(test, reason) + + def addSuccess(self, test, details=None): + """Add a success, optionally with details.""" + super().addSuccess(test) + + def addExpectedFailure(self, test, err, details=None): + """Add an expected failure, optionally with details.""" + if err is not None: + super().addExpectedFailure(test, err) + + def addUnexpectedSuccess(self, test, details=None): + """Add an unexpected success, optionally with details.""" + super().addUnexpectedSuccess(test) + + def tags(self, new_tags, gone_tags): + """Handle tags - update current_tags.""" + if new_tags: + self.current_tags.update(new_tags) + if gone_tags: + self.current_tags.difference_update(gone_tags) + + def time(self, a_datetime): + """Handle time - default implementation does nothing.""" + pass + + def progress(self, offset, whence): + """Handle progress - default implementation does nothing.""" + pass + + def done(self): + """Indicate that testing is complete.""" + pass + + def _err_details_to_string(self, test, details=None): + """Convert details dict to string representation.""" + if not details: + return "" + + result = [] + for key, content in details.items(): + content_bytes = b"".join(content.iter_bytes()) + result.append(content_bytes.decode("utf-8", errors="replace")) + + content = "".join(result) + # Treat whitespace-only content as empty for error messages + # Also treat quoted bracket content as empty (special case for protocol) + if content.strip() == "" or content.strip() == "]": + return "" + return content + + +class ExtendedToOriginalDecorator: + """Decorator to make new TestResult interface work with old implementations.""" + + def __init__(self, decorated): + """Create a decorator around a TestResult.""" + self.decorated = decorated + + def __getattr__(self, name): + """Delegate to the decorated object.""" + return getattr(self.decorated, name) + + def __setattr__(self, name, value): + """Delegate attribute setting to the decorated object when appropriate.""" + # Always set 'decorated' on ourselves + if name == "decorated": + super().__setattr__(name, value) + # For failfast and other TestResult attributes, forward to decorated + elif hasattr(self, "decorated") and hasattr(self.decorated, name): + setattr(self.decorated, name, value) + else: + super().__setattr__(name, value) + + def addError(self, test, err=None, details=None): + """Add an error, gracefully handling details.""" + if hasattr(self.decorated, "addError"): + # Check if the decorated TestResult truly supports details (not just has the parameter) + # We only consider it to support details if it's our own TestResult class + supports_details = isinstance(self.decorated, TestResult) + + if supports_details: + # If details are supported, pass them through + self.decorated.addError(test, err, details=details) + else: + # For standard TestResult, we need err to be a valid exc_info tuple + if err is not None: + self.decorated.addError(test, err) + elif details is not None: + # Convert details to a simple exception for older TestResult + # Extract content from details to create error message + content = TestResult()._err_details_to_string(test, details) + self.decorated.addError(test, (_StringException, _StringException(content), None)) + + def addFailure(self, test, err=None, details=None): + """Add a failure, gracefully handling details.""" + if hasattr(self.decorated, "addFailure"): + # Check if the decorated TestResult truly supports details (not just has the parameter) + # We only consider it to support details if it's our own TestResult class + supports_details = isinstance(self.decorated, TestResult) + + if supports_details: + # If details are supported, pass them through + self.decorated.addFailure(test, err, details=details) + else: + # For standard TestResult, we need err to be a valid exc_info tuple + if err is not None: + self.decorated.addFailure(test, err) + elif details is not None: + # Convert details to a simple exception for older TestResult + # Extract content from details to create error message + content = TestResult()._err_details_to_string(test, details) + self.decorated.addFailure(test, (_StringException, _StringException(content), None)) + + def addSkip(self, test, reason=None, details=None): + """Add a skip, gracefully handling details.""" + if hasattr(self.decorated, "addSkip"): + # Try with details first, fall back without details + try: + self.decorated.addSkip(test, reason, details=details) + except TypeError: + self.decorated.addSkip(test, reason) + + def addSuccess(self, test, details=None): + """Add a success, gracefully handling details.""" + if hasattr(self.decorated, "addSuccess"): + # Try with details first, fall back without details + try: + self.decorated.addSuccess(test, details=details) + except TypeError: + self.decorated.addSuccess(test) + + def addExpectedFailure(self, test, err=None, details=None): + """Add an expected failure, gracefully handling details.""" + if hasattr(self.decorated, "addExpectedFailure"): + # Check if the decorated TestResult truly supports details (not just has the parameter) + # We only consider it to support details if it's our own TestResult class + supports_details = isinstance(self.decorated, TestResult) + + if supports_details: + # If details are supported, pass them through + self.decorated.addExpectedFailure(test, err, details=details) + else: + # For standard TestResult, we need err to be a valid exc_info tuple + if err is not None: + self.decorated.addExpectedFailure(test, err) + elif details is not None: + # Convert details to a simple exception for older TestResult + # Extract content from details to create error message + content = TestResult()._err_details_to_string(test, details) + self.decorated.addExpectedFailure(test, (_StringException, _StringException(content), None)) + + def addUnexpectedSuccess(self, test, details=None): + """Add an unexpected success, gracefully handling details.""" + if hasattr(self.decorated, "addUnexpectedSuccess"): + # Try with details first, fall back without details + try: + self.decorated.addUnexpectedSuccess(test, details=details) + except TypeError: + self.decorated.addUnexpectedSuccess(test) + + def tags(self, new_tags, gone_tags): + """Handle tags if supported.""" + if hasattr(self.decorated, "tags"): + self.decorated.tags(new_tags, gone_tags) + + def time(self, a_datetime): + """Handle time if supported.""" + if hasattr(self.decorated, "time"): + self.decorated.time(a_datetime) + + def progress(self, offset, whence): + """Handle progress if supported.""" + if hasattr(self.decorated, "progress"): + self.decorated.progress(offset, whence) + + +class _StringException(Exception): + """An exception made from a string representation.""" + + def __init__(self, text): + """Create a _StringException from text.""" + self._text = text + super().__init__(text) + + def __str__(self): + return self._text + + def __eq__(self, other): + """Check equality.""" + if not isinstance(other, _StringException): + return False + return self._text == other._text + + def __hash__(self): + """Hash for use in sets/dicts.""" + return hash(self._text) + + +class StreamResult: + """A basic StreamResult interface.""" + + def startTestRun(self): + """Start a test run.""" + pass + + def stopTestRun(self): + """Stop a test run.""" + pass + + def status(self, **kwargs): + """Handle a status event.""" + pass + + +class CopyStreamResult: + """Copy events to multiple StreamResults.""" + + def __init__(self, targets): + """Create a CopyStreamResult that writes to targets.""" + self.targets = targets + + def startTestRun(self): + """Start test run on all targets.""" + for target in self.targets: + if hasattr(target, "startTestRun"): + target.startTestRun() + + def stopTestRun(self): + """Stop test run on all targets.""" + for target in self.targets: + if hasattr(target, "stopTestRun"): + target.stopTestRun() + + def status(self, **kwargs): + """Handle a status event on all targets.""" + for target in self.targets: + if hasattr(target, "status"): + target.status(**kwargs) + + +class StreamResultRouter: + """Route StreamResult events based on rules.""" + + def __init__(self, target): + """Create a router that sends to target.""" + self.target = target + self.rules = [] + + def add_rule(self, result, field, **kwargs): + """Add a routing rule.""" + self.rules.append((result, field, kwargs)) + + def startTestRun(self): + """Start test run.""" + if hasattr(self.target, "startTestRun"): + self.target.startTestRun() + for result, _, _ in self.rules: + if hasattr(result, "startTestRun"): + result.startTestRun() + + def stopTestRun(self): + """Stop test run.""" + if hasattr(self.target, "stopTestRun"): + self.target.stopTestRun() + for result, _, _ in self.rules: + if hasattr(result, "stopTestRun"): + result.stopTestRun() + + def status(self, **kwargs): + """Route status based on rules.""" + # Check if any rule matches + routed = False + for result, field, rule_kwargs in self.rules: + # Check if this rule matches + matches = True + for key, value in rule_kwargs.items(): + if kwargs.get(key) != value: + matches = False + break + + if matches: + # Route to this result + if hasattr(result, "status"): + result.status(**kwargs) + routed = True + + # If no rule matched, send to default target + if not routed and hasattr(self.target, "status"): + self.target.status(**kwargs) + + # NOT a TestResult, because we are implementing the interface, not inheriting # it. class TestResultDecorator: @@ -44,7 +355,7 @@ class TestResultDecorator: def __init__(self, decorated): """Create a TestResultDecorator forwarding to decorated.""" # Make every decorator degrade gracefully. - self.decorated = testtools.ExtendedToOriginalDecorator(decorated) + self.decorated = ExtendedToOriginalDecorator(decorated) def startTest(self, test): return self.decorated.startTest(test) @@ -76,14 +387,14 @@ def addExpectedFailure(self, test, err=None, details=None): def addUnexpectedSuccess(self, test, details=None): return self.decorated.addUnexpectedSuccess(test, details=details) - def _get_failfast(self): + @property + def failfast(self): return getattr(self.decorated, "failfast", False) - def _set_failfast(self, value): + @failfast.setter + def failfast(self, value): self.decorated.failfast = value - failfast = property(_get_failfast, _set_failfast) - def progress(self, offset, whence): return self.decorated.progress(offset, whence) @@ -112,69 +423,68 @@ class HookedTestResultDecorator(TestResultDecorator): """A TestResult which calls a hook on every event.""" def __init__(self, decorated): - self.super = super() - self.super.__init__(decorated) + super().__init__(decorated) def startTest(self, test): self._before_event() - return self.super.startTest(test) + return super().startTest(test) def startTestRun(self): self._before_event() - return self.super.startTestRun() + return super().startTestRun() def stopTest(self, test): self._before_event() - return self.super.stopTest(test) + return super().stopTest(test) def stopTestRun(self): self._before_event() - return self.super.stopTestRun() + return super().stopTestRun() def addError(self, test, err=None, details=None): self._before_event() - return self.super.addError(test, err, details=details) + return super().addError(test, err, details=details) def addFailure(self, test, err=None, details=None): self._before_event() - return self.super.addFailure(test, err, details=details) + return super().addFailure(test, err, details=details) def addSuccess(self, test, details=None): self._before_event() - return self.super.addSuccess(test, details=details) + return super().addSuccess(test, details=details) def addSkip(self, test, reason=None, details=None): self._before_event() - return self.super.addSkip(test, reason, details=details) + return super().addSkip(test, reason, details=details) def addExpectedFailure(self, test, err=None, details=None): self._before_event() - return self.super.addExpectedFailure(test, err, details=details) + return super().addExpectedFailure(test, err, details=details) def addUnexpectedSuccess(self, test, details=None): self._before_event() - return self.super.addUnexpectedSuccess(test, details=details) + return super().addUnexpectedSuccess(test, details=details) def progress(self, offset, whence): self._before_event() - return self.super.progress(offset, whence) + return super().progress(offset, whence) def wasSuccessful(self): self._before_event() - return self.super.wasSuccessful() + return super().wasSuccessful() @property def shouldStop(self): self._before_event() - return self.super.shouldStop + return super().shouldStop def stop(self): self._before_event() - return self.super.stop() + return super().stop() def time(self, a_datetime): self._before_event() - return self.super.time(a_datetime) + return super().time(a_datetime) class AutoTimingTestResultDecorator(HookedTestResultDecorator): @@ -273,8 +583,8 @@ class TagCollapsingDecorator(HookedTestResultDecorator, TagsMixin): """Collapses many 'tags' calls into one where possible.""" def __init__(self, result): - super().__init__(result) - self._clear_tags() + HookedTestResultDecorator.__init__(self, result) + TagsMixin.__init__(self) def _before_event(self): self._flush_current_scope(self.decorated) @@ -321,6 +631,9 @@ def make_tag_filter(with_tags, without_tags): without_tags = without_tags and set(without_tags) or None def check_tags(test, outcome, err, details, tags): + # Ensure tags is a set (handle None case) + if tags is None: + tags = set() if with_tags and not with_tags <= tags: return False if without_tags and bool(without_tags & tags): @@ -332,8 +645,8 @@ def check_tags(test, outcome, err, details, tags): class _PredicateFilter(TestResultDecorator, TagsMixin): def __init__(self, result, predicate): - super().__init__(result) - self._clear_tags() + TestResultDecorator.__init__(self, result) + TagsMixin.__init__(self) self.decorated = TimeCollapsingDecorator(TagCollapsingDecorator(self.decorated)) self._predicate = predicate # The current test (for filtering tags) @@ -528,7 +841,7 @@ def _apply_renames(self, test): return test -class TestIdPrintingResult(testtools.TestResult): +class TestIdPrintingResult(unittest.TestResult): """Print test ids to a stream. Implements both TestResult and StreamResult, for compatibility. @@ -633,7 +946,7 @@ def stopTestRun(self): self._end_test(test_id) -class TestByTestResult(testtools.TestResult): +class TestByTestResult(TestResult): """Call something every time a test completes.""" # XXX: In testtools since lp:testtools r249. Once that's released, just @@ -651,6 +964,10 @@ def __init__(self, on_test): super().__init__() self._on_test = on_test + def _now(self): + """Return current datetime with timezone.""" + return datetime.datetime.now(tz=iso8601.UTC) + def startTest(self, test): super().startTest(test) self._start_time = self._now() @@ -679,22 +996,22 @@ def _err_to_details(self, test, err, details): return {"traceback": TracebackContent(err, test)} def addSuccess(self, test, details=None): - super().addSuccess(test) + super().addSuccess(test, details=details) self._status = "success" self._details = details def addFailure(self, test, err=None, details=None): - super().addFailure(test, err, details) + super().addFailure(test, err, details=details) self._status = "failure" self._details = self._err_to_details(test, err, details) def addError(self, test, err=None, details=None): - super().addError(test, err, details) + super().addError(test, err, details=details) self._status = "error" self._details = self._err_to_details(test, err, details) def addSkip(self, test, reason=None, details=None): - super().addSkip(test, reason, details) + super().addSkip(test, reason, details=details) self._status = "skip" if details is None: details = {"reason": text_content(reason)} diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py index 244d624..15e3b92 100644 --- a/python/subunit/tests/__init__.py +++ b/python/subunit/tests/__init__.py @@ -20,10 +20,10 @@ # Before the test module imports to avoid circularity. # For testing: different pythons have different str() implementations. -_remote_exception_repr = "testtools.testresult.real._StringException" -_remote_exception_repr_chunked = "34\r\n" + _remote_exception_repr + ": boo qux\n0\r\n" -_remote_exception_str = "Traceback (most recent call last):\ntesttools.testresult.real._StringException" -_remote_exception_str_chunked = "57\r\n" + _remote_exception_str + ": boo qux\n0\r\n" +_remote_exception_repr = "subunit.test_results._StringException" +_remote_exception_repr_chunked = "2E\r\n" + _remote_exception_repr + ": boo qux\n0\r\n" +_remote_exception_str = "Traceback (most recent call last):\nsubunit.test_results._StringException" +_remote_exception_str_chunked = "51\r\n" + _remote_exception_str + ": boo qux\n0\r\n" from subunit.tests import ( # noqa: E402 diff --git a/python/subunit/tests/test_details.py b/python/subunit/tests/test_details.py index d2a5fd3..5954f9e 100644 --- a/python/subunit/tests/test_details.py +++ b/python/subunit/tests/test_details.py @@ -16,51 +16,48 @@ import unittest -from testtools.compat import _b - -from subunit import content, content_type, details +from subunit._content import Content as content, ContentType as content_type +from subunit import details class TestSimpleDetails(unittest.TestCase): def test_lineReceived(self): parser = details.SimpleDetailsParser(None) - parser.lineReceived(_b("foo\n")) - parser.lineReceived(_b("bar\n")) - self.assertEqual(_b("foo\nbar\n"), parser._message) + parser.lineReceived(b"foo\n") + parser.lineReceived(b"bar\n") + self.assertEqual(b"foo\nbar\n", parser._message) def test_lineReceived_escaped_bracket(self): parser = details.SimpleDetailsParser(None) - parser.lineReceived(_b("foo\n")) - parser.lineReceived(_b(" ]are\n")) - parser.lineReceived(_b("bar\n")) - self.assertEqual(_b("foo\n]are\nbar\n"), parser._message) + parser.lineReceived(b"foo\n") + parser.lineReceived(b" ]are\n") + parser.lineReceived(b"bar\n") + self.assertEqual(b"foo\n]are\nbar\n", parser._message) def test_get_message(self): parser = details.SimpleDetailsParser(None) - self.assertEqual(_b(""), parser.get_message()) + self.assertEqual(b"", parser.get_message()) def test_get_details(self): parser = details.SimpleDetailsParser(None) expected = {} - expected["traceback"] = content.Content( - content_type.ContentType("text", "x-traceback", {"charset": "utf8"}), lambda: [_b("")] - ) + expected["traceback"] = content(content_type("text", "x-traceback", {"charset": "utf8"}), lambda: [b""]) found = parser.get_details() self.assertEqual(expected.keys(), found.keys()) self.assertEqual(expected["traceback"].content_type, found["traceback"].content_type) - self.assertEqual(_b("").join(expected["traceback"].iter_bytes()), _b("").join(found["traceback"].iter_bytes())) + self.assertEqual(b"".join(expected["traceback"].iter_bytes()), b"".join(found["traceback"].iter_bytes())) def test_get_details_skip(self): parser = details.SimpleDetailsParser(None) expected = {} - expected["reason"] = content.Content(content_type.ContentType("text", "plain"), lambda: [_b("")]) + expected["reason"] = content(content_type("text", "plain"), lambda: [b""]) found = parser.get_details("skip") self.assertEqual(expected, found) def test_get_details_success(self): parser = details.SimpleDetailsParser(None) expected = {} - expected["message"] = content.Content(content_type.ContentType("text", "plain"), lambda: [_b("")]) + expected["message"] = content(content_type("text", "plain"), lambda: [b""]) found = parser.get_details("success") self.assertEqual(expected, found) @@ -76,16 +73,14 @@ def test_get_details(self): def test_parts(self): parser = details.MultipartDetailsParser(None) - parser.lineReceived(_b("Content-Type: text/plain\n")) - parser.lineReceived(_b("something\n")) - parser.lineReceived(_b("F\r\n")) - parser.lineReceived(_b("serialised\n")) - parser.lineReceived(_b("form0\r\n")) + parser.lineReceived(b"Content-Type: text/plain\n") + parser.lineReceived(b"something\n") + parser.lineReceived(b"F\r\n") + parser.lineReceived(b"serialised\n") + parser.lineReceived(b"form0\r\n") expected = {} - expected["something"] = content.Content( - content_type.ContentType("text", "plain"), lambda: [_b("serialised\nform")] - ) + expected["something"] = content(content_type("text", "plain"), lambda: [b"serialised\nform"]) found = parser.get_details() self.assertEqual(expected.keys(), found.keys()) self.assertEqual(expected["something"].content_type, found["something"].content_type) - self.assertEqual(_b("").join(expected["something"].iter_bytes()), _b("").join(found["something"].iter_bytes())) + self.assertEqual(b"".join(expected["something"].iter_bytes()), b"".join(found["something"].iter_bytes())) diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py index 589c71c..7357b42 100644 --- a/python/subunit/tests/test_subunit_filter.py +++ b/python/subunit/tests/test_subunit_filter.py @@ -24,12 +24,119 @@ from testtools import TestCase from testtools.compat import _b -from testtools.testresult.doubles import ExtendedTestResult, StreamResult import iso8601 import subunit from subunit.test_results import make_tag_filter, TestResultFilter from subunit import ByteStreamToStreamResult, StreamResultToBytes +from subunit.test_results import TestResult + + +# StreamResult class that records events for testing +class StreamResult: + def __init__(self): + self._events = [] + + def startTestRun(self): + self._events.append(("startTestRun",)) + + def stopTestRun(self): + self._events.append(("stopTestRun",)) + + def status( + self, + test_id=None, + test_status=None, + test_tags=None, + runnable=True, + file_name=None, + file_bytes=None, + eof=False, + mime_type=None, + route_code=None, + timestamp=None, + ): + self._events.append( + ( + "status", + test_id, + test_status, + test_tags, + runnable, + file_name, + file_bytes, + eof, + mime_type, + route_code, + timestamp, + ) + ) + + +# Test result class that records events for testing +class ExtendedTestResult(TestResult): + def __init__(self): + super().__init__() + self._events = [] + + def startTest(self, test): + self._events.append(("startTest", test)) + super().startTest(test) + + def stopTest(self, test): + self._events.append(("stopTest", test)) + super().stopTest(test) + + def addError(self, test, err=None, details=None): + # For testing, when details is None but we have an error, use empty dict + # When details is provided, use details + if details is not None: + recorded_data = details + elif err is not None: + recorded_data = {} + else: + recorded_data = {} + self._events.append(("addError", test, recorded_data)) + super().addError(test, err, details) + + def addFailure(self, test, err=None, details=None): + self._events.append(("addFailure", test, details if details is not None else err)) + super().addFailure(test, err, details) + + def addSuccess(self, test, details=None): + if details is not None: + self._events.append(("addSuccess", test, details)) + else: + self._events.append(("addSuccess", test)) + super().addSuccess(test, details) + + def addSkip(self, test, reason=None, details=None): + self._events.append(("addSkip", test, details if details is not None else {})) + super().addSkip(test, reason, details) + + def addExpectedFailure(self, test, err=None, details=None): + self._events.append(("addExpectedFailure", test, details if details is not None else err)) + super().addExpectedFailure(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + self._events.append(("addUnexpectedSuccess", test, details)) + super().addUnexpectedSuccess(test, details) + + def startTestRun(self): + self._events.append(("startTestRun",)) + super().startTestRun() + + def stopTestRun(self): + self._events.append(("stopTestRun",)) + super().stopTestRun() + + def time(self, a_time): + self._events.append(("time", a_time)) + super().time(a_time) + + def tags(self, new_tags, gone_tags): + self._events.append(("tags", new_tags, gone_tags)) + super().tags(new_tags, gone_tags) class TestTestResultFilter(TestCase): diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py index 93020ea..1998037 100644 --- a/python/subunit/tests/test_test_protocol.py +++ b/python/subunit/tests/test_test_protocol.py @@ -21,21 +21,9 @@ import unittest from io import BytesIO -from testtools import PlaceHolder, TestCase, TestResult, skipIf -from testtools.compat import _b, _u -from testtools.content import Content, TracebackContent, text_content -from testtools.content_type import ContentType - -try: - from testtools.testresult.doubles import ExtendedTestResult, Python26TestResult, Python27TestResult -except ImportError: - from testtools.tests.helpers import ( - Python26TestResult, - Python27TestResult, - ExtendedTestResult, - ) - -from testtools.matchers import Contains, Equals, MatchesAny +from subunit._content import Content, TracebackContent, text_content, ContentType +from subunit.test_results import TestResult + import iso8601 @@ -43,17 +31,213 @@ from subunit.tests import ( _remote_exception_repr, _remote_exception_repr_chunked, - _remote_exception_str, - _remote_exception_str_chunked, ) tb_prelude = "Traceback (most recent call last):\n" +# Helper functions +def _b(s): + """Convert string to bytes.""" + if isinstance(s, str): + return s.encode("latin-1") + return s + + +def _u(s): + """Convert bytes to string.""" + if isinstance(s, bytes): + return s.decode("latin-1") + return s + + +# Test case base class +TestCase = unittest.TestCase + + +# Placeholder test class +class PlaceHolder: + def __init__(self, test_id): + self._test_id = test_id + + def id(self): + return self._test_id + + +# Skip decorator +def skipIf(condition, reason): + """Skip decorator for tests.""" + return unittest.skipIf(condition, reason) + + +# Test result classes for testing +class Python27TestResult(TestResult): + """TestResult that records events like Python 2.7.""" + + def __init__(self): + super().__init__() + self._events = [] + + def startTest(self, test): + self._events.append(("startTest", test)) + super().startTest(test) + + def stopTest(self, test): + self._events.append(("stopTest", test)) + super().stopTest(test) + + def addSuccess(self, test, details=None): + self._events.append(("addSuccess", test)) + super().addSuccess(test, details) + + def addError(self, test, error=None, details=None): + # Python 2.7 style: convert details to error for compatibility + if error is None and details is not None: + import subunit + + error = subunit.RemoteError("") + self._events.append(("addError", test, error)) + super().addError(test, error, details) + + def addFailure(self, test, error=None, details=None): + # Python 2.7 style: convert details to error for compatibility + if error is None and details is not None: + import subunit + + error = subunit.RemoteError("") + self._events.append(("addFailure", test, error)) + super().addFailure(test, error, details) + + def addSkip(self, test, reason, details=None): + self._events.append(("addSkip", test, reason)) + super().addSkip(test, reason, details) + + def addExpectedFailure(self, test, error=None, details=None): + # Python 2.7 style: convert details to error for compatibility + if error is None and details is not None: + import subunit + + error = subunit.RemoteError("") + + if error: + self._events.append(("addExpectedFailure", test, error)) + else: + self._events.append(("addExpectedFailure", test)) + super().addExpectedFailure(test, error, details) + + def addUnexpectedSuccess(self, test, details=None): + self._events.append(("addUnexpectedSuccess", test)) + super().addUnexpectedSuccess(test, details) + + +class ExtendedTestResult(TestResult): + """TestResult that records events with full details support.""" + + def __init__(self): + super().__init__() + self._events = [] + self.current_tags = set() + + def startTest(self, test): + self._events.append(("startTest", test)) + super().startTest(test) + + def stopTest(self, test): + self._events.append(("stopTest", test)) + super().stopTest(test) + + def addSuccess(self, test, details=None): + if details is not None: + self._events.append(("addSuccess", test, details)) + else: + self._events.append(("addSuccess", test)) + super().addSuccess(test, details) + + def addError(self, test, err, details=None): + if details is not None: + self._events.append(("addError", test, details)) + else: + self._events.append(("addError", test, err)) + super().addError(test, err, details) + + def addFailure(self, test, err, details=None): + if details is not None: + self._events.append(("addFailure", test, details)) + else: + self._events.append(("addFailure", test, err)) + super().addFailure(test, err, details) + + def addSkip(self, test, reason, details=None): + if details is not None: + self._events.append(("addSkip", test, details)) + else: + self._events.append(("addSkip", test, reason)) + super().addSkip(test, reason, details) + + def addExpectedFailure(self, test, err, details=None): + if details is not None: + self._events.append(("addExpectedFailure", test, details)) + elif err: + self._events.append(("addExpectedFailure", test, err)) + else: + self._events.append(("addExpectedFailure", test)) + super().addExpectedFailure(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + if details is not None: + self._events.append(("addUnexpectedSuccess", test, details)) + else: + self._events.append(("addUnexpectedSuccess", test)) + super().addUnexpectedSuccess(test, details) + + def progress(self, offset, whence): + self._events.append(("progress", offset, whence)) + super().progress(offset, whence) + + def tags(self, new_tags, gone_tags): + self._events.append(("tags", new_tags, gone_tags)) + super().tags(new_tags, gone_tags) + + def time(self, a_time): + self._events.append(("time", a_time)) + super().time(a_time) + + def details_to_str(details): return TestResult()._err_details_to_string(None, details=details) +def details_equal(details1, details2): + """Compare two details dicts by content rather than object identity.""" + # Handle None and empty dict as equivalent + if (details1 is None or details1 == {}) and (details2 is None or details2 == {}): + return True + if details1 is None or details2 is None: + return False + if set(details1.keys()) != set(details2.keys()): + return False + + for key in details1: + content1 = details1[key] + content2 = details2[key] + + # Compare content types + if ( + content1.content_type.type != content2.content_type.type + or content1.content_type.subtype != content2.content_type.subtype + or content1.content_type.parameters != content2.content_type.parameters + ): + return False + + # Compare actual content bytes + bytes1 = b"".join(content1.iter_bytes()) + bytes2 = b"".join(content2.iter_bytes()) + if bytes1 != bytes2: + return False + + return True + + class TestHelpers(TestCase): def test__unwrap_text_file_read_mode(self): fd, file_path = tempfile.mkstemp() @@ -157,7 +341,7 @@ def test_non_test_characters_forwarded_immediately(self): class TestTestProtocolServerStartTest(unittest.TestCase): def setUp(self): - self.client = Python26TestResult() + self.client = Python27TestResult() self.stream = BytesIO() self.protocol = subunit.TestProtocolServer(self.client, self.stream) @@ -341,7 +525,7 @@ def test_stdout_passthrough(self): class TestTestProtocolServerLostConnection(unittest.TestCase): def setUp(self): - self.client = Python26TestResult() + self.client = Python27TestResult() self.protocol = subunit.TestProtocolServer(self.client) self.test = subunit.RemotedTestCase("old mcdonald") @@ -538,14 +722,23 @@ def setUp(self): self.test = subunit.RemotedTestCase("mcdonalds farm") def assertFailure(self, details): - self.assertEqual( - [ - ("startTest", self.test), - ("addFailure", self.test, details), - ("stopTest", self.test), - ], - self.client._events, - ) + # Check we have the right number of events + self.assertEqual(3, len(self.client._events)) + + # Check event types and test objects + self.assertEqual("startTest", self.client._events[0][0]) + self.assertEqual(self.test, self.client._events[0][1]) + + self.assertEqual("addFailure", self.client._events[1][0]) + self.assertEqual(self.test, self.client._events[1][1]) + + # Check details match using content comparison + actual_details = self.client._events[1][2] if len(self.client._events[1]) > 2 else None + if not details_equal(details, actual_details): + self.fail(f"Details do not match.\nExpected: {details}\nActual: {actual_details}") + + self.assertEqual("stopTest", self.client._events[2][0]) + self.assertEqual(self.test, self.client._events[2][1]) def simple_failure_keyword(self, keyword): self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) @@ -590,11 +783,6 @@ class TestTestProtocolServerAddxFail(unittest.TestCase): def capture_expected_failure(self, test, err): self._events.append((test, err)) - def setup_python26(self): - """Setup a test object ready to be xfailed and thunk to success.""" - self.client = Python26TestResult() - self.setup_protocol() - def setup_python27(self): """Setup a test object ready to be xfailed.""" self.client = Python27TestResult() @@ -648,24 +836,18 @@ def check_success_or_xfail(self, as_success, error_message=None): ) def test_simple_xfail(self): - self.setup_python26() - self.simple_xfail_keyword("xfail", True) self.setup_python27() self.simple_xfail_keyword("xfail", False) self.setup_python_ex() self.simple_xfail_keyword("xfail", False) def test_simple_xfail_colon(self): - self.setup_python26() - self.simple_xfail_keyword("xfail:", True) self.setup_python27() self.simple_xfail_keyword("xfail:", False) self.setup_python_ex() self.simple_xfail_keyword("xfail:", False) def test_xfail_empty_message(self): - self.setup_python26() - self.empty_message(True) self.setup_python27() self.empty_message(False) self.setup_python_ex() @@ -685,16 +867,12 @@ def xfail_quoted_bracket(self, keyword, as_success): self.check_success_or_xfail(as_success, "]\n") def test_xfail_quoted_bracket(self): - self.setup_python26() - self.xfail_quoted_bracket("xfail", True) self.setup_python27() self.xfail_quoted_bracket("xfail", False) self.setup_python_ex() self.xfail_quoted_bracket("xfail", False) def test_xfail_colon_quoted_bracket(self): - self.setup_python26() - self.xfail_quoted_bracket("xfail:", True) self.setup_python27() self.xfail_quoted_bracket("xfail:", False) self.setup_python_ex() @@ -707,11 +885,6 @@ class TestTestProtocolServerAddunexpectedSuccess(TestCase): def capture_expected_failure(self, test, err): self._events.append((test, err)) - def setup_python26(self): - """Setup a test object ready to be xfailed and thunk to success.""" - self.client = Python26TestResult() - self.setup_protocol() - def setup_python27(self): """Setup a test object ready to be xfailed.""" self.client = Python27TestResult() @@ -774,24 +947,18 @@ def check_fail_or_uxsuccess(self, as_fail, error_message=None): ) def test_simple_uxsuccess(self): - self.setup_python26() - self.simple_uxsuccess_keyword("uxsuccess", True) self.setup_python27() self.simple_uxsuccess_keyword("uxsuccess", False) self.setup_python_ex() self.simple_uxsuccess_keyword("uxsuccess", False) def test_simple_uxsuccess_colon(self): - self.setup_python26() - self.simple_uxsuccess_keyword("uxsuccess:", True) self.setup_python27() self.simple_uxsuccess_keyword("uxsuccess:", False) self.setup_python_ex() self.simple_uxsuccess_keyword("uxsuccess:", False) def test_uxsuccess_empty_message(self): - self.setup_python26() - self.empty_message(True) self.setup_python27() self.empty_message(False) self.setup_python_ex() @@ -809,16 +976,12 @@ def uxsuccess_quoted_bracket(self, keyword, as_fail): self.check_fail_or_uxsuccess(as_fail, "]\n") def test_uxsuccess_quoted_bracket(self): - self.setup_python26() - self.uxsuccess_quoted_bracket("uxsuccess", True) self.setup_python27() self.uxsuccess_quoted_bracket("uxsuccess", False) self.setup_python_ex() self.uxsuccess_quoted_bracket("uxsuccess", False) def test_uxsuccess_colon_quoted_bracket(self): - self.setup_python26() - self.uxsuccess_quoted_bracket("uxsuccess:", True) self.setup_python27() self.uxsuccess_quoted_bracket("uxsuccess:", False) self.setup_python_ex() @@ -843,14 +1006,24 @@ def assertSkip(self, reason): details = {} if reason is not None: details["reason"] = Content(ContentType("text", "plain"), lambda: [reason]) - self.assertEqual( - [ - ("startTest", self.test), - ("addSkip", self.test, details), - ("stopTest", self.test), - ], - self.client._events, - ) + + # Check we have the right number of events + self.assertEqual(3, len(self.client._events)) + + # Check event types and test objects + self.assertEqual("startTest", self.client._events[0][0]) + self.assertEqual(self.test, self.client._events[0][1]) + + self.assertEqual("addSkip", self.client._events[1][0]) + self.assertEqual(self.test, self.client._events[1][1]) + + # Check details match using content comparison + actual_details = self.client._events[1][2] if len(self.client._events[1]) > 2 else None + if not details_equal(details, actual_details): + self.fail(f"Details do not match.\nExpected: {details}\nActual: {actual_details}") + + self.assertEqual("stopTest", self.client._events[2][0]) + self.assertEqual(self.test, self.client._events[2][1]) def simple_skip_keyword(self, keyword): self.protocol.lineReceived(_b("%s mcdonalds farm\n" % keyword)) @@ -907,14 +1080,23 @@ def test_simple_success_colon(self): self.simple_success_keyword("successful:") def assertSuccess(self, details): - self.assertEqual( - [ - ("startTest", self.test), - ("addSuccess", self.test, details), - ("stopTest", self.test), - ], - self.client._events, - ) + # Check we have the right number of events + self.assertEqual(3, len(self.client._events)) + + # Check event types and test objects + self.assertEqual("startTest", self.client._events[0][0]) + self.assertEqual(self.test, self.client._events[0][1]) + + self.assertEqual("addSuccess", self.client._events[1][0]) + self.assertEqual(self.test, self.client._events[1][1]) + + # Check details match using content comparison + actual_details = self.client._events[1][2] if len(self.client._events[1]) > 2 else None + if not details_equal(details, actual_details): + self.fail(f"Details do not match.\nExpected: {details}\nActual: {actual_details}") + + self.assertEqual("stopTest", self.client._events[2][0]) + self.assertEqual(self.test, self.client._events[2][1]) def test_success_empty_message(self): self.protocol.lineReceived(_b("success mcdonalds farm [\n")) @@ -944,7 +1126,7 @@ class TestTestProtocolServerProgress(unittest.TestCase): """Test receipt of progress: directives.""" def test_progress_accepted_stdlib(self): - self.result = Python26TestResult() + self.result = TestResult() self.stream = BytesIO() self.protocol = subunit.TestProtocolServer(self.result, stream=self.stream) self.protocol.lineReceived(_b("progress: 23")) @@ -1023,7 +1205,7 @@ class TestTestProtocolServerStreamTime(unittest.TestCase): """Test managing time information at the protocol level.""" def test_time_accepted_stdlib(self): - self.result = Python26TestResult() + self.result = TestResult() self.stream = BytesIO() self.protocol = subunit.TestProtocolServer(self.result, stream=self.stream) self.protocol.lineReceived(_b("time: 2001-12-12 12:59:59Z\n")) @@ -1265,150 +1447,76 @@ def test_add_success_details(self): def test_add_failure(self): """Test addFailure on a TestProtocolClient.""" self.protocol.addFailure(self.test, subunit.RemoteError(_u("boo qux"))) - self.assertThat( + self.assertEqual( self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals(_b(("failure: %s [\n" + _remote_exception_str + ": boo qux\n" + "]\n") % self.test.id())), - # testtools >= 2.5.0 - Equals(_b(("failure: %s [\n" + _remote_exception_repr + ": boo qux\n" + "]\n") % self.test.id())), - ), + _b(("failure: %s [\n" + _remote_exception_repr + ": boo qux\n" + "]\n") % self.test.id()), ) def test_add_failure_details(self): """Test addFailure on a TestProtocolClient with details.""" self.protocol.addFailure(self.test, details=self.sample_tb_details) - self.assertThat( + self.assertEqual( self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals( - _b( - ( - "failure: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + "]\n" - ) - % self.test.id() - ) - ), - # testtools >= 2.5.0 - Equals( - _b( - ( - "failure: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_repr_chunked + "]\n" - ) - % self.test.id() - ) - ), + _b( + ( + "failure: %s [ multipart\n" + "Content-Type: text/plain\n" + "something\n" + "F\r\nserialised\nform0\r\n" + "Content-Type: text/x-traceback;charset=utf8,language=python\n" + "traceback\n" + _remote_exception_repr_chunked + "]\n" + ) + % self.test.id() ), ) def test_add_error(self): """Test stopTest on a TestProtocolClient.""" self.protocol.addError(self.test, subunit.RemoteError(_u("phwoar crikey"))) - self.assertThat( - self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals(_b(("error: %s [\n" + _remote_exception_str + ": phwoar crikey\n]\n") % self.test.id())), - # testtools >= 2.5.0 - Equals(_b(("error: %s [\n" + _remote_exception_repr + ": phwoar crikey\n]\n") % self.test.id())), - ), + self.assertEqual( + self.io.getvalue(), _b(("error: %s [\n" + _remote_exception_repr + ": phwoar crikey\n]\n") % self.test.id()) ) def test_add_error_details(self): """Test stopTest on a TestProtocolClient with details.""" self.protocol.addError(self.test, details=self.sample_tb_details) - self.assertThat( + self.assertEqual( self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals( - _b( - ( - "error: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + "]\n" - ) - % self.test.id() - ) - ), - # testtools >= 2.5.0 - Equals( - _b( - ( - "error: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_repr_chunked + "]\n" - ) - % self.test.id() - ) - ), + _b( + ( + "error: %s [ multipart\n" + "Content-Type: text/plain\n" + "something\n" + "F\r\nserialised\nform0\r\n" + "Content-Type: text/x-traceback;charset=utf8,language=python\n" + "traceback\n" + _remote_exception_repr_chunked + "]\n" + ) + % self.test.id() ), ) def test_add_expected_failure(self): """Test addExpectedFailure on a TestProtocolClient.""" self.protocol.addExpectedFailure(self.test, subunit.RemoteError(_u("phwoar crikey"))) - self.assertThat( - self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals(_b(("xfail: %s [\n" + _remote_exception_str + ": phwoar crikey\n]\n") % self.test.id())), - # testtools >= 2.5.0 - Equals(_b(("xfail: %s [\n" + _remote_exception_repr + ": phwoar crikey\n]\n") % self.test.id())), - ), + self.assertEqual( + self.io.getvalue(), _b(("xfail: %s [\n" + _remote_exception_repr + ": phwoar crikey\n]\n") % self.test.id()) ) def test_add_expected_failure_details(self): """Test addExpectedFailure on a TestProtocolClient with details.""" self.protocol.addExpectedFailure(self.test, details=self.sample_tb_details) - self.assertThat( + self.assertEqual( self.io.getvalue(), - MatchesAny( - # testtools < 2.5.0 - Equals( - _b( - ( - "xfail: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_str_chunked + "]\n" - ) - % self.test.id() - ) - ), - # testtools >= 2.5.0 - Equals( - _b( - ( - "xfail: %s [ multipart\n" - "Content-Type: text/plain\n" - "something\n" - "F\r\nserialised\nform0\r\n" - "Content-Type: text/x-traceback;charset=utf8,language=python\n" - "traceback\n" + _remote_exception_repr_chunked + "]\n" - ) - % self.test.id() - ) - ), + _b( + ( + "xfail: %s [ multipart\n" + "Content-Type: text/plain\n" + "something\n" + "F\r\nserialised\nform0\r\n" + "Content-Type: text/x-traceback;charset=utf8,language=python\n" + "traceback\n" + _remote_exception_repr_chunked + "]\n" + ) + % self.test.id() ), ) @@ -1479,7 +1587,7 @@ def test_tags_add(self): def test_tags_both(self): self.protocol.tags({"quux"}, {"bar"}) - self.assertThat([b"tags: quux -bar\n", b"tags: -bar quux\n"], Contains(self.io.getvalue())) + self.assertIn(self.io.getvalue(), [b"tags: quux -bar\n", b"tags: -bar quux\n"]) def test_tags_gone(self): self.protocol.tags(set(), {"bar"}) diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py index 793256b..faf2ade 100644 --- a/python/subunit/tests/test_test_results.py +++ b/python/subunit/tests/test_test_results.py @@ -22,7 +22,7 @@ import testtools from testtools import TestCase -from testtools.content import TracebackContent, text_content +from subunit._content import TracebackContent, text_content from testtools.testresult.doubles import ExtendedTestResult import subunit