-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathplugin.py
More file actions
222 lines (176 loc) · 6.94 KB
/
plugin.py
File metadata and controls
222 lines (176 loc) · 6.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import json
import os.path
import shutil
import tempfile
import warnings
from os import listdir
from os.path import isfile
import pytest
from django.test.utils import CaptureQueriesContext
from pytest_django_queries.utils import create_backup
# Name of the pytest marker registered by this plugin and name of the
# fixture that captures the queries (both are spelled "count_queries").
PYTEST_QUERY_COUNT_MARKER = "count_queries"
PYTEST_QUERY_COUNT_FIXTURE_NAME = "count_queries"
# Default value of the --django-db-bench option (path of the results file).
DEFAULT_RESULT_FILENAME = ".pytest-queries"
# Backup path used when --django-backup-queries is passed without a value.
DEFAULT_OLD_RESULT_FILENAME = ".pytest-queries.old"
def get_worker_input(node):
    """Return the xdist worker input attached to ``node``.

    The ``workerinput`` attribute is preferred. When it is missing or
    ``None``, the legacy ``slaveinput`` attribute (pytest-xdist<2.0) is read
    instead and a ``DeprecationWarning`` is emitted.

    :param node: object exposing ``workerinput`` or ``slaveinput``.
    :return: the value of whichever attribute was read.
    :raises AttributeError: if ``workerinput`` is unset and ``node`` has no
        ``slaveinput`` attribute.
    """
    modern_input = getattr(node, "workerinput", None)
    if modern_input is not None:
        return modern_input

    # Read the legacy attribute first so that a missing attribute raises
    # before any deprecation warning is issued.
    legacy_input = node.slaveinput
    warnings.warn(
        "pytest-xdist<2.0 support will be dropped in pytest-django-queries 2.0",
        category=DeprecationWarning,
    )
    return legacy_input
def is_worker(config: pytest.Config):
    """Tell whether ``config`` carries xdist worker input.

    :param config: object that may expose ``workerinput`` or the legacy
        ``slaveinput`` attribute.
    :return: ``True`` when either attribute is present, ``False`` otherwise.
    """
    return any(hasattr(config, attr) for attr in ("workerinput", "slaveinput"))
def get_workerid(config: pytest.Config):
    """Return the identifier of the process described by ``config``.

    :param config: object that may expose ``workerinput`` or the legacy
        ``slaveinput`` mapping.
    :return: ``workerinput["workerid"]`` when ``workerinput`` exists, else
        ``slaveinput["slaveid"]`` when ``slaveinput`` exists, else the
        literal ``"master"``.
    """
    # (attribute name, key holding the id) pairs, current naming first.
    # The "slaveinput" entry is deprecated: will be removed in
    # pytest-django-queries 2.0.
    lookups = (("workerinput", "workerid"), ("slaveinput", "slaveid"))
    for attr_name, id_key in lookups:
        if hasattr(config, attr_name):
            return getattr(config, attr_name)[id_key]
    return "master"
def save_results_to_json(save_path, backup_path, data):
    """Write ``data`` as indented JSON to ``save_path``.

    :param save_path: path of the file to (over)write.
    :param backup_path: when truthy and ``save_path`` is an existing file,
        both paths are handed to ``create_backup`` before writing.
    :param data: JSON-serializable object to dump.
    """
    needs_backup = bool(backup_path) and isfile(save_path)
    if needs_backup:
        create_backup(save_path, backup_path)

    with open(save_path, "w") as output_file:
        json.dump(data, output_file, indent=2)
def add_entry(request: pytest.FixtureRequest, queries, dirout):
    """Append the query statistics of one test to the worker's result file.

    One tab-separated line is written: module name, test name, number of
    queries and number of duplicated queries. The file lives in ``dirout``
    and is named after the value returned by ``get_workerid``.

    :param request: fixture request of the test; ``request.node`` provides
        the module and test names, ``request.config`` the worker id.
    :param queries: captured queries; must support slicing and yield items
        that can be indexed with ``"sql"``.
    :param dirout: directory in which the result file is stored.
    """
    module_name = request.node.module.__name__
    test_name = request.node.name

    # Snapshot the captured queries before computing the statistics.
    captured = queries[:]
    query_count = len(captured)
    distinct_statements = {query["sql"] for query in captured}
    duplicate_count = query_count - len(distinct_statements)

    result_line = f"{module_name}\t{test_name}\t{query_count:d}\t{duplicate_count:d}\n"

    save_path = os.path.join(dirout, get_workerid(request.config))
    # Append mode creates the file when it does not exist yet, so there is
    # no need for a (race-prone) existence check to choose between "a"/"w".
    with open(save_path, mode="a") as fp:
        fp.write(result_line)
def pytest_addoption(parser):
    """Register the command line options of the plugin.

    Both options are added to the ``django-queries`` option group:

    - ``--django-db-bench PATH``: where the results are stored
      (``queries_results_save_path``).
    - ``--django-backup-queries [PATH]``: optional value, stored as
      ``queries_backup_results`` and ``None`` by default.

    :param parser: pytest parser object providing ``getgroup``.
    """
    options = parser.getgroup("django-queries")

    # Destination of the results file.
    options.addoption(
        "--django-db-bench",
        action="store",
        dest="queries_results_save_path",
        metavar="PATH",
        default=DEFAULT_RESULT_FILENAME,
        help="Output file for storing the results. Default: .pytest-queries",
    )

    # nargs="?" lets the flag be passed with or without a value.
    options.addoption(
        "--django-backup-queries",
        action="store",
        dest="queries_backup_results",
        metavar="PATH",
        nargs="?",
        default=None,
        help="Whether the old results should be backed up or not before overriding",
    )
@pytest.hookimpl(tryfirst=True)
def pytest_load_initial_conftests(early_config, parser, args):
    """Register the plugin marker and resolve the backup path default.

    When ``--django-backup-queries`` appears in ``args`` but no value was
    parsed for it, the backup path falls back to
    ``DEFAULT_OLD_RESULT_FILENAME``.

    :param early_config: pytest config; its ``known_args_namespace`` is
        read and updated in place.
    :param parser: unused.
    :param args: command line arguments.
    :type args: tuple|list
    :return: None
    """
    marker_description = (
        "%s: Mark the test as to have their database query counted."
        % PYTEST_QUERY_COUNT_MARKER
    )
    early_config.addinivalue_line("markers", marker_description)

    namespace = early_config.known_args_namespace
    requested_path = namespace.queries_backup_results

    # The flag was provided without a value: use the default backup path.
    flag_without_value = (
        requested_path is None and "--django-backup-queries" in args
    )
    namespace.queries_backup_results = (
        DEFAULT_OLD_RESULT_FILENAME if flag_without_value else requested_path
    )
def _process_query_count_marker(
    config: pytest.Config, request: pytest.FixtureRequest, *_args, **kwargs
):
    """Load the query counting fixture for a marked test.

    :param config: pytest config; ``dj_queries_has_django_plugin`` is read
        from it.
    :param request: fixture request used to load the fixtures.
    :param _args: positional marker arguments, ignored.
    :param kwargs: marker keyword arguments; only ``autouse`` (default:
        ``True``) is consulted. Nothing is loaded when it is falsy.
    """
    if not kwargs.get("autouse", True):
        return

    # Force load
    if config.dj_queries_has_django_plugin is True:
        request.getfixturevalue("_django_db_marker")
    request.getfixturevalue(PYTEST_QUERY_COUNT_FIXTURE_NAME)
@pytest.fixture(autouse=True)
def _pytest_query_marker(request: pytest.FixtureRequest):
    """Count the queries of the current node when it is marked with
    'count_queries'.

    Optional keyword-arguments of the marker:
        - autouse (bool, default: True)
          Whether the fixture should be used automatically.
          Disabling it might be useful if you are executing fixtures
          that are making queries and still want to mark the test
          but place the fixture manually."""
    marker = request.node.get_closest_marker(PYTEST_QUERY_COUNT_MARKER)
    if not marker:
        # Unmarked test: nothing to do.
        return

    _process_query_count_marker(
        request.config, request, *marker.args, **marker.kwargs
    )
def pytest_configure(config: pytest.Config) -> None:
    """Prepare the per-run state stored on ``config``.

    Two attributes are set:

    - ``django_queries_shared_directory``: a freshly created temporary
      directory whose name starts with ``pytest-django-queries``.
    - ``dj_queries_has_django_plugin``: whether the plugin manager reports
      the pytest-django plugin as registered.

    :param config: pytest config object to populate.
    """
    shared_directory = tempfile.mkdtemp(prefix="pytest-django-queries")
    config.django_queries_shared_directory = shared_directory

    # Stores whether the pytest-django plugin is installed
    # Note: both names are valid, 'django' is an alias for 'pytest_django.plugin'
    # (https://github.com/pytest-dev/pytest-django/blob/3955b1826f396cecd01d1c059e7703769ad94d81/pyproject.toml#L77-L78)
    plugin_manager = config.pluginmanager
    config.dj_queries_has_django_plugin = any(
        plugin_manager.hasplugin(plugin_name)
        for plugin_name in ("django", "pytest_django.plugin")
    )
def _read_worker_results(results_path):
    """Parse every result file found in ``results_path``.

    Each non-blank line must hold four tab-separated fields: module name,
    test name, query count and duplicate count.

    :param results_path: directory containing the result files.
    :return: ``{module_name: {test_name: {"query-count": int,
        "duplicates": int}}}``; empty when no result line was found.
    :raises ValueError: if a line does not have exactly four fields or a
        count is not an integer.
    """
    test_results = {}
    for filename in listdir(results_path):
        with open(os.path.join(results_path, filename)) as fp:
            for raw_line in fp:
                result_line = raw_line.strip()
                if not result_line:
                    continue
                module_name, test_name, query_count, duplicates = result_line.split(
                    "\t"
                )
                module_entries = test_results.setdefault(module_name, {})
                module_entries[test_name] = {
                    "query-count": int(query_count),
                    "duplicates": int(duplicates),
                }
    return test_results


def pytest_unconfigure(config: pytest.Config):
    """Merge the collected results into the JSON file and clean up.

    The result files of ``config.django_queries_shared_directory`` are
    parsed and, when at least one entry was found, saved through
    ``save_results_to_json`` using the paths stored in
    ``config.known_args_namespace``.

    :param config: pytest config holding the shared directory and options.
    :raises ValueError: if a result line is malformed.
    """
    results_path = config.django_queries_shared_directory
    try:
        test_results = _read_worker_results(results_path)
        if test_results:
            save_results_to_json(
                save_path=config.known_args_namespace.queries_results_save_path,
                backup_path=config.known_args_namespace.queries_backup_results,
                data=test_results,
            )
    finally:
        # Clean up the temporary directory even when parsing or saving
        # failed, so an error does not leave it behind.
        shutil.rmtree(results_path)
@pytest.hookimpl(optionalhook=True)
def pytest_configure_node(node):
    """Pass the shared results directory on to a worker node.

    The directory stored on ``node.config`` is written into the node's
    worker input under the ``_django_queries_shared_dir`` key.

    :param node: node object accepted by ``get_worker_input`` and exposing
        ``config``.
    """
    worker_input = get_worker_input(node)
    shared_directory = node.config.django_queries_shared_directory
    worker_input["_django_queries_shared_dir"] = shared_directory
def get_shared_directory(request: pytest.FixtureRequest):
    """Return the directory in which the results of the run are collected.

    On a worker (as reported by ``is_worker``) the path is read from the
    worker input under ``_django_queries_shared_dir``; otherwise it is the
    ``django_queries_shared_directory`` attribute of the config.

    :param request: fixture request giving access to the pytest config.
    """
    config = request.config
    if is_worker(config):
        return get_worker_input(config)["_django_queries_shared_dir"]
    return config.django_queries_shared_directory
@pytest.fixture
def count_queries(request: pytest.FixtureRequest):
    """Capture the queries performed by a test and record their count.

    The fixture yields the ``CaptureQueriesContext`` wrapping the default
    Django connection. Once the test is over, the captured queries are
    passed to ``add_entry`` together with the shared results directory.
    """
    # Imported inside the fixture body, as the original code does.
    from django.db import connection

    with CaptureQueriesContext(connection) as captured:
        yield captured

    # The capture context is closed at this point; persist the statistics.
    add_entry(request, captured, get_shared_directory(request))