Skip to content

Commit 47d2a0d

Browse files
fix(iast): ensure args are not empty in aspects to avoid index error [backport 2.1] (#7741)
Backport 20ea51d from #7738 to 2.1. IAST: Resolved an issue where certain aspects raised an IndexError when no arguments were provided, despite the replaced methods/functions accommodating calls without parameters. The fix eliminates the requirement for at least one parameter in these aspects and includes regression tests to ensure stability. ## Checklist - [x] Change(s) are motivated and described in the PR description. - [x] Testing strategy is described if automated tests are not included in the PR. - [x] Risk is outlined (performance impact, potential for breakage, maintainability, etc). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] [Library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) are followed. If no release note is required, add label `changelog/no-changelog`. - [x] Documentation is included (in-code, generated user docs, [public corp docs](https://github.com/DataDog/documentation/)). - [x] Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Title is accurate. - [x] No unnecessary changes are introduced. - [x] Description motivates each change. - [x] Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes unless absolutely necessary. - [x] Testing strategy adequately addresses listed risk(s). - [x] Change is maintainable (easy to change, telemetry, documentation). - [x] Release note makes sense to a user of the library. - [x] Reviewer has explicitly acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment. - [x] Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) - [x] If this PR touches code that signs or publishes builds or packages, or handles credentials of any kind, I've requested a review from `@DataDog/security-design-and-guidance`. - [x] This PR doesn't touch any of that. --------- Co-authored-by: Federico Mon <[email protected]>
1 parent 1e401b0 commit 47d2a0d

File tree

3 files changed

+144
-13
lines changed

3 files changed

+144
-13
lines changed

ddtrace/appsec/_iast/_taint_tracking/aspects.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def str_aspect(orig_function, flag_added_args, *args, **kwargs):
6363
else:
6464
result = args[0].str(*args[1:], **kwargs)
6565

66-
if isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
66+
if args and isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
6767
try:
6868
if isinstance(args[0], (bytes, bytearray)):
6969
check_offset = args[0].decode("utf-8")
@@ -89,7 +89,7 @@ def bytes_aspect(orig_function, flag_added_args, *args, **kwargs):
8989
else:
9090
result = args[0].bytes(*args[1:], **kwargs)
9191

92-
if isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
92+
if args and isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
9393
try:
9494
taint_pyobject_with_ranges(result, tuple(get_ranges(args[0])))
9595
except Exception as e:
@@ -108,7 +108,7 @@ def bytearray_aspect(orig_function, flag_added_args, *args, **kwargs):
108108
else:
109109
result = args[0].bytearray(*args[1:], **kwargs)
110110

111-
if isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
111+
if args and isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
112112
try:
113113
taint_pyobject_with_ranges(result, tuple(get_ranges(args[0])))
114114
except Exception as e:
@@ -125,6 +125,9 @@ def join_aspect(orig_function, flag_added_args, *args, **kwargs):
125125
args = args[flag_added_args:]
126126
return orig_function(*args, **kwargs)
127127

128+
if not args:
129+
return orig_function(*args, **kwargs)
130+
128131
joiner = args[0]
129132
args = args[flag_added_args:]
130133
if not isinstance(joiner, TEXT_TYPES):
@@ -327,6 +330,9 @@ def format_aspect(
327330
args = args[flag_added_args:]
328331
return orig_function(*args, **kwargs)
329332

333+
if not args:
334+
return orig_function(*args, **kwargs)
335+
330336
candidate_text = args[0] # type: str
331337
args = args[flag_added_args:]
332338

@@ -377,6 +383,9 @@ def format_map_aspect(
377383
args = args[flag_added_args:]
378384
return orig_function(*args, **kwargs)
379385

386+
if orig_function and not args:
387+
return orig_function(*args, **kwargs)
388+
380389
candidate_text = args[0] # type: str
381390
args = args[flag_added_args:]
382391
if not isinstance(candidate_text, TEXT_TYPES):
@@ -421,7 +430,7 @@ def repr_aspect(orig_function, flag_added_args, *args, **kwargs):
421430

422431
result = repr(*args, **kwargs)
423432

424-
if isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
433+
if args and isinstance(args[0], TEXT_TYPES) and is_pyobject_tainted(args[0]):
425434
try:
426435
if isinstance(args[0], (bytes, bytearray)):
427436
check_offset = args[0].decode("utf-8")
@@ -518,7 +527,7 @@ def incremental_translation(self, incr_coder, funcode, empty):
518527

519528

520529
def decode_aspect(orig_function, flag_added_args, *args, **kwargs):
521-
if orig_function and not flag_added_args:
530+
if orig_function and (not flag_added_args or not args):
522531
# This patch is unexpected, so we fallback
523532
# to executing the original function
524533
return orig_function(*args, **kwargs)
@@ -541,7 +550,7 @@ def decode_aspect(orig_function, flag_added_args, *args, **kwargs):
541550

542551

543552
def encode_aspect(orig_function, flag_added_args, *args, **kwargs):
544-
if orig_function and not flag_added_args:
553+
if orig_function and (not flag_added_args or not args):
545554
# This patch is unexpected, so we fallback
546555
# to executing the original function
547556
return orig_function(*args, **kwargs)
@@ -566,7 +575,7 @@ def encode_aspect(orig_function, flag_added_args, *args, **kwargs):
566575
def upper_aspect(
567576
orig_function, flag_added_args, *args, **kwargs
568577
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
569-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
578+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
570579
if flag_added_args > 0:
571580
args = args[flag_added_args:]
572581
return orig_function(*args, **kwargs)
@@ -586,7 +595,7 @@ def upper_aspect(
586595
def lower_aspect(
587596
orig_function, flag_added_args, *args, **kwargs
588597
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
589-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
598+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
590599
if flag_added_args > 0:
591600
args = args[flag_added_args:]
592601
return orig_function(*args, **kwargs)
@@ -606,7 +615,7 @@ def lower_aspect(
606615
def swapcase_aspect(
607616
orig_function, flag_added_args, *args, **kwargs
608617
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
609-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
618+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
610619
if flag_added_args > 0:
611620
args = args[flag_added_args:]
612621
return orig_function(*args, **kwargs)
@@ -625,7 +634,7 @@ def swapcase_aspect(
625634
def title_aspect(
626635
orig_function, flag_added_args, *args, **kwargs
627636
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
628-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
637+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
629638
if flag_added_args > 0:
630639
args = args[flag_added_args:]
631640
return orig_function(*args, **kwargs)
@@ -644,7 +653,7 @@ def title_aspect(
644653
def capitalize_aspect(
645654
orig_function, flag_added_args, *args, **kwargs
646655
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
647-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
656+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
648657
if flag_added_args > 0:
649658
args = args[flag_added_args:]
650659
return orig_function(*args, **kwargs)
@@ -665,7 +674,7 @@ def casefold_aspect(
665674
orig_function, flag_added_args, *args, **kwargs
666675
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
667676
if orig_function:
668-
if not isinstance(orig_function, BuiltinFunctionType):
677+
if not isinstance(orig_function, BuiltinFunctionType) or not args:
669678
if flag_added_args > 0:
670679
args = args[flag_added_args:]
671680
return orig_function(*args, **kwargs)
@@ -693,7 +702,7 @@ def casefold_aspect(
693702
def translate_aspect(
694703
orig_function, flag_added_args, *args, **kwargs
695704
): # type: (Optional[Callable], int, Any, Any) -> TEXT_TYPE
696-
if orig_function and not isinstance(orig_function, BuiltinFunctionType):
705+
if orig_function and (not isinstance(orig_function, BuiltinFunctionType) or not args):
697706
if flag_added_args > 0:
698707
args = args[flag_added_args:]
699708
return orig_function(*args, **kwargs)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
---
2+
fixes:
3+
- |
4+
Vulnerability Management for Code-level (IAST): This fix resolves an issue where certain aspects incorrectly expected at least one argument, leading to an IndexError when none were provided. The solution removes this constraint and incorporates regression tests for stability assurance.
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
Common tests to aspects, ensuring that they don't break when receiving empty arguments.
4+
"""
5+
import os
6+
7+
import pytest
8+
9+
10+
try:
11+
# Disable unused import warning pylint: disable=W0611
12+
from ddtrace.appsec._iast._taint_tracking import TagMappingMode # noqa: F401
13+
except (ImportError, AttributeError):
14+
pytest.skip("IAST not supported for this Python version", allow_module_level=True)
15+
16+
from tests.appsec.iast.aspects.conftest import _iast_patched_module
17+
import tests.appsec.iast.fixtures.aspects.callees
18+
19+
20+
def generate_callers_from_callees(callers_file=""):
21+
"""
22+
Generate a callers module from a callees module, calling all it's functions.
23+
"""
24+
module_functions = [
25+
"bytearray",
26+
"bytes",
27+
"str",
28+
]
29+
30+
str_functions = [
31+
"casefold",
32+
"capitalize",
33+
"translate",
34+
"title",
35+
"swapcase",
36+
"lower",
37+
"upper",
38+
"format_map",
39+
"format",
40+
"zfill",
41+
"ljust",
42+
"join",
43+
"encode",
44+
]
45+
46+
with open(callers_file, "w", encoding="utf-8") as callers:
47+
callers.write(
48+
f"""
49+
import builtins as _builtins
50+
51+
def call_builtin_repr(*args, **kwargs):
52+
return _builtins.repr()\n
53+
54+
def call_builtin_decode(*args, **kwargs):
55+
return bytes.decode()\n
56+
"""
57+
)
58+
for function in str_functions:
59+
callers.write(
60+
f"""
61+
def call_builtin_{function}(*args, **kwargs):
62+
return _builtins.str.{function}()\n
63+
"""
64+
)
65+
66+
for function in module_functions:
67+
callers.write(
68+
f"""
69+
def callee_{function}(*args, **kwargs):
70+
return {function}(*args, **kwargs)\n
71+
"""
72+
)
73+
74+
75+
PATCHED_CALLERS_FILE = "tests/appsec/iast/fixtures/aspects/callers.py"
76+
UNPATCHED_CALLERS_FILE = "tests/appsec/iast/fixtures/aspects/unpatched_callers.py"
77+
78+
for _file in (PATCHED_CALLERS_FILE, UNPATCHED_CALLERS_FILE):
79+
generate_callers_from_callees(
80+
callers_file=_file,
81+
)
82+
83+
patched_callers = _iast_patched_module(PATCHED_CALLERS_FILE.replace("/", ".")[0:-3])
84+
# This import needs to be done after the file is created (previous line)
85+
# pylint: disable=[wrong-import-position],[no-name-in-module]
86+
from tests.appsec.iast.fixtures.aspects import unpatched_callers # type: ignore[attr-defined] # noqa: E402
87+
88+
89+
@pytest.mark.parametrize("aspect", [x for x in dir(unpatched_callers) if not x.startswith(("_", "@"))])
90+
def test_aspect_patched_result(aspect):
91+
"""
92+
Test that the result of the patched aspect call is the same as the unpatched one.
93+
"""
94+
if aspect.startswith("callee_"):
95+
assert getattr(patched_callers, aspect)() == getattr(unpatched_callers, aspect)()
96+
elif aspect.startswith("call_builtin_"):
97+
try:
98+
getattr(patched_callers, aspect)()
99+
except TypeError as e:
100+
aspect = aspect[len("call_builtin_") :]
101+
if aspect == "repr":
102+
assert e.args[0] == f"{aspect}() takes exactly one argument (0 given)"
103+
else:
104+
# assert exception message is "unbound method str.<aspect>() needs an argument"
105+
aspect_namespace = "bytes" if aspect == "decode" else "str"
106+
assert e.args[0] == f"unbound method {aspect_namespace}.{aspect}() needs an argument"
107+
108+
109+
def teardown_module(_):
110+
"""
111+
Remove the callers file after the tests are done.
112+
"""
113+
for _file in (PATCHED_CALLERS_FILE, UNPATCHED_CALLERS_FILE):
114+
try:
115+
os.remove(_file)
116+
except FileNotFoundError:
117+
pass
118+
assert not os.path.exists(_file)

0 commit comments

Comments
 (0)