
Commit ae5b6d4

Add jp-compliance test
1 parent 3381a3c commit ae5b6d4

File tree

1 file changed: 222 additions, 0 deletions


bin/jp-compliance

Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
#!/usr/bin/env python
"""JMESPath compliance test runner.

This is a test runner that will run the JMESPath compliance tests against a
JMESPath executable.

Compliance tests are broken down into three components:

* The filename that contains the test. These are grouped by feature.
* The test group within the file. A test group can have multiple tests.
* The test case number. This is an individual test.

If "-t/--tests" is not provided then all compliance tests are run.
You can specify which tests to run using the "-t/--tests" argument.
Each test is specified as a comma separated list consisting of
"category,group_number,test_number". The group number and test number
are optional. If no test number is provided, all tests within the group
are run. If no group number is given, all tests in that category
are run. To see a list of categories use the "-l/--list" option.
Multiple comma separated values are space separated.

When a test failure occurs, the category, group number, and test number are
displayed in the failure message. This allows you to quickly rerun a specific
test.

Examples
========

These examples show how to run the compliance tests against the "jp"
executable.

Run all the basic tests::

    jp-compliance -e jp -t basic

Run all the basic tests in group 1::

    jp-compliance -e jp -t basic,1

Run the filter and function tests::

    jp-compliance -e jp -t filters functions

Run the filter and function tests in group 1::

    jp-compliance -e jp -t filters,1 functions,1

"""
import sys
import argparse
import os
import subprocess
import shlex


if sys.version_info[:2] == (2, 6):
    import simplejson as json
    from ordereddict import OrderedDict
else:
    import json
    from collections import OrderedDict
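# Note: json.loads() only gained the object_pairs_hook parameter in Python
# 2.7, and collections.OrderedDict is likewise new in 2.7, hence the
# simplejson/ordereddict fallbacks for 2.6 above.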


_abs = os.path.abspath
_dname = os.path.dirname
_pjoin = os.path.join
_splitext = os.path.splitext
_bname = os.path.basename


class ComplianceTestRunner(object):
    TEST_DIR = _pjoin(_dname(_dname(_abs(__file__))), 'tests')

    def __init__(self, exe=None, tests=None, test_dir=TEST_DIR):
        self.test_dir = test_dir
        self.tests = tests
        self.jp_executable = exe

    def run_tests(self):
        for test_case in self._test_cases():
            if self._should_run(test_case):
                self._run_test(test_case)

    def _should_run(self, test_case):
        if not self.tests:
            return True
        # Specific tests were called out so we need
        # at least one thing in self.tests to match
        # in order to run the test.
        for allowed_test in self.tests:
            if self._is_subset(allowed_test, test_case):
                return True
        return False

    def _is_subset(self, subset, fullset):
        for key in subset:
            if subset[key] != fullset.get(key):
                return False
        return True
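
    # Example (illustrative): _is_subset({'category': 'basic'}, test_case)
    # is True for every test case loaded from basic.json, so a bare
    # category spec selects the whole file, while adding group_number and
    # test_number narrows the match down to a single case.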

    def _load_test_file(self, test_json_file):
        with open(test_json_file) as f:
            loaded_test = json.loads(f.read(), object_pairs_hook=OrderedDict)
        return loaded_test

    def _load_test_cases(self, filename, group_number, test_group):
        given = test_group['given']
        for i, case in enumerate(test_group['cases']):
            current = {"given": given, "group_number": group_number,
                       "test_number": i,
                       'category': _splitext(_bname(filename))[0]}
            current.update(case)
            yield current
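
    # A test file is assumed to be a JSON list of groups shaped roughly
    # like this sketch (inferred from the keys used above and in
    # _run_test; the real files live under the tests/ directory):
    #
    #     [{"given": {"foo": {"bar": "baz"}},
    #       "cases": [{"expression": "foo.bar", "result": "baz"},
    #                 {"expression": "foo.[bar", "error": "syntax"}]}]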

    def _test_cases(self):
        for test_json_file in self.get_compliance_test_files():
            test_groups = self._load_test_file(test_json_file)
            for i, test_group in enumerate(test_groups):
                test_cases = self._load_test_cases(test_json_file, i,
                                                   test_group)
                for test_case in test_cases:
                    yield test_case

    def _run_test(self, test_case):
        command = shlex.split(self.jp_executable)
        command.append(test_case['expression'])
        process = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   stdin=subprocess.PIPE)
        process.stdin.write(json.dumps(test_case['given']))
        process.stdin.flush()
        stdout, stderr = process.communicate()
        if 'result' in test_case:
            actual = json.loads(stdout)
            expected = test_case['result']
            if actual != expected:
                self._show_failure(actual, test_case)
            else:
                sys.stdout.write('.')
                sys.stdout.flush()
        else:
            error_type = test_case['error']
            # For errors, we expect the error type on stderr.
            if error_type not in stderr:
                self._show_failure_for_error(stderr, test_case)
            else:
                sys.stdout.write('.')
                sys.stdout.flush()
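
    # Contract with the executable under test (as exercised above): the
    # expression is passed as the final command line argument, the "given"
    # document is written to stdin as JSON, the result is read back from
    # stdout, and for error cases the expected error type need only appear
    # somewhere in stderr.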

    def _show_failure(self, actual, test_case):
        test_case['actual'] = json.dumps(actual)
        test_case['result'] = json.dumps(test_case['result'])
        failure_message = (
            "\nFAIL {category},{group_number},{test_number}\n"
            "The expression: {expression}\n"
            "was supposed to give: {result}\n"
            "but instead gave: {actual}\n"
        ).format(**test_case)
        sys.stdout.write(failure_message)

    def _show_failure_for_error(self, stderr, test_case):
        test_case['stderr'] = stderr
        failure_message = (
            "\nFAIL {category},{group_number},{test_number}\n"
            "The expression: {expression}\n"
            "was supposed to emit the error: {error}\n"
            "but instead gave: \n{stderr}\n"
        ).format(**test_case)
        sys.stdout.write(failure_message)

    def get_compliance_test_files(self):
        for root, dirnames, filenames in os.walk(self.test_dir):
            for filename in filenames:
                if filename.endswith('.json'):
                    full_path = _pjoin(root, filename)
                    yield full_path


def display_available_tests(test_files):
    print("Available test types:\n")
    for filename in test_files:
        no_extension = os.path.splitext(os.path.basename(filename))[0]
        print(no_extension)


def test_spec(value):
    parts = value.split(',')
    if not parts:
        raise ValueError("%s should be a comma separated list." % value)
    spec = {'category': parts[0]}
    if len(parts) == 2:
        spec['group_number'] = int(parts[1])
    elif len(parts) == 3:
        spec['group_number'] = int(parts[1])
        spec['test_number'] = int(parts[2])
    return spec
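
# Illustrative examples of the spec parsing above:
#   test_spec("basic")     -> {'category': 'basic'}
#   test_spec("basic,1")   -> {'category': 'basic', 'group_number': 1}
#   test_spec("basic,1,2") -> {'category': 'basic', 'group_number': 1,
#                              'test_number': 2}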


def main():
    parser = argparse.ArgumentParser(usage=__doc__)
    parser.add_argument('-e', '--exe', help='The JMESPath executable to use.')
    parser.add_argument('-t', '--tests', help=('The compliance tests to run. '
                                               'If this value is not provided, '
                                               'then all compliance tests are '
                                               'run.'), type=test_spec, nargs='+')
    parser.add_argument('-l', '--list', action="store_true",
                        help=('List the available compliance tests to run. '
                              'These values can then be used with the '
                              '"-t/--tests" argument. If this argument is '
                              'specified, no tests will actually be run.'))
    args = parser.parse_args()
    runner = ComplianceTestRunner(args.exe, args.tests)
    if args.list:
        display_available_tests(runner.get_compliance_test_files())
    else:
        runner.run_tests()
        sys.stdout.write('\n')


if __name__ == '__main__':
    main()
