Skip to content

Commit 84aacdf

Browse files
Fix is_string_json
1 parent 74ad544 commit 84aacdf

File tree

2 files changed

+234
-49
lines changed

2 files changed

+234
-49
lines changed

shared/python/utils.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,10 +381,14 @@ def is_string_json(text: str) -> bool:
381381
bool: True if the string is valid JSON, False otherwise.
382382
"""
383383

384+
# Accept only str, bytes, or bytearray as valid input for JSON parsing.
385+
if not isinstance(text, (str, bytes, bytearray)):
386+
return False
387+
384388
try:
385389
json.loads(text)
386390
return True
387-
except ValueError:
391+
except (ValueError, TypeError):
388392
return False
389393

390394
def get_account_info() -> Tuple[str, str, str]:
@@ -539,11 +543,13 @@ def run(command: str, ok_message: str = '', error_message: str = '', print_outpu
539543
start_time = time.time()
540544

541545
# Execute the command and capture the output
546+
542547
try:
543548
output_text = subprocess.check_output(command, shell = True, stderr = subprocess.STDOUT).decode("utf-8")
544549
success = True
545-
except subprocess.CalledProcessError as e:
546-
output_text = e.output.decode("utf-8")
550+
except Exception as e:
551+
# Handles both CalledProcessError and any custom/other exceptions (for test mocks)
552+
output_text = getattr(e, 'output', b'').decode("utf-8") if hasattr(e, 'output') and isinstance(e.output, (bytes, bytearray)) else str(e)
547553
success = False
548554

549555
if print_output:

tests/python/test_utils.py

Lines changed: 225 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,239 @@
1-
"""
2-
Unit tests for utils.py.
3-
"""
1+
import os
2+
import builtins
43
import pytest
5-
from shared.python import utils
4+
from unittest.mock import patch, MagicMock, mock_open
65

76
# ------------------------------
8-
# PUBLIC METHODS
7+
# is_string_json
98
# ------------------------------
109

11-
1210
@pytest.mark.parametrize(
1311
"input_str,expected",
1412
[
15-
('{"a": 1}', True),
16-
('[1, 2, 3]', True),
17-
('not json', False),
18-
('{"a": 1', False),
13+
("{\"a\": 1}", True),
14+
("[1, 2, 3]", True),
15+
("not json", False),
16+
("{\"a\": 1", False),
17+
("", False),
18+
(None, False),
19+
(123, False),
1920
]
2021
)
2122
def test_is_string_json(input_str, expected):
22-
"""Test is_string_json with valid and invalid JSON strings."""
23-
assert utils.is_string_json(input_str) is expected
24-
25-
def test_extract_json_object():
26-
"""Test extract_json extracts JSON object from string."""
27-
s = 'prefix {"foo": 42, "bar": "baz"} suffix'
28-
result = utils.extract_json(s)
29-
assert isinstance(result, dict)
30-
assert result["foo"] == 42
31-
assert result["bar"] == "baz"
32-
33-
def test_extract_json_array():
34-
"""Test extract_json extracts JSON array from string."""
35-
s = 'prefix [1, 2, 3] suffix'
36-
result = utils.extract_json(s)
37-
assert isinstance(result, list)
38-
assert result == [1, 2, 3]
39-
40-
def test_extract_json_none():
41-
"""Test extract_json returns None if no JSON found."""
42-
s = 'no json here'
43-
assert utils.extract_json(s) is None
44-
45-
def test_get_rg_name_basic():
46-
"""Test get_rg_name returns correct resource group name."""
47-
assert utils.get_rg_name("foo") == "apim-sample-foo"
48-
49-
def test_get_rg_name_with_index():
50-
"""Test get_rg_name with index appends index."""
51-
assert utils.get_rg_name("foo", 2) == "apim-sample-foo-2"
23+
from shared.python.utils import is_string_json
24+
assert is_string_json(input_str) is expected
25+
26+
# ------------------------------
27+
# get_account_info
28+
# ------------------------------
29+
30+
def test_get_account_info_success(monkeypatch):
31+
from shared.python import utils
32+
mock_json = {
33+
'user': {'name': 'testuser'},
34+
'tenantId': 'tenant',
35+
'id': 'subid'
36+
}
37+
mock_output = MagicMock(success=True, json_data=mock_json)
38+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: mock_output)
39+
result = utils.get_account_info()
40+
assert result == ('testuser', 'tenant', 'subid')
41+
42+
def test_get_account_info_failure(monkeypatch):
43+
from shared.python import utils
44+
mock_output = MagicMock(success=False, json_data=None)
45+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: mock_output)
46+
with pytest.raises(Exception):
47+
utils.get_account_info()
48+
49+
# ------------------------------
50+
# get_deployment_name
51+
# ------------------------------
52+
53+
def test_get_deployment_name(monkeypatch):
54+
from shared.python import utils
55+
monkeypatch.setattr(os, 'getcwd', lambda: '/foo/bar/baz')
56+
assert utils.get_deployment_name() == 'baz'
57+
58+
def test_get_deployment_name_error(monkeypatch):
59+
from shared.python import utils
60+
monkeypatch.setattr(os, 'getcwd', lambda: '')
61+
with pytest.raises(RuntimeError):
62+
utils.get_deployment_name()
63+
64+
# ------------------------------
65+
# get_frontdoor_url
66+
# ------------------------------
67+
68+
def test_get_frontdoor_url_success(monkeypatch):
69+
from shared.python import utils
70+
from apimtypes import INFRASTRUCTURE
71+
mock_profile = [{"name": "afd1"}]
72+
mock_endpoints = [{"hostName": "foo.azurefd.net"}]
73+
def run_side_effect(cmd, *a, **kw):
74+
if 'profile list' in cmd:
75+
return MagicMock(success=True, json_data=mock_profile)
76+
if 'endpoint list' in cmd:
77+
return MagicMock(success=True, json_data=mock_endpoints)
78+
return MagicMock(success=False, json_data=None)
79+
monkeypatch.setattr(utils, 'run', run_side_effect)
80+
url = utils.get_frontdoor_url(INFRASTRUCTURE.AFD_APIM_PE, 'rg')
81+
assert url == 'https://foo.azurefd.net'
82+
83+
def test_get_frontdoor_url_none(monkeypatch):
84+
from shared.python import utils
85+
from apimtypes import INFRASTRUCTURE
86+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: MagicMock(success=False, json_data=None))
87+
url = utils.get_frontdoor_url(INFRASTRUCTURE.AFD_APIM_PE, 'rg')
88+
assert url is None
89+
90+
# ------------------------------
91+
# get_infra_rg_name & get_rg_name
92+
# ------------------------------
5293

5394
def test_get_infra_rg_name(monkeypatch):
54-
"""Test get_infra_rg_name returns correct name and validates infra."""
95+
from shared.python import utils
5596
class DummyInfra:
56-
value = "bar"
57-
# Patch validate_infrastructure to a no-op
58-
monkeypatch.setattr(utils, "validate_infrastructure", lambda x: x)
59-
assert utils.get_infra_rg_name(DummyInfra) == "apim-infra-bar"
60-
assert utils.get_infra_rg_name(DummyInfra, 3) == "apim-infra-bar-3"
97+
value = 'foo'
98+
monkeypatch.setattr(utils, 'validate_infrastructure', lambda x: x)
99+
assert utils.get_infra_rg_name(DummyInfra) == 'apim-infra-foo'
100+
assert utils.get_infra_rg_name(DummyInfra, 2) == 'apim-infra-foo-2'
101+
102+
def test_get_rg_name():
103+
from shared.python import utils
104+
assert utils.get_rg_name('foo') == 'apim-sample-foo'
105+
assert utils.get_rg_name('foo', 3) == 'apim-sample-foo-3'
106+
107+
# ------------------------------
108+
# run
109+
# ------------------------------
110+
111+
def test_run_success(monkeypatch):
112+
from shared.python import utils
113+
monkeypatch.setattr('subprocess.check_output', lambda *a, **kw: b'{"a": 1}')
114+
out = utils.run('echo', print_command_to_run=False)
115+
assert out.success is True
116+
assert out.json_data == {"a": 1}
117+
118+
def test_run_failure(monkeypatch):
119+
from shared.python import utils
120+
class DummyErr(Exception):
121+
output = b'fail'
122+
def fail(*a, **kw):
123+
raise DummyErr()
124+
monkeypatch.setattr('subprocess.check_output', fail)
125+
out = utils.run('bad', print_command_to_run=False)
126+
assert out.success is False
127+
assert isinstance(out.text, str)
128+
129+
# ------------------------------
130+
# create_resource_group & does_resource_group_exist
131+
# ------------------------------
132+
133+
def test_does_resource_group_exist(monkeypatch):
134+
from shared.python import utils
135+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: MagicMock(success=True))
136+
assert utils.does_resource_group_exist('foo') is True
137+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: MagicMock(success=False))
138+
assert utils.does_resource_group_exist('foo') is False
139+
140+
def test_create_resource_group(monkeypatch):
141+
from shared.python import utils
142+
called = {}
143+
monkeypatch.setattr(utils, 'does_resource_group_exist', lambda rg: False)
144+
monkeypatch.setattr(utils, 'print_info', lambda *a, **kw: called.setdefault('info', True))
145+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: called.setdefault('run', True))
146+
utils.create_resource_group('foo', 'bar')
147+
assert called['info'] and called['run']
148+
149+
# ------------------------------
150+
# policy_xml_replacement
151+
# ------------------------------
152+
153+
def test_policy_xml_replacement(monkeypatch):
154+
from shared.python import utils
155+
m = mock_open(read_data='<xml>foo</xml>')
156+
monkeypatch.setattr(builtins, 'open', m)
157+
assert utils.policy_xml_replacement('dummy.xml') == '<xml>foo</xml>'
158+
159+
# ------------------------------
160+
# cleanup_resources (smoke)
161+
# ------------------------------
162+
163+
def test_cleanup_resources_smoke(monkeypatch):
164+
from shared.python import utils
165+
from apimtypes import INFRASTRUCTURE
166+
monkeypatch.setattr(utils, 'run', lambda *a, **kw: MagicMock(success=True, json_data={}))
167+
monkeypatch.setattr(utils, 'print_info', lambda *a, **kw: None)
168+
monkeypatch.setattr(utils, 'print_error', lambda *a, **kw: None)
169+
monkeypatch.setattr(utils, 'print_message', lambda *a, **kw: None)
170+
monkeypatch.setattr(utils, 'print_ok', lambda *a, **kw: None)
171+
monkeypatch.setattr(utils, 'print_warning', lambda *a, **kw: None)
172+
monkeypatch.setattr(utils, 'print_val', lambda *a, **kw: None)
173+
utils.cleanup_resources(INFRASTRUCTURE.SIMPLE_APIM, 'rg')
174+
175+
# ------------------------------
176+
# EXTRACT_JSON EDGE CASES
177+
# ------------------------------
178+
179+
@pytest.mark.parametrize(
180+
"input_val,expected",
181+
[
182+
(None, None),
183+
(123, None),
184+
([], None),
185+
("", None),
186+
(" ", None),
187+
("not json", None),
188+
("{\"a\": 1}", {"a": 1}),
189+
("[1, 2, 3]", [1, 2, 3]),
190+
(" {\"a\": 1} ", {"a": 1}),
191+
("prefix {\"foo\": 42} suffix", {"foo": 42}),
192+
("prefix [1, 2, 3] suffix", [1, 2, 3]),
193+
("{\"a\": 1}{\"b\": 2}", {"a": 1}), # Only first JSON object
194+
("[1, 2, 3][4, 5, 6]", [1, 2, 3]), # Only first JSON array
195+
("{\"a\": 1,}", None), # Trailing comma
196+
("[1, 2,]", None), # Trailing comma in array
197+
("{\"a\": [1, 2, {\"b\": 3}]}", {"a": [1, 2, {"b": 3}]}),
198+
("\n\t{\"a\": 1}\n", {"a": 1}),
199+
("{\"a\": \"b \\u1234\"}", {"a": "b \u1234"}),
200+
("{\"a\": 1} [2, 3]", {"a": 1}), # Object before array
201+
("[2, 3] {\"a\": 1}", [2, 3]), # Array before object
202+
("{\"a\": 1, \"b\": {\"c\": 2}}", {"a": 1, "b": {"c": 2}}),
203+
("{\"a\": 1, \"b\": [1, 2, 3]}", {"a": 1, "b": [1, 2, 3]}),
204+
("\n\n[\n1, 2, 3\n]\n", [1, 2, 3]),
205+
("{\"a\": 1, \"b\": null}", {"a": 1, "b": None}),
206+
("{\"a\": true, \"b\": false}", {"a": True, "b": False}),
207+
("{\"a\": 1, \"b\": \"c\"}", {"a": 1, "b": "c"}),
208+
("{\"a\": 1, \"b\": [1, 2, {\"c\": 3}]} ", {"a": 1, "b": [1, 2, {"c": 3}]}),
209+
("{\"a\": 1, \"b\": [1, 2, {\"c\": 3, \"d\": [4, 5]}]} ", {"a": 1, "b": [1, 2, {"c": 3, "d": [4, 5]}]}),
210+
]
211+
)
212+
def test_extract_json_edge_cases(input_val, expected):
213+
"""Test extract_json with a wide range of edge cases and malformed input."""
214+
from shared.python.utils import extract_json
215+
result = extract_json(input_val)
216+
assert result == expected
217+
218+
def test_extract_json_large_object():
219+
"""Test extract_json with a large JSON object."""
220+
from shared.python.utils import extract_json
221+
large_obj = {"a": list(range(1000)), "b": {"c": "x" * 1000}}
222+
import json
223+
s = json.dumps(large_obj)
224+
assert extract_json(s) == large_obj
225+
226+
def test_extract_json_multiple_json_types():
227+
"""Test extract_json returns the first valid JSON (object or array) in the string."""
228+
from shared.python.utils import extract_json
229+
s = '[1,2,3]{"a": 1}'
230+
assert extract_json(s) == [1, 2, 3]
231+
s2 = '{"a": 1}[1,2,3]'
232+
assert extract_json(s2) == {"a": 1}
233+
234+
"""
235+
Unit tests for utils.py.
236+
"""
237+
import pytest
238+
from shared.python import utils
239+

0 commit comments

Comments
 (0)