Skip to content

Commit 5a5d1db

Browse files
committed
Initial implementation of transformation
1 parent 9df2b5d commit 5a5d1db

File tree

3 files changed

+168
-2
lines changed

3 files changed

+168
-2
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
Communication using clear-text protocols allows an attacker to sniff or tamper with the transported data.
2+
3+
This codemod will replace any detected clear text protocol with their cryptographic enabled version.
4+
5+
Our changes look like the following:
6+
```diff
7+
- url = "http://example.com"
8+
+ url = "https://example.com"
9+
10+
- ftp_con = ftplib.FTP("ftp.example.com")
11+
+ ftp_con = ftplib.FTP_TLS("ftp.example.com")
12+
smtp_con = smtplib.SMTP("smtp.example.com", port=587)
13+
+ smtp_context = ssl.create_default_context()
14+
+ smtp_context.verify_mode = ssl.CERT_REQUIRED
15+
+ smtp_context.check_hostname = True
16+
+ smtp.starttls(context=smtp_context)
17+
18+
19+
- smtp_con_2 = smtplib.SMTP("smtp.gmail.com")
20+
+ smtp_con_2 = smtplib.SMTP_SSL("smtp.gmail.com")
21+
```

src/core_codemods/sonar/api.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,25 @@
1010

1111

1212
class SonarCodemod(SASTCodemod):
13+
14+
def __init__(
15+
self,
16+
*,
17+
metadata: Metadata,
18+
transformer: BaseTransformerPipeline,
19+
default_extensions: list[str] | None = None,
20+
requested_rules: list[str] | None = None,
21+
provider: str | None = None,
22+
):
23+
super().__init__(
24+
metadata=metadata,
25+
detector=SonarDetector(),
26+
transformer=transformer,
27+
default_extensions=default_extensions,
28+
requested_rules=requested_rules,
29+
provider=provider,
30+
)
31+
1332
@property
1433
def origin(self):
1534
return "sonar"
@@ -38,7 +57,6 @@ def from_core_codemod_with_multiple_rules(
3857
),
3958
),
4059
transformer=transformer if transformer else other.transformer,
41-
detector=SonarDetector(),
4260
default_extensions=other.default_extensions,
4361
requested_rules=[tr.id for tr in rules],
4462
)

src/core_codemods/sonar/sonar_use_secure_protocols.py

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import libcst as cst
2+
13
from codemodder.codemods.base_codemod import Metadata, ReviewGuidance, ToolRule
24
from codemodder.codemods.libcst_transformer import (
35
LibcstResultTransformer,
46
LibcstTransformerPipeline,
57
)
8+
from codemodder.codemods.utils_mixin import NameAndAncestorResolutionMixin
69
from codemodder.codetf import Reference
710
from core_codemods.sonar.api import SonarCodemod
811

@@ -15,10 +18,134 @@
1518
]
1619

1720

18-
class SonarUseSecureProtocolsTransformer(LibcstResultTransformer):
21+
class SonarUseSecureProtocolsTransformer(
22+
LibcstResultTransformer, NameAndAncestorResolutionMixin
23+
):
1924
change_description = "Modified URLs or calls to use secure protocols"
2025

26+
def _match_and_handle_statement(
27+
self, possible_smtp_call, original_node_statement, updated_node_statement
28+
):
29+
maybe_name = self.find_base_name(possible_smtp_call)
30+
match possible_smtp_call:
31+
case cst.Call() if maybe_name == "smtplib.SMTP":
32+
new_statements = []
33+
new_statements.append(
34+
cst.parse_statement("smtp_context = ssl.create_default_context()")
35+
)
36+
new_statements.append(
37+
cst.parse_statement("smtp_context.verify_mode = ssl.CERT_REQUIRED")
38+
)
39+
new_statements.append(
40+
cst.parse_statement("smtp_context.check_hostname = True")
41+
)
42+
new_statements.append(updated_node_statement)
43+
# TODO don't append this if we changed the call to SSL version
44+
new_statements.append(
45+
cst.parse_statement("smtp.starttls(context=smtp_context)")
46+
)
47+
self.add_needed_import("smtp")
48+
self.add_needed_import("ssl")
49+
self.report_change(possible_smtp_call)
50+
return cst.FlattenSentinel(new_statements)
51+
return updated_node_statement
52+
53+
def leave_SimpleStatementLine(self, original_node, updated_node):
54+
match original_node.body:
55+
# match the first statement that is either selected or is an assignment whose value is selected
56+
case [cst.Assign() as a, *_] if self.node_is_selected(a.value):
57+
58+
return self._match_and_handle_statement(
59+
a.value, original_node, updated_node
60+
)
61+
case [s, *_] if self.node_is_selected(s):
62+
return self._match_and_handle_statement(s, original_node, updated_node)
63+
return updated_node
64+
2165
def leave_Call(self, original_node, updated_node):
66+
if self.node_is_selected(original_node):
67+
match self.find_base_name(original_node):
68+
case "ftplib.FTP":
69+
new_func = cst.parse_expression("ftplib.FTP_TLS")
70+
self.report_change(original_node)
71+
self.add_needed_import("ftplib")
72+
return updated_node.with_changes(func=new_func)
73+
case "smtplib.SMTP":
74+
# port is the second positional, check that
75+
maybe_port_value = (
76+
original_node.args[1]
77+
if len(original_node.args) >= 2
78+
and original_node.args[1].keyword is None
79+
else None
80+
)
81+
# find port keyword, if any
82+
maybe_port_value = maybe_port_value or next(
83+
iter(
84+
[
85+
a
86+
for a in original_node.args
87+
if a.keyword and a.keyword.value == "port"
88+
]
89+
),
90+
None,
91+
)
92+
maybe_port_value = (
93+
maybe_port_value.value if maybe_port_value else None
94+
)
95+
match maybe_port_value:
96+
case None:
97+
new_func = cst.parse_expression("smtplib.SMTP_SSL")
98+
self.report_change(original_node)
99+
self.add_needed_import("smtplib")
100+
new_args = [
101+
*original_node.args,
102+
cst.Arg(
103+
keyword=cst.Name("context"),
104+
value=cst.Name("smtp_context"),
105+
),
106+
]
107+
return updated_node.with_changes(
108+
func=new_func, args=new_args
109+
)
110+
# TODO still needs the context object statements here
111+
# TODO only change this if it mathces the statement pattern in leave_statemenet
112+
# TODO use a flag for this in visit_SimpleStatement
113+
case cst.Integer() if maybe_port_value == "0":
114+
new_func = cst.parse_expression("smtplib.SMTP_SSL")
115+
self.report_change(original_node)
116+
self.add_needed_import("smtplib")
117+
new_args = [
118+
*original_node.args,
119+
cst.Arg(
120+
keyword=cst.Name("context"),
121+
value=cst.Name("smtp_context"),
122+
),
123+
]
124+
return updated_node.with_changes(func=new_func)
125+
126+
return updated_node
127+
128+
def leave_SimpleString(
129+
self, original_node: cst.SimpleString, updated_node: cst.SimpleString
130+
) -> cst.BaseExpression:
131+
if self.node_is_selected(original_node):
132+
match original_node.raw_value:
133+
case original_node.raw_value as s if s.startswith("http"):
134+
self.report_change(original_node)
135+
return updated_node.with_changes(
136+
value=original_node.prefix
137+
+ original_node.quote
138+
+ s.replace("http", "https", 1)
139+
+ original_node.quote
140+
)
141+
case original_node.raw_value as s if s.startswith("ftp"):
142+
self.report_change(original_node)
143+
return updated_node.with_changes(
144+
value=original_node.prefix
145+
+ original_node.quote
146+
+ s.replace("ftp", "sftp", 1)
147+
+ original_node.quote
148+
)
22149
return updated_node
23150

24151

0 commit comments

Comments
 (0)