diff --git a/integration_tests/sonar/test_sonar_use_secure_protocols.py b/integration_tests/sonar/test_sonar_use_secure_protocols.py new file mode 100644 index 00000000..e7acef9a --- /dev/null +++ b/integration_tests/sonar/test_sonar_use_secure_protocols.py @@ -0,0 +1,30 @@ +from codemodder.codemods.test import SonarIntegrationTest +from core_codemods.sonar.sonar_use_secure_protocols import ( + SonarUseSecureProtocols, + SonarUseSecureProtocolsTransformer, +) + + +class TestSonarUseSecureProtocols(SonarIntegrationTest): + codemod = SonarUseSecureProtocols + code_path = "tests/samples/use_secure_protocols.py" + replacement_lines = [ + ( + 5, + """url = "https://example.com"\n""", + ), + ] + # fmt: off + expected_diff = ( + """--- \n""" + """+++ \n""" + """@@ -2,4 +2,4 @@\n""" + ''' import smtplib\n''' + ''' import telnetlib\n''' + ''' \n''' + '''-url = "http://example.com"\n''' + '''+url = "https://example.com"\n''' + ) + # fmt: on + expected_line_change = "5" + change_description = SonarUseSecureProtocolsTransformer.change_description diff --git a/src/codemodder/scripts/generate_docs.py b/src/codemodder/scripts/generate_docs.py index 63a0095c..86965e6c 100644 --- a/src/codemodder/scripts/generate_docs.py +++ b/src/codemodder/scripts/generate_docs.py @@ -334,6 +334,11 @@ class DocMetadata: guidance_explained="Our change provides the most secure way to create cookies in Flask. However, it's possible you have configured your Flask application configurations to use secure cookies. In these cases, using the default parameters for `set_cookie` is safe.", need_sarif="Yes (Sonar)", ), + "use-secure-protocols": DocMetadata( + importance="High", + guidance_explained="While secure protocols are widely supported by a variety of application and server software, it may require explicit configuration to support those protocols.", + need_sarif="Yes (Sonar)", + ), } SEMGREP_CODEMOD_NAMES = [ diff --git a/src/core_codemods/__init__.py b/src/core_codemods/__init__.py index 0ae2c15c..047bf881 100644 --- a/src/core_codemods/__init__.py +++ b/src/core_codemods/__init__.py @@ -95,6 +95,7 @@ from .sonar.sonar_tempfile_mktemp import SonarTempfileMktemp from .sonar.sonar_timezone_aware_datetime import SonarTimezoneAwareDatetime from .sonar.sonar_url_sandbox import SonarUrlSandbox +from .sonar.sonar_use_secure_protocols import SonarUseSecureProtocols from .sql_parameterization import SQLQueryParameterization from .str_concat_in_seq_literal import StrConcatInSeqLiteral from .subprocess_shell_false import SubprocessShellFalse @@ -206,6 +207,7 @@ SonarTimezoneAwareDatetime, SonarSandboxProcessCreation, SonarSecureCookie, + SonarUseSecureProtocols, ], ) diff --git a/src/core_codemods/docs/sonar_python_use-secure-protocols.md b/src/core_codemods/docs/sonar_python_use-secure-protocols.md new file mode 100644 index 00000000..79f38f39 --- /dev/null +++ b/src/core_codemods/docs/sonar_python_use-secure-protocols.md @@ -0,0 +1,24 @@ +Communication using clear-text protocols allows an attacker to sniff or tamper with the transported data. + +This codemod will replace any detected clear text protocol with their cryptographic enabled version. + +Our changes look like the following: +```diff +- url = "http://example.com" ++ url = "https://example.com" + +- ftp_con = ftplib.FTP("ftp.example.com") ++ ftp_con = ftplib.FTP_TLS("ftp.example.com") ++ smtp_context = ssl.create_default_context() ++ smtp_context.verify_mode = ssl.CERT_REQUIRED ++ smtp_context.check_hostname = True +smtp_con = smtplib.SMTP("smtp.example.com", port=587) ++ smtp.starttls(context=smtp_context) + + ++ smtp_context_1 = ssl.create_default_context() ++ smtp_context_1.verify_mode = ssl.CERT_REQUIRED ++ smtp_context_1.check_hostname = True +- smtp_con_2 = smtplib.SMTP("smtp.gmail.com") ++ smtp_con_2 = smtplib.SMTP_SSL("smtp.gmail.com", context=smtp_context_1) +``` diff --git a/src/core_codemods/sonar/api.py b/src/core_codemods/sonar/api.py index 5688770d..ef1f7099 100644 --- a/src/core_codemods/sonar/api.py +++ b/src/core_codemods/sonar/api.py @@ -10,6 +10,25 @@ class SonarCodemod(SASTCodemod): + + def __init__( + self, + *, + metadata: Metadata, + transformer: BaseTransformerPipeline, + default_extensions: list[str] | None = None, + requested_rules: list[str] | None = None, + provider: str | None = None, + ): + super().__init__( + metadata=metadata, + detector=SonarDetector(), + transformer=transformer, + default_extensions=default_extensions, + requested_rules=requested_rules, + provider=provider, + ) + @property def origin(self): return "sonar" @@ -38,7 +57,6 @@ def from_core_codemod_with_multiple_rules( ), ), transformer=transformer if transformer else other.transformer, - detector=SonarDetector(), default_extensions=other.default_extensions, requested_rules=[tr.id for tr in rules], ) diff --git a/src/core_codemods/sonar/sonar_use_secure_protocols.py b/src/core_codemods/sonar/sonar_use_secure_protocols.py new file mode 100644 index 00000000..869be3e4 --- /dev/null +++ b/src/core_codemods/sonar/sonar_use_secure_protocols.py @@ -0,0 +1,206 @@ +import libcst as cst +from libcst.codemod import CodemodContext + +from codemodder.codemods.base_codemod import ( + Metadata, + ReviewGuidance, + ToolMetadata, + ToolRule, +) +from codemodder.codemods.libcst_transformer import ( + LibcstResultTransformer, + LibcstTransformerPipeline, +) +from codemodder.codemods.utils_mixin import NameAndAncestorResolutionMixin +from codemodder.codetf import Reference +from codemodder.file_context import FileContext +from codemodder.result import Result +from core_codemods.sonar.api import SonarCodemod + +rules = [ + ToolRule( + id="python:S5332", + name="Using clear-text protocols is security-sensitive", + url="https://rules.sonarsource.com/python/RSPEC-5332/", + ), +] + + +class SonarUseSecureProtocolsTransformer( + LibcstResultTransformer, NameAndAncestorResolutionMixin +): + change_description = "Modified URLs or calls to use secure protocols" + + def __init__( + self, + context: CodemodContext, + results: list[Result] | None, + file_context: FileContext, + _transformer: bool = False, + ): + self.nodes_memory_with_context_name: dict[cst.CSTNode, str] = {} + super().__init__(context, results, file_context, _transformer) + + def _match_and_handle_statement( + self, possible_smtp_call, original_node_statement, updated_node_statement + ): + maybe_name = self.find_base_name(possible_smtp_call) + match possible_smtp_call: + case cst.Call() if maybe_name == "smtplib.SMTP": + # get the stored context_name or create a new one: + if possible_smtp_call in self.nodes_memory_with_context_name: + context_name = self.nodes_memory_with_context_name[ + possible_smtp_call + ] + else: + context_name = self.generate_available_name( + original_node_statement, ["smtp_context"] + ) + + new_statements = [] + new_statements.append( + cst.parse_statement( + f"{context_name} = ssl.create_default_context()" + ) + ) + new_statements.append( + cst.parse_statement( + f"{context_name}.verify_mode = ssl.CERT_REQUIRED" + ) + ) + new_statements.append( + cst.parse_statement(f"{context_name}.check_hostname = True") + ) + new_statements.append(updated_node_statement) + # don't append this if we changed the call to SSL version + if possible_smtp_call in self.nodes_memory_with_context_name: + self.nodes_memory_with_context_name.pop(possible_smtp_call) + else: + new_statements.append( + cst.parse_statement(f"smtplib.starttls(context={context_name})") + ) + self.add_needed_import("smtplib") + self.add_needed_import("ssl") + self.report_change(possible_smtp_call) + return cst.FlattenSentinel(new_statements) + return updated_node_statement + + def leave_SimpleStatementLine(self, original_node, updated_node): + match original_node.body: + # match the first statement that is either selected or is an assignment whose value is selected + case [cst.Assign() as a, *_] if self.node_is_selected(a.value): + return self._match_and_handle_statement( + a.value, original_node, updated_node + ) + case [s, *_] if self.node_is_selected(s): + return self._match_and_handle_statement(s, original_node, updated_node) + return updated_node + + def leave_Call(self, original_node, updated_node): + if self.node_is_selected(original_node): + match self.find_base_name(original_node): + case "ftplib.FTP": + new_func = cst.parse_expression("ftplib.FTP_TLS") + self.report_change(original_node) + self.add_needed_import("ftplib") + return updated_node.with_changes(func=new_func) + # Just using ssl.create_default_context() may not be enough for older python versions + # See https://stackoverflow.com/questions/33857698/sending-email-from-python-using-starttls + case "smtplib.SMTP": + # port is the second positional, check that + maybe_port_value = ( + original_node.args[1] + if len(original_node.args) >= 2 + and original_node.args[1].keyword is None + else None + ) + # find port keyword, if any + maybe_port_value = maybe_port_value or next( + iter( + [ + a + for a in original_node.args + if a.keyword and a.keyword.value == "port" + ] + ), + None, + ) + maybe_port_value = ( + maybe_port_value.value if maybe_port_value else None + ) + match maybe_port_value: + case None: + return self._change_to_smtp_ssl(original_node, updated_node) + case cst.Integer() if maybe_port_value.value == "0": + return self._change_to_smtp_ssl(original_node, updated_node) + return updated_node + + def _change_to_smtp_ssl(self, original_node, updated_node): + # remember this node so we don't add the starttls + new_func = cst.parse_expression("smtplib.SMTP_SSL") + + context_name = self.generate_available_name(original_node, ["smtp_context"]) + self.nodes_memory_with_context_name[original_node] = context_name + + new_args = [ + *original_node.args, + cst.Arg( + keyword=cst.Name("context"), + value=cst.Name(context_name), + ), + ] + return updated_node.with_changes(func=new_func, args=new_args) + + def leave_SimpleString( + self, original_node: cst.SimpleString, updated_node: cst.SimpleString + ) -> cst.BaseExpression: + if self.node_is_selected(original_node): + match original_node.raw_value: + case original_node.raw_value as s if s.startswith("http"): + self.report_change(original_node) + return updated_node.with_changes( + value=original_node.prefix + + original_node.quote + + s.replace("http", "https", 1) + + original_node.quote + ) + case original_node.raw_value as s if s.startswith("ftp"): + self.report_change(original_node) + return updated_node.with_changes( + value=original_node.prefix + + original_node.quote + + s.replace("ftp", "sftp", 1) + + original_node.quote + ) + return updated_node + + +SonarUseSecureProtocols = SonarCodemod( + metadata=Metadata( + name="use-secure-protocols", + summary="Use encrypted protocols instead of clear-text", + review_guidance=ReviewGuidance.MERGE_AFTER_CURSORY_REVIEW, + references=[ + Reference( + url="https://docs.python.org/3/library/ftplib.html#ftplib.FTP_TLS" + ), + Reference( + url="https://docs.python.org/3/library/smtplib.html#smtplib.SMTP.starttls" + ), + Reference(url="https://owasp.org/Top10/A02_2021-Cryptographic_Failures/"), + Reference( + url="https://owasp.org/www-project-top-ten/2017/A3_2017-Sensitive_Data_Exposure" + ), + Reference(url="https://cwe.mitre.org/data/definitions/200"), + Reference(url="https://cwe.mitre.org/data/definitions/319"), + ] + + [Reference(url=tr.url or "", description=tr.name) for tr in rules], + tool=ToolMetadata( + name="Sonar", + rules=rules, + ), + ), + transformer=LibcstTransformerPipeline(SonarUseSecureProtocolsTransformer), + default_extensions=[".py"], + requested_rules=[tr.id for tr in rules], +) diff --git a/tests/codemods/sonar/test_sonar_use_secure_protocols.py b/tests/codemods/sonar/test_sonar_use_secure_protocols.py new file mode 100644 index 00000000..3d77eae5 --- /dev/null +++ b/tests/codemods/sonar/test_sonar_use_secure_protocols.py @@ -0,0 +1,133 @@ +import json + +from codemodder.codemods.test import BaseSASTCodemodTest +from core_codemods.sonar.sonar_use_secure_protocols import SonarUseSecureProtocols + + +class TestSonarUseSecureProtocols(BaseSASTCodemodTest): + codemod = SonarUseSecureProtocols + tool = "sonar" + + def test_name(self): + assert self.codemod.name == "use-secure-protocols" + + def test_replace_in_strings(self, tmpdir): + input_code = """\ + url = "http://example.com" + """ + expected = """\ + url = "https://example.com" + """ + issues = { + "hotspots": [ + { + "component": "code.py", + "status": "TO_REVIEW", + "textRange": { + "startLine": 1, + "endLine": 1, + "startOffset": 6, + "endOffset": 26, + }, + "ruleKey": "python:S5332", + }, + ], + } + self.run_and_assert( + tmpdir, input_code, expected, results=json.dumps(issues), num_changes=1 + ) + + def test_ftp_call(self, tmpdir): + input_code = """\ + import ftplib + ftp_con = ftplib.FTP("ftp.example.com") + """ + expected = """\ + import ftplib + ftp_con = ftplib.FTP_TLS("ftp.example.com") + """ + issues = { + "hotspots": [ + { + "component": "code.py", + "status": "TO_REVIEW", + "textRange": { + "startLine": 2, + "endLine": 2, + "startOffset": 10, + "endOffset": 39, + }, + "ruleKey": "python:S5332", + }, + ], + } + self.run_and_assert( + tmpdir, input_code, expected, results=json.dumps(issues), num_changes=1 + ) + + def test_smtp_call(self, tmpdir): + input_code = """\ + import smtplib + smtp_con = smtplib.SMTP("smtp.example.com", port=587) + """ + expected = """\ + import smtplib + import ssl + + smtp_context = ssl.create_default_context() + smtp_context.verify_mode = ssl.CERT_REQUIRED + smtp_context.check_hostname = True + smtp_con = smtplib.SMTP("smtp.example.com", port=587) + smtplib.starttls(context=smtp_context) + """ + issues = { + "hotspots": [ + { + "component": "code.py", + "status": "TO_REVIEW", + "textRange": { + "startLine": 2, + "endLine": 2, + "startOffset": 11, + "endOffset": 53, + }, + "ruleKey": "python:S5332", + }, + ], + } + self.run_and_assert( + tmpdir, input_code, expected, results=json.dumps(issues), num_changes=1 + ) + + def test_smtp_call_default_port(self, tmpdir): + input_code = """\ + import smtplib + smtp_con = smtplib.SMTP("smtp.example.com", port=0) + """ + expected = """\ + import smtplib + import ssl + + smtp_context = ssl.create_default_context() + smtp_context.verify_mode = ssl.CERT_REQUIRED + smtp_context.check_hostname = True + smtp_con = smtplib.SMTP_SSL("smtp.example.com", port=0, context = smtp_context) + """ + issues = { + "hotspots": [ + { + "component": "code.py", + "status": "TO_REVIEW", + "textRange": { + "startLine": 2, + "endLine": 2, + "startOffset": 11, + "endOffset": 51, + }, + "ruleKey": "python:S5332", + }, + ], + } + self.run_and_assert( + tmpdir, input_code, expected, results=json.dumps(issues), num_changes=1 + ) diff --git a/tests/samples/sonar_hotspots.json b/tests/samples/sonar_hotspots.json index 897220e9..cf79c044 100644 --- a/tests/samples/sonar_hotspots.json +++ b/tests/samples/sonar_hotspots.json @@ -128,6 +128,26 @@ }, "flows": [], "ruleKey": "python:S5247" + }, + { + "key": "AZSN_hIp0UcGAUz9sZqH", + "component": "pixee_codemodder-python:use_secure_protocols.py", + "project": "pixee_codemodder-python", + "securityCategory": "encrypt-data", + "vulnerabilityProbability": "LOW", + "status": "TO_REVIEW", + "line": 5, + "message": "Using http protocol is insecure. Use https instead", + "creationDate": "2025-01-22T13:20:10+0100", + "updateDate": "2025-01-22T13:29:45+0100", + "textRange": { + "startLine": 5, + "endLine": 5, + "startOffset": 6, + "endOffset": 26 + }, + "flows": [], + "ruleKey": "python:S5332" } ], "components": [ diff --git a/tests/samples/use_secure_protocols.py b/tests/samples/use_secure_protocols.py new file mode 100644 index 00000000..9d0f3511 --- /dev/null +++ b/tests/samples/use_secure_protocols.py @@ -0,0 +1,5 @@ +import ftplib +import smtplib +import telnetlib + +url = "http://example.com" diff --git a/tests/test_sonar_results.py b/tests/test_sonar_results.py index 54227d4d..07094d61 100644 --- a/tests/test_sonar_results.py +++ b/tests/test_sonar_results.py @@ -43,7 +43,7 @@ def test_parse_issues_json(): def test_parse_hotspots_json(): results = SonarResultSet.from_json(SAMPLE_DIR / "sonar_hotspots.json") - assert len(results) == 4 + assert len(results) == 5 def test_combined_json(tmpdir): @@ -54,7 +54,7 @@ def test_combined_json(tmpdir): ) results = SonarResultSet.from_json(Path(tmpdir).joinpath("combined.json")) - assert len(results) == 38 + assert len(results) == 39 def test_empty_issues(tmpdir, caplog):