Skip to content

Commit a2705f0

Browse files
Add pytest snapshot fixture (#2688)
The snapshot decorator worked ok for basic integration tests but isn't compatible for more advanced pytest usage like parametrization. By creating a fixture we can `pytest.mark.snapshot` test cases which will make use of a `snapshot` pytest fixture which supports more advanced pytest usage. Co-authored-by: Brett Langdon <[email protected]>
1 parent f9bab03 commit a2705f0

File tree

2 files changed

+61
-12
lines changed

2 files changed

+61
-12
lines changed

tests/conftest.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@
55
from tests.utils import DummyTracer
66
from tests.utils import TracerSpanContainer
77
from tests.utils import call_program
8+
from tests.utils import snapshot_context
9+
10+
11+
def pytest_configure(config):
12+
config.addinivalue_line(
13+
"markers", "snapshot(*args, **kwargs): mark test to run as a snapshot test which sends traces to the test agent"
14+
)
815

916

1017
@pytest.fixture
@@ -37,3 +44,19 @@ def _run(code, **kwargs):
3744
return call_program("ddtrace-run", sys.executable, str(pyfile), **kwargs)
3845

3946
yield _run
47+
48+
49+
@pytest.fixture(autouse=True)
50+
def snapshot(request):
51+
marks = [m for m in request.node.iter_markers(name="snapshot")]
52+
assert len(marks) < 2, "Multiple snapshot marks detected"
53+
if marks:
54+
snap = marks[0]
55+
token = ""
56+
token += request.module.__name__
57+
token += ".%s" % request.cls.__name__ if request.cls else ""
58+
token += ".%s" % request.node.name
59+
with snapshot_context(token, *snap.args, **snap.kwargs) as snapshot:
60+
yield snapshot
61+
else:
62+
yield

tests/utils.py

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import sys
77
from typing import List
88

9+
import attr
910
import pytest
1011

1112
import ddtrace
@@ -19,6 +20,7 @@
1920
from ddtrace.internal.compat import to_unicode
2021
from ddtrace.internal.encoding import JSONEncoder
2122
from ddtrace.internal.writer import AgentWriter
23+
from ddtrace.utils.formats import parse_tags_str
2224
from ddtrace.vendor import wrapt
2325
from tests.subprocesstest import SubprocessTestCase
2426

@@ -820,8 +822,30 @@ class SnapshotFailed(Exception):
820822
pass
821823

822824

825+
@attr.s
826+
class SnapshotTest(object):
827+
token = attr.ib(type=str)
828+
tracer = attr.ib(type=ddtrace.Tracer, default=ddtrace.tracer)
829+
830+
def clear(self):
831+
"""Clear any traces sent that were sent for this snapshot."""
832+
parsed = parse.urlparse(self.tracer.writer.agent_url)
833+
conn = httplib.HTTPConnection(parsed.hostname, parsed.port)
834+
conn.request("GET", "/test/clear?token=%s" % self.token)
835+
resp = conn.getresponse()
836+
assert resp.status == 200
837+
838+
823839
@contextmanager
824-
def snapshot_context(token, ignores=None, tracer=None, async_mode=True):
840+
def snapshot_context(token, ignores=None, tracer=None, async_mode=True, variants=None):
841+
# Use variant that applies to update test token. One must apply. If none
842+
# apply, the test should have been marked as skipped.
843+
if variants:
844+
applicable_variant_ids = [k for (k, v) in variants.items() if v]
845+
assert len(applicable_variant_ids) == 1
846+
variant_id = applicable_variant_ids[0]
847+
token = "{}_{}".format(token, variant_id) if variant_id else token
848+
825849
ignores = ignores or []
826850
if not tracer:
827851
tracer = ddtrace.tracer
@@ -839,6 +863,13 @@ def snapshot_context(token, ignores=None, tracer=None, async_mode=True):
839863
if async_mode:
840864
# Patch the tracer writer to include the test token header for all requests.
841865
tracer.writer._headers["X-Datadog-Test-Token"] = token
866+
867+
# Also add a header to the environment for subprocesses test cases that might use snapshotting.
868+
existing_headers = parse_tags_str(os.environ.get("_DD_TRACE_WRITER_ADDITIONAL_HEADERS", ""))
869+
existing_headers.update({"X-Datadog-Test-Token": token})
870+
os.environ["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"] = ",".join(
871+
["%s:%s" % (k, v) for k, v in existing_headers.items()]
872+
)
842873
else:
843874
# Signal the start of this test case to the test agent.
844875
try:
@@ -851,14 +882,17 @@ def snapshot_context(token, ignores=None, tracer=None, async_mode=True):
851882
# The test agent returns nice error messages we can forward to the user.
852883
raise SnapshotFailed(r.read())
853884

854-
# Return context to the caller
855885
try:
856-
yield
886+
yield SnapshotTest(
887+
tracer=tracer,
888+
token=token,
889+
)
857890
finally:
858891
# Force a flush so all traces are submitted.
859892
tracer.writer.flush_queue()
860893
if async_mode:
861894
del tracer.writer._headers["X-Datadog-Test-Token"]
895+
del os.environ["_DD_TRACE_WRITER_ADDITIONAL_HEADERS"]
862896

863897
# Query for the results of the test.
864898
conn = httplib.HTTPConnection(parsed.hostname, parsed.port)
@@ -917,15 +951,7 @@ def wrapper(wrapped, instance, args, kwargs):
917951
else token_override
918952
)
919953

920-
# Use variant that applies to update test token. One must apply. If none
921-
# apply, the test should have been marked as skipped.
922-
if variants:
923-
applicable_variant_ids = [k for (k, v) in variants.items() if v]
924-
assert len(applicable_variant_ids) == 1
925-
variant_id = applicable_variant_ids[0]
926-
token = "{}_{}".format(token, variant_id) if variant_id else token
927-
928-
with snapshot_context(token, ignores=ignores, tracer=tracer, async_mode=async_mode):
954+
with snapshot_context(token, ignores=ignores, tracer=tracer, async_mode=async_mode, variants=variants):
929955
# Run the test.
930956
if include_tracer:
931957
kwargs["tracer"] = tracer

0 commit comments

Comments
 (0)