Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aikido_zen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def protect(mode="daemon", token=""):

import aikido_zen.sinks.builtins
import aikido_zen.sinks.os
import aikido_zen.sinks.pathlib
import aikido_zen.sinks.shutil
import aikido_zen.sinks.io
import aikido_zen.sinks.http_client
Expand Down
29 changes: 29 additions & 0 deletions aikido_zen/sinks/pathlib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""
Sink module for python's `pathlib`
"""

import aikido_zen.vulnerabilities as vulns
from aikido_zen.helpers.get_argument import get_argument
from aikido_zen.helpers.register_call import register_call
from aikido_zen.sinks import before, patch_function, on_import


@before
def _pathlib_truediv_patch(func, instance, args, kwargs):
path = get_argument(args, kwargs, 0, "key")
op = "pathlib.PurePath.__truediv__"
register_call(op, "fs_op")

vulns.run_vulnerability_scan(kind="path_traversal", op=op, args=(path,))


@on_import("pathlib")
def patch(m):
"""
patching module pathlib
- patches PurePath.__truediv__ : Path() / Path() -> join operation
"""

# PurePath() / "my/path/test.txt"
# This is accomplished by overloading the __truediv__ function on the Path class
patch_function(m, "PurePath.__truediv__", _pathlib_truediv_patch)
66 changes: 65 additions & 1 deletion aikido_zen/sinks/tests/os_test.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,45 @@
import pytest
from pathlib import Path, PurePath
from unittest.mock import patch
import aikido_zen.sinks.os
import aikido_zen

aikido_zen.protect()
from aikido_zen.context import Context
from aikido_zen.errors import AikidoPathTraversal
from aikido_zen.sinks.tests.clickhouse_driver_test import set_blocking_to_true

kind = "path_traversal"


def set_context(param):
wsgi_request = {
"REQUEST_METHOD": "GET",
"HTTP_HEADER_1": "header 1 value",
"HTTP_HEADER_2": "Header 2 value",
"RANDOM_VALUE": "Random value",
"HTTP_COOKIE": "sessionId=abc123xyz456;",
"wsgi.url_scheme": "http",
"HTTP_HOST": "localhost:8080",
"PATH_INFO": "/hello",
"QUERY_STRING": "user=JohnDoe&age=30&age=35",
"CONTENT_TYPE": "application/json",
"REMOTE_ADDR": "198.51.100.23",
}
context = Context(
req=wsgi_request,
body={
"param": param,
},
source="flask",
)
context.set_as_current_context()


@pytest.fixture(autouse=True)
def set_blocking_to_true(monkeypatch):
monkeypatch.setenv("AIKIDO_BLOCK", "1")


def test_ospath_commands():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
Expand Down Expand Up @@ -39,6 +73,36 @@ def test_ospath_commands():
mock_run_vulnerability_scan.assert_any_call(kind=kind, op=op, args=args)


def test_os_create_path_with_multiple_slashes():
import os

file_path = "////etc/passwd"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_os_create_path_with_multiple_double_slashes():
import os

file_path = "////etc//passwd"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_os_path_traversal_with_multiple_slashes():
import os

file_path = "home///..////..////my_secret.txt"
set_context(file_path)
with pytest.raises(AikidoPathTraversal):
full_path = Path("flaskr/resources/blogs/") / file_path
open(full_path, "r").close()


def test_ospath_command_absolute_path():
with patch(
"aikido_zen.vulnerabilities.run_vulnerability_scan"
Expand Down
14 changes: 10 additions & 4 deletions aikido_zen/vulnerabilities/path_traversal/unsafe_path_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,19 @@

def starts_with_unsafe_path(file_path, user_input):
"""Check if the file path starts with any dangerous paths and the user input."""
lower_case_path = file_path.lower()
lower_case_user_input = user_input.lower()
path_parsed = ensure_one_leading_slash(file_path.lower())
input_parsed = ensure_one_leading_slash(user_input.lower())

for dangerous_start in dangerous_path_starts:
if lower_case_path.startswith(dangerous_start) and lower_case_path.startswith(
lower_case_user_input
if path_parsed.startswith(dangerous_start) and path_parsed.startswith(
input_parsed
):
return True

return False


def ensure_one_leading_slash(path: str) -> str:
if path.startswith("/"):
return "/" + path.lstrip("/")
return path