Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions integration_tests/sonar/test_sonar_use_secure_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from codemodder.codemods.test import SonarIntegrationTest
from core_codemods.sonar.sonar_use_secure_protocols import (
SonarUseSecureProtocols,
SonarUseSecureProtocolsTransformer,
)


class TestSonarUseSecureProtocols(SonarIntegrationTest):
codemod = SonarUseSecureProtocols
code_path = "tests/samples/use_secure_protocols.py"
replacement_lines = [
(
5,
"""url = "https://example.com"\n""",
),
]
# fmt: off
expected_diff = (
"""--- \n"""
"""+++ \n"""
"""@@ -2,4 +2,4 @@\n"""
''' import smtplib\n'''
''' import telnetlib\n'''
''' \n'''
'''-url = "http://example.com"\n'''
'''+url = "https://example.com"\n'''
)
# fmt: on
expected_line_change = "5"
change_description = SonarUseSecureProtocolsTransformer.change_description
5 changes: 5 additions & 0 deletions src/codemodder/scripts/generate_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,11 @@ class DocMetadata:
guidance_explained="Our change provides the most secure way to create cookies in Flask. However, it's possible you have configured your Flask application configurations to use secure cookies. In these cases, using the default parameters for `set_cookie` is safe.",
need_sarif="Yes (Sonar)",
),
"use-secure-protocols": DocMetadata(
importance="High",
guidance_explained="While secure protocols are widely supported by a variety of application and server software, it may require explicit configuration to support those protocols.",
need_sarif="Yes (Sonar)",
),
}

SEMGREP_CODEMOD_NAMES = [
Expand Down
2 changes: 2 additions & 0 deletions src/core_codemods/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
from .sonar.sonar_tempfile_mktemp import SonarTempfileMktemp
from .sonar.sonar_timezone_aware_datetime import SonarTimezoneAwareDatetime
from .sonar.sonar_url_sandbox import SonarUrlSandbox
from .sonar.sonar_use_secure_protocols import SonarUseSecureProtocols
from .sql_parameterization import SQLQueryParameterization
from .str_concat_in_seq_literal import StrConcatInSeqLiteral
from .subprocess_shell_false import SubprocessShellFalse
Expand Down Expand Up @@ -206,6 +207,7 @@
SonarTimezoneAwareDatetime,
SonarSandboxProcessCreation,
SonarSecureCookie,
SonarUseSecureProtocols,
],
)

Expand Down
24 changes: 24 additions & 0 deletions src/core_codemods/docs/sonar_python_use-secure-protocols.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Communication using clear-text protocols allows an attacker to sniff or tamper with the transported data.

This codemod will replace any detected clear text protocol with their cryptographic enabled version.

Our changes look like the following:
```diff
- url = "http://example.com"
+ url = "https://example.com"

- ftp_con = ftplib.FTP("ftp.example.com")
+ ftp_con = ftplib.FTP_TLS("ftp.example.com")
+ smtp_context = ssl.create_default_context()
+ smtp_context.verify_mode = ssl.CERT_REQUIRED
+ smtp_context.check_hostname = True
smtp_con = smtplib.SMTP("smtp.example.com", port=587)
+ smtp.starttls(context=smtp_context)


+ smtp_context_1 = ssl.create_default_context()
+ smtp_context_1.verify_mode = ssl.CERT_REQUIRED
+ smtp_context_1.check_hostname = True
- smtp_con_2 = smtplib.SMTP("smtp.gmail.com")
+ smtp_con_2 = smtplib.SMTP_SSL("smtp.gmail.com", context=smtp_context_1)
```
20 changes: 19 additions & 1 deletion src/core_codemods/sonar/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,25 @@


class SonarCodemod(SASTCodemod):

def __init__(
self,
*,
metadata: Metadata,
transformer: BaseTransformerPipeline,
default_extensions: list[str] | None = None,
requested_rules: list[str] | None = None,
provider: str | None = None,
):
super().__init__(
metadata=metadata,
detector=SonarDetector(),
transformer=transformer,
default_extensions=default_extensions,
requested_rules=requested_rules,
provider=provider,
)

@property
def origin(self):
return "sonar"
Expand Down Expand Up @@ -38,7 +57,6 @@ def from_core_codemod_with_multiple_rules(
),
),
transformer=transformer if transformer else other.transformer,
detector=SonarDetector(),
default_extensions=other.default_extensions,
requested_rules=[tr.id for tr in rules],
)
Expand Down
206 changes: 206 additions & 0 deletions src/core_codemods/sonar/sonar_use_secure_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import libcst as cst
from libcst.codemod import CodemodContext

from codemodder.codemods.base_codemod import (
Metadata,
ReviewGuidance,
ToolMetadata,
ToolRule,
)
from codemodder.codemods.libcst_transformer import (
LibcstResultTransformer,
LibcstTransformerPipeline,
)
from codemodder.codemods.utils_mixin import NameAndAncestorResolutionMixin
from codemodder.codetf import Reference
from codemodder.file_context import FileContext
from codemodder.result import Result
from core_codemods.sonar.api import SonarCodemod

rules = [
ToolRule(
id="python:S5332",
name="Using clear-text protocols is security-sensitive",
url="https://rules.sonarsource.com/python/RSPEC-5332/",
),
]


class SonarUseSecureProtocolsTransformer(
LibcstResultTransformer, NameAndAncestorResolutionMixin
):
change_description = "Modified URLs or calls to use secure protocols"

def __init__(
self,
context: CodemodContext,
results: list[Result] | None,
file_context: FileContext,
_transformer: bool = False,
):
self.nodes_memory_with_context_name: dict[cst.CSTNode, str] = {}
super().__init__(context, results, file_context, _transformer)

def _match_and_handle_statement(
self, possible_smtp_call, original_node_statement, updated_node_statement
):
maybe_name = self.find_base_name(possible_smtp_call)
match possible_smtp_call:
case cst.Call() if maybe_name == "smtplib.SMTP":
# get the stored context_name or create a new one:
if possible_smtp_call in self.nodes_memory_with_context_name:
context_name = self.nodes_memory_with_context_name[
possible_smtp_call
]
else:
context_name = self.generate_available_name(
original_node_statement, ["smtp_context"]
)

new_statements = []
new_statements.append(
cst.parse_statement(
f"{context_name} = ssl.create_default_context()"
)
)
new_statements.append(
cst.parse_statement(
f"{context_name}.verify_mode = ssl.CERT_REQUIRED"
)
)
new_statements.append(
cst.parse_statement(f"{context_name}.check_hostname = True")
)
new_statements.append(updated_node_statement)
# don't append this if we changed the call to SSL version
if possible_smtp_call in self.nodes_memory_with_context_name:
self.nodes_memory_with_context_name.pop(possible_smtp_call)
else:
new_statements.append(
cst.parse_statement(f"smtplib.starttls(context={context_name})")
)
self.add_needed_import("smtplib")
self.add_needed_import("ssl")
self.report_change(possible_smtp_call)
return cst.FlattenSentinel(new_statements)
return updated_node_statement

def leave_SimpleStatementLine(self, original_node, updated_node):
match original_node.body:
# match the first statement that is either selected or is an assignment whose value is selected
case [cst.Assign() as a, *_] if self.node_is_selected(a.value):
return self._match_and_handle_statement(
a.value, original_node, updated_node
)
case [s, *_] if self.node_is_selected(s):
return self._match_and_handle_statement(s, original_node, updated_node)
return updated_node

def leave_Call(self, original_node, updated_node):
if self.node_is_selected(original_node):
match self.find_base_name(original_node):
case "ftplib.FTP":
new_func = cst.parse_expression("ftplib.FTP_TLS")
self.report_change(original_node)
self.add_needed_import("ftplib")
return updated_node.with_changes(func=new_func)
# Just using ssl.create_default_context() may not be enough for older python versions
# See https://stackoverflow.com/questions/33857698/sending-email-from-python-using-starttls
case "smtplib.SMTP":
# port is the second positional, check that
maybe_port_value = (
original_node.args[1]
if len(original_node.args) >= 2
and original_node.args[1].keyword is None
else None
)
# find port keyword, if any
maybe_port_value = maybe_port_value or next(
iter(
[
a
for a in original_node.args
if a.keyword and a.keyword.value == "port"
]
),
None,
)
maybe_port_value = (
maybe_port_value.value if maybe_port_value else None
)
match maybe_port_value:
case None:
return self._change_to_smtp_ssl(original_node, updated_node)
case cst.Integer() if maybe_port_value.value == "0":
return self._change_to_smtp_ssl(original_node, updated_node)
return updated_node

def _change_to_smtp_ssl(self, original_node, updated_node):
# remember this node so we don't add the starttls
new_func = cst.parse_expression("smtplib.SMTP_SSL")

context_name = self.generate_available_name(original_node, ["smtp_context"])
self.nodes_memory_with_context_name[original_node] = context_name

new_args = [
*original_node.args,
cst.Arg(
keyword=cst.Name("context"),
value=cst.Name(context_name),
),
]
return updated_node.with_changes(func=new_func, args=new_args)

def leave_SimpleString(
self, original_node: cst.SimpleString, updated_node: cst.SimpleString
) -> cst.BaseExpression:
if self.node_is_selected(original_node):
match original_node.raw_value:
case original_node.raw_value as s if s.startswith("http"):
self.report_change(original_node)
return updated_node.with_changes(
value=original_node.prefix
+ original_node.quote
+ s.replace("http", "https", 1)
+ original_node.quote
)
case original_node.raw_value as s if s.startswith("ftp"):
self.report_change(original_node)
return updated_node.with_changes(
value=original_node.prefix
+ original_node.quote
+ s.replace("ftp", "sftp", 1)
+ original_node.quote
)
return updated_node


SonarUseSecureProtocols = SonarCodemod(
metadata=Metadata(
name="use-secure-protocols",
summary="Use encrypted protocols instead of clear-text",
review_guidance=ReviewGuidance.MERGE_AFTER_CURSORY_REVIEW,
references=[
Reference(
url="https://docs.python.org/3/library/ftplib.html#ftplib.FTP_TLS"
),
Reference(
url="https://docs.python.org/3/library/smtplib.html#smtplib.SMTP.starttls"
),
Reference(url="https://owasp.org/Top10/A02_2021-Cryptographic_Failures/"),
Reference(
url="https://owasp.org/www-project-top-ten/2017/A3_2017-Sensitive_Data_Exposure"
),
Reference(url="https://cwe.mitre.org/data/definitions/200"),
Reference(url="https://cwe.mitre.org/data/definitions/319"),
]
+ [Reference(url=tr.url or "", description=tr.name) for tr in rules],
tool=ToolMetadata(
name="Sonar",
rules=rules,
),
),
transformer=LibcstTransformerPipeline(SonarUseSecureProtocolsTransformer),
default_extensions=[".py"],
requested_rules=[tr.id for tr in rules],
)
Loading
Loading