Skip to content

Commit 60ff4b5

Browse files
committed
Enhance error handling and add input validation in pipeline generator
- Updated exception handling in NiftiWriter and VLMResultsWriter operators to catch specific exceptions instead of a general catch-all. - Introduced a new method in AppGenerator to sanitize strings for valid Python identifiers, improving security against code injection. - Added validation for model IDs to ensure only safe characters are accepted, preventing potential security vulnerabilities. - Implemented unit tests to verify the correctness of the new sanitization and validation features. Signed-off-by: [Your Name] <[email protected]> Signed-off-by: Victor Chang <[email protected]>
1 parent 46f921c commit 60ff4b5

File tree

6 files changed

+151
-8
lines changed

6 files changed

+151
-8
lines changed

monai/deploy/operators/nifti_writer_operator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def compute(self, op_input, op_output, context):
7676
filename = None
7777
try:
7878
filename = op_input.receive(self.input_name_filename)
79-
except:
79+
except Exception:
8080
pass
8181

8282
if image is None:

monai/deploy/operators/vlm_results_writer_operator.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,5 +163,5 @@ def compute(self, op_input, op_output, context):
163163
f,
164164
indent=2,
165165
)
166-
except:
166+
except Exception:
167167
pass

tools/pipeline-generator/pipeline_generator/generator/app_generator.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,32 @@
2626
class AppGenerator:
2727
"""Generates MONAI Deploy applications from MONAI Bundles."""
2828

29+
@staticmethod
30+
def _sanitize_for_python_identifier(name: str) -> str:
31+
"""Sanitize a string to be a valid Python identifier.
32+
33+
Args:
34+
name: String to sanitize
35+
36+
Returns:
37+
Valid Python identifier
38+
"""
39+
# Replace invalid characters with underscores
40+
sanitized = "".join(c if c.isalnum() or c == "_" else "_" for c in name)
41+
42+
# Remove leading/trailing underscores
43+
sanitized = sanitized.strip("_")
44+
45+
# Ensure it doesn't start with a digit
46+
if sanitized and sanitized[0].isdigit():
47+
sanitized = f"_{sanitized}"
48+
49+
# Ensure it's not empty (all chars were invalid)
50+
if not sanitized:
51+
sanitized = "app"
52+
53+
return sanitized
54+
2955
def __init__(self, settings: Optional[Settings] = None) -> None:
3056
"""Initialize the generator.
3157
@@ -41,7 +67,11 @@ def __init__(self, settings: Optional[Settings] = None) -> None:
4167
loader=FileSystemLoader(str(template_dir)),
4268
trim_blocks=True,
4369
lstrip_blocks=True,
44-
autoescape=False,
70+
# Autoescape is intentionally disabled because we're generating
71+
# Python code, YAML, and other non-HTML files. HTML escaping would
72+
# break the generated code. Security is handled via input validation
73+
# in generate_app() method.
74+
autoescape=False, # nosec B701
4575
)
4676

4777
def generate_app(
@@ -62,6 +92,10 @@ def generate_app(
6292
Returns:
6393
Path to the generated application directory
6494
"""
95+
# Validate model_id to prevent code injection
96+
if not model_id or not all(c.isalnum() or c in "/-_" for c in model_id):
97+
raise ValueError(f"Invalid model_id: {model_id}. Only alphanumeric characters, /, -, and _ are allowed.")
98+
6599
# Create output directory
66100
output_dir.mkdir(parents=True, exist_ok=True)
67101

@@ -158,9 +192,14 @@ def _prepare_context(
158192

159193
# Determine app name
160194
if not app_name:
161-
# Sanitize name to ensure valid Python identifier
162-
sanitized_name = "".join(c if c.isalnum() else "" for c in model_short_name.title())
163-
app_name = f"{sanitized_name}App" if sanitized_name else "GeneratedApp"
195+
# For auto-generated names, apply title case after replacing underscores
196+
# This ensures "test_model" becomes "TestModel" not "Test_Model"
197+
title_name = model_short_name.replace("_", " ").replace("-", " ").title().replace(" ", "")
198+
sanitized_name = self._sanitize_for_python_identifier(title_name)
199+
app_name = f"{sanitized_name}App"
200+
else:
201+
# Ensure user-provided app_name is also a valid Python identifier
202+
app_name = self._sanitize_for_python_identifier(app_name)
164203

165204
# Determine task type from metadata
166205
task = metadata.get("task", "segmentation").lower()

tools/pipeline-generator/pipeline_generator/generator/bundle_downloader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ def download_bundle(self, model_id: str, output_dir: Path, cache_dir: Optional[P
5151
repo_id=model_id,
5252
local_dir=bundle_dir,
5353
cache_dir=cache_dir,
54-
local_dir_use_symlinks=False, # Copy files instead of symlinks
5554
)
5655

5756
logger.info(f"Bundle downloaded to: {local_path}")

tools/pipeline-generator/tests/test_bundle_downloader.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ def test_download_bundle_success(self, mock_snapshot_download, tmp_path):
4141
repo_id="MONAI/spleen_ct_segmentation",
4242
local_dir=output_dir / "model",
4343
cache_dir=cache_dir,
44-
local_dir_use_symlinks=False,
4544
)
4645

4746
@patch("pipeline_generator.generator.bundle_downloader.snapshot_download")
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
"""Test security features of the pipeline generator."""
2+
3+
import pytest
4+
from pathlib import Path
5+
6+
from pipeline_generator.generator.app_generator import AppGenerator
7+
8+
9+
class TestSecurity:
10+
"""Test security measures in the app generator."""
11+
12+
def test_model_id_validation(self):
13+
"""Test that invalid model IDs are rejected."""
14+
generator = AppGenerator()
15+
output_dir = Path("/tmp/test")
16+
17+
# Valid model IDs
18+
valid_ids = [
19+
"MONAI/spleen_ct_segmentation",
20+
"test-org/model_name",
21+
"user/model-with-dashes",
22+
"org/model_with_underscores",
23+
]
24+
25+
# Invalid model IDs that could cause code injection
26+
invalid_ids = [
27+
"test; rm -rf /", # Shell command injection
28+
"test' OR '1'='1", # SQL injection style
29+
"test<script>alert('xss')</script>", # HTML/JS injection
30+
"test`echo hacked`", # Command substitution
31+
"test$(rm -rf /)", # Command substitution
32+
"test\" + __import__('os').system('ls') + \"", # Python injection
33+
"", # Empty
34+
None, # None
35+
]
36+
37+
# Test valid IDs (should not raise)
38+
for model_id in valid_ids:
39+
# We're just testing validation, not full generation
40+
try:
41+
# This will fail at download stage, but validation should pass
42+
generator.generate_app(model_id, output_dir)
43+
except ValueError as e:
44+
if "Invalid model_id" in str(e):
45+
pytest.fail(f"Valid model_id '{model_id}' was rejected: {e}")
46+
# Other errors are fine (e.g., download failures)
47+
48+
# Test invalid IDs (should raise ValueError)
49+
for model_id in invalid_ids:
50+
if model_id is None:
51+
continue # Skip None test as it would fail at type checking
52+
with pytest.raises(ValueError, match="Invalid model_id"):
53+
generator.generate_app(model_id, output_dir)
54+
55+
def test_app_name_sanitization(self):
56+
"""Test that app names are properly sanitized for Python identifiers."""
57+
# Test cases mapping input to expected sanitized output
58+
test_cases = [
59+
("test; rm -rf /", "test__rm__rfApp"), # Multiple special chars become underscores
60+
("test-with-dashes", "test_with_dashesApp"),
61+
("test.with.dots", "test_with_dotsApp"),
62+
("test space", "test_spaceApp"),
63+
("123test", "_123testApp"), # Starting with digit
64+
("Test", "TestApp"), # Normal case
65+
]
66+
67+
for input_name, expected_class_name in test_cases:
68+
# The AppGenerator will sanitize the name internally
69+
# We test the sanitization function directly
70+
sanitized = AppGenerator._sanitize_for_python_identifier(input_name)
71+
result_with_app = f"{sanitized}App"
72+
assert (
73+
result_with_app == expected_class_name
74+
), f"Failed for '{input_name}': got '{result_with_app}', expected '{expected_class_name}'"
75+
76+
def test_sanitize_for_python_identifier(self):
77+
"""Test the Python identifier sanitization method."""
78+
test_cases = [
79+
("normal_name", "normal_name"),
80+
("name-with-dashes", "name_with_dashes"),
81+
("name.with.dots", "name_with_dots"),
82+
("name with spaces", "name_with_spaces"),
83+
("123name", "_123name"), # Can't start with digit
84+
("", "app"), # Empty string
85+
("!@#$%", "app"), # All invalid chars
86+
("name!@#valid", "name___valid"),
87+
("CamelCase", "CamelCase"), # Preserve case
88+
]
89+
90+
for input_str, expected in test_cases:
91+
result = AppGenerator._sanitize_for_python_identifier(input_str)
92+
assert result == expected, f"Failed for '{input_str}': got '{result}', expected '{expected}'"
93+
94+
def test_no_autoescape_with_comment(self):
95+
"""Test that autoescape is disabled with proper documentation."""
96+
generator = AppGenerator()
97+
98+
# Verify autoescape is False
99+
assert generator.env.autoescape is False
100+
101+
# The comment explaining why is in the source code
102+
# This test just verifies the runtime behavior
103+
104+
105+
if __name__ == "__main__":
106+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)