Skip to content

Commit 778c25e

Browse files
committed
feat: Updated code analyzers to follow new affected artefacts data structure
1 parent 88eb261 commit 778c25e

File tree

6 files changed

+165
-138
lines changed

6 files changed

+165
-138
lines changed
Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
from .cs_code_analyzer import cs_get_used_artifacts, cs_is_imported
22
from .java_code_analyzer import java_get_used_artifacts, java_is_imported
33
from .js_ts_code_analyzer import js_ts_get_used_artifacts, js_ts_is_imported
4-
from .py_code_analyzer import python_get_used_artifacts, python_is_imported
4+
from .py_code_analyzer import py_get_used_artifacts, py_is_imported
5+
from .rb_code_analyzer import rb_get_used_artifacts, rb_is_imported
56
from .rs_code_analyzer import rs_get_used_artifacts, rs_is_imported
67

78
__all__ = [
@@ -11,8 +12,10 @@
1112
"java_is_imported",
1213
"js_ts_get_used_artifacts",
1314
"js_ts_is_imported",
14-
"python_get_used_artifacts",
15-
"python_is_imported",
15+
"py_get_used_artifacts",
16+
"py_is_imported",
17+
"rb_get_used_artifacts",
18+
"rb_is_imported",
1619
"rs_get_used_artifacts",
1720
"rs_is_imported"
1821
]

app/utils/code_analyzer/codes/cs_code_analyzer.py

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from regex import findall, search
44

5+
from .is_relevant import is_relevant
6+
57

68
async def cs_is_imported(file_path: str, namespace: str) -> Any:
79
with open(file_path, encoding="utf-8") as file:
@@ -16,28 +18,32 @@ async def cs_get_used_artifacts(
1618
filename: str,
1719
namespace: str,
1820
cve_description: str,
19-
affected_artefacts: dict[str, list[str]]
21+
affected_artefacts: dict[str, dict[str, list[str]]]
2022
) -> list[dict[str, Any]]:
2123
with open(filename, encoding="utf-8") as file:
2224
code = file.read()
2325
current_line = 1
2426
used_artifacts = await get_child_artifacts(namespace, code, cve_description, affected_artefacts)
2527
for line in code.split("\n"):
2628
if not search(r"using\s", line):
27-
for (artifact, _type) in used_artifacts:
29+
for (artifact, _type, source) in used_artifacts:
2830
if artifact in line:
29-
used_artifacts[(artifact, _type)].append(current_line)
31+
used_artifacts[(artifact, _type, source)].append(current_line)
3032
current_line += 1
3133
used_artifacts = {
32-
(artifact, _type): lines
33-
for (artifact, _type), lines in used_artifacts.items()
34+
(artifact, _type, source): lines
35+
for (artifact, _type, source), lines in used_artifacts.items()
3436
if lines
3537
}
3638
result = []
37-
for (artifact_name, artifact_type), used_in_lines in used_artifacts.items():
39+
groups_by_name_type = {}
40+
for (artifact_name, artifact_type, source), used_in_lines in used_artifacts.items():
41+
groups_by_name_type.setdefault((artifact_name, artifact_type, used_in_lines), []).append(source)
42+
for (artifact_name, artifact_type, used_in_lines), sources in groups_by_name_type.items():
3843
result.append({
3944
"artifact_name": artifact_name,
4045
"artifact_type": artifact_type,
46+
"sources": sources,
4147
"used_in_lines": used_in_lines
4248
})
4349
return result
@@ -48,27 +54,26 @@ async def get_child_artifacts(
4854
code: str,
4955
cve_description: str,
5056
affected_artefacts: dict[str, list[str]]
51-
) -> dict[tuple[str, str], list[int]]:
52-
used_artifacts: dict[tuple[str, str], list[int]] = {}
53-
def is_relevant(artifact: str, artifact_type: str) -> bool:
54-
artifact_lower = artifact.lower()
55-
if artifact_lower in cve_description.lower():
56-
return True
57-
return artifact in affected_artefacts.get(artifact_type, [])
58-
for match in findall(rf"{parent}\.[^\(\)\s:;]+", code):
59-
for artifact in match.split(".")[1:]:
60-
clean = artifact.strip()
61-
for artifact_type in affected_artefacts:
62-
if is_relevant(clean, artifact_type):
63-
used_artifacts.setdefault((clean, artifact_type), [])
64-
for match in findall(rf"using\s+{parent}\s*;\s*{{[^}}]+}}", code):
65-
for artifact in match.split("{")[1].split("}")[0].split(","):
66-
clean = artifact.strip()
67-
for artifact_type in affected_artefacts:
68-
if is_relevant(clean, artifact_type):
69-
used_artifacts.setdefault((clean, artifact_type), [])
57+
) -> dict[tuple[str, str, str], list[int]]:
58+
used_artifacts: dict[tuple[str, str, str], list[int]] = {}
59+
patterns = [
60+
(rf"{parent}\.[^\(\)\s:;]+", "split_by_dot"),
61+
(rf"using\s+{parent}\s*;\s*{{[^}}]+}}", "split_by_braces"),
62+
]
63+
for pattern, split_type in patterns:
64+
for match in findall(pattern, code):
65+
if split_type == "split_by_dot":
66+
artifacts = match.split(".")[1:]
67+
elif split_type == "split_by_braces":
68+
artifacts = match.split("{")[1].split("}")[0].split(",")
69+
for artifact in artifacts:
70+
clean = artifact.strip()
71+
for source, artifact_types in affected_artefacts.items():
72+
for artifact_type, artefacts in artifact_types["artefacts"].items():
73+
if await is_relevant(clean, artefacts, cve_description):
74+
used_artifacts.setdefault((clean, artifact_type, source), [])
7075
aux = {}
71-
for (artifact, _) in used_artifacts:
76+
for (artifact, _, _) in used_artifacts:
7277
aux.update(await get_child_artifacts(artifact, code, cve_description, affected_artefacts))
7378
used_artifacts.update(aux)
7479
return used_artifacts

app/utils/code_analyzer/codes/java_code_analyzer.py

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from regex import findall, search
44

5+
from .is_relevant import is_relevant
6+
57

68
async def java_is_imported(file_path: str, dependency: str) -> Any:
79
with open(file_path, encoding="utf-8") as file:
@@ -16,28 +18,32 @@ async def java_get_used_artifacts(
1618
filename: str,
1719
dependency: str,
1820
cve_description: str,
19-
affected_artefacts: dict[str, list[str]]
21+
affected_artefacts: dict[str, dict[str, list[str]]]
2022
) -> list[dict[str, Any]]:
2123
with open(filename, encoding="utf-8") as file:
2224
code = file.read()
2325
current_line = 1
2426
used_artifacts = await get_child_artifacts(dependency, code, cve_description, affected_artefacts)
2527
for line in code.split("\n"):
2628
if "import" not in line:
27-
for (artifact, _type) in used_artifacts:
29+
for (artifact, _type, source) in used_artifacts:
2830
if artifact in line:
29-
used_artifacts[(artifact, _type)].append(current_line)
31+
used_artifacts[(artifact, _type, source)].append(current_line)
3032
current_line += 1
3133
used_artifacts = {
32-
(artifact, _type): lines
33-
for (artifact, _type), lines in used_artifacts.items()
34+
(artifact, _type, source): lines
35+
for (artifact, _type, source), lines in used_artifacts.items()
3436
if lines
3537
}
3638
result = []
37-
for (artifact_name, artifact_type), used_in_lines in used_artifacts.items():
39+
groups_by_name_type = {}
40+
for (artifact_name, artifact_type, source), used_in_lines in used_artifacts.items():
41+
groups_by_name_type.setdefault((artifact_name, artifact_type, used_in_lines), []).append(source)
42+
for (artifact_name, artifact_type, used_in_lines), sources in groups_by_name_type.items():
3843
result.append({
3944
"artifact_name": artifact_name,
4045
"artifact_type": artifact_type,
46+
"sources": sources,
4147
"used_in_lines": used_in_lines
4248
})
4349
return result
@@ -48,27 +54,29 @@ async def get_child_artifacts(
4854
code: str,
4955
cve_description: str,
5056
affected_artefacts: dict[str, list[str]]
51-
) -> dict[tuple[str, str], list[int]]:
52-
used_artifacts: dict[tuple[str, str], list[int]] = {}
53-
def is_relevant(artifact: str, artifact_type: str) -> bool:
54-
artifact_lower = artifact.lower()
55-
if artifact_lower in cve_description.lower():
56-
return True
57-
return artifact in affected_artefacts.get(artifact_type, [])
58-
for match in findall(rf"{parent}\.[^\(\)\s:;]+", code):
59-
for artifact in match.split(".")[1:]:
60-
clean = artifact.strip()
61-
for artifact_type in affected_artefacts:
62-
if is_relevant(clean, artifact_type):
63-
used_artifacts.setdefault((clean, artifact_type), [])
64-
for match in findall(rf"import\s+{parent}\.[^\(\)\s:;]+;", code):
65-
for artifact in match.split(parent + ".")[1:]:
66-
clean = artifact.replace(";", "").strip()
67-
for artifact_type in affected_artefacts:
68-
if is_relevant(clean, artifact_type):
69-
used_artifacts.setdefault((clean, artifact_type), [])
57+
) -> dict[tuple[str, str, str], list[int]]:
58+
used_artifacts: dict[tuple[str, str, str], list[int]] = {}
59+
patterns = [
60+
(rf"{parent}\.[^\(\)\s:;]+", "split_by_dot"),
61+
(rf"import\s+{parent}\.[^\(\)\s:;]+;", "split_by_import"),
62+
]
63+
for pattern, split_type in patterns:
64+
for match in findall(pattern, code):
65+
if split_type == "split_by_dot":
66+
artifacts = match.split(".")[1:]
67+
elif split_type == "split_by_import":
68+
artifacts = match.split(parent + ".")[1:]
69+
for artifact in artifacts:
70+
if split_type == "split_by_import":
71+
clean = artifact.replace(";", "").strip()
72+
else:
73+
clean = artifact.strip()
74+
for source, artifact_types in affected_artefacts.items():
75+
for artifact_type, artefacts in artifact_types["artefacts"].items():
76+
if await is_relevant(clean, artefacts, cve_description):
77+
used_artifacts.setdefault((clean, artifact_type, source), [])
7078
aux = {}
71-
for (artifact, _) in used_artifacts:
79+
for (artifact, _, _) in used_artifacts:
7280
aux.update(await get_child_artifacts(artifact, code, cve_description, affected_artefacts))
7381
used_artifacts.update(aux)
7482
return used_artifacts

app/utils/code_analyzer/codes/js_ts_code_analyzer.py

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from regex import findall, search
44

5+
from .is_relevant import is_relevant
6+
57

68
async def js_ts_is_imported(file_path: str, dependency: str) -> Any:
79
with open(file_path, encoding="utf-8") as file:
@@ -16,28 +18,32 @@ async def js_ts_get_used_artifacts(
1618
filename: str,
1719
dependency: str,
1820
cve_description: str,
19-
affected_artefacts: dict[str, list[str]]
21+
affected_artefacts: dict[str, dict[str, list[str]]]
2022
) -> list[dict[str, Any]]:
2123
with open(filename, encoding="utf-8") as file:
2224
code = file.read()
2325
current_line = 1
2426
used_artifacts = await get_child_artifacts(dependency, code, cve_description, affected_artefacts)
2527
for line in code.split("\n"):
2628
if not search(r"import\s|require\(", line):
27-
for (artifact, _type) in used_artifacts:
29+
for (artifact, _type, source) in used_artifacts:
2830
if artifact in line:
29-
used_artifacts[(artifact, _type)].append(current_line)
31+
used_artifacts[(artifact, _type, source)].append(current_line)
3032
current_line += 1
3133
used_artifacts = {
32-
(artifact, _type): lines
33-
for (artifact, _type), lines in used_artifacts.items()
34+
(artifact, _type, source): lines
35+
for (artifact, _type, source), lines in used_artifacts.items()
3436
if lines
3537
}
3638
result = []
37-
for (artifact_name, artifact_type), used_in_lines in used_artifacts.items():
39+
groups_by_name_type = {}
40+
for (artifact_name, artifact_type, source), used_in_lines in used_artifacts.items():
41+
groups_by_name_type.setdefault((artifact_name, artifact_type, used_in_lines), []).append(source)
42+
for (artifact_name, artifact_type, used_in_lines), sources in groups_by_name_type.items():
3843
result.append({
3944
"artifact_name": artifact_name,
4045
"artifact_type": artifact_type,
46+
"sources": sources,
4147
"used_in_lines": used_in_lines
4248
})
4349
return result
@@ -48,33 +54,27 @@ async def get_child_artifacts(
4854
code: str,
4955
cve_description: str,
5056
affected_artefacts: dict[str, list[str]]
51-
) -> dict[tuple[str, str], list[int]]:
52-
used_artifacts: dict[tuple[str, str], list[int]] = {}
53-
def is_relevant(artifact: str, artifact_type: str) -> bool:
54-
artifact_lower = artifact.lower()
55-
if artifact_lower in cve_description.lower():
56-
return True
57-
return artifact in affected_artefacts.get(artifact_type, [])
58-
for match in findall(rf"{parent}\.[^\(\)\s:;]+", code):
59-
for artifact in match.split(".")[1:]:
60-
clean = artifact.strip()
61-
for artifact_type in affected_artefacts:
62-
if is_relevant(clean, artifact_type):
63-
used_artifacts.setdefault((clean, artifact_type), [])
64-
for match in findall(rf"import\s+{{[^}}]+}}\s+from\s+['\"]{parent}['\"]", code):
65-
for artifact in match.split("{")[1].split("}")[0].split(","):
66-
clean = artifact.strip()
67-
for artifact_type in affected_artefacts:
68-
if is_relevant(clean, artifact_type):
69-
used_artifacts.setdefault((clean, artifact_type), [])
70-
for match in findall(rf"const\s+{{[^}}]+}}\s*=\s*require\(['\"]{parent}['\"]\)", code):
71-
for artifact in match.split("{")[1].split("}")[0].split(","):
72-
clean = artifact.strip()
73-
for artifact_type in affected_artefacts:
74-
if is_relevant(clean, artifact_type):
75-
used_artifacts.setdefault((clean, artifact_type), [])
57+
) -> dict[tuple[str, str, str], list[int]]:
58+
used_artifacts: dict[tuple[str, str, str], list[int]] = {}
59+
patterns = [
60+
(rf"{parent}\.[^\(\)\s:;]+", "split_by_dot"),
61+
(rf"import\s+{{[^}}]+}}\s+from\s+['\"]{parent}['\"]", "split_by_braces"),
62+
(rf"const\s+{{[^}}]+}}\s*=\s*require\(['\"]{parent}['\"]\)", "split_by_braces"),
63+
]
64+
for pattern, split_type in patterns:
65+
for match in findall(pattern, code):
66+
if split_type == "split_by_dot":
67+
artifacts = match.split(".")[1:]
68+
elif split_type == "split_by_braces":
69+
artifacts = match.split("{")[1].split("}")[0].split(",")
70+
for artifact in artifacts:
71+
clean = artifact.strip()
72+
for source, artifact_types in affected_artefacts.items():
73+
for artifact_type, artefacts in artifact_types["artefacts"].items():
74+
if await is_relevant(clean, artefacts, cve_description):
75+
used_artifacts.setdefault((clean, artifact_type, source), [])
7676
aux = {}
77-
for (artifact, _) in used_artifacts:
77+
for (artifact, _, _) in used_artifacts:
7878
aux.update(await get_child_artifacts(artifact, code, cve_description, affected_artefacts))
7979
used_artifacts.update(aux)
8080
return used_artifacts

0 commit comments

Comments
 (0)