11# Copyright (c) Saga Inc.
22# Distributed under the terms of the GNU Affero General Public License v3.0 License.
33
4- from typing import Any , Final , List , Optional
4+ from typing import Any , Final , List , Optional , cast
55import os
6+ import json
67from mito_ai .utils .schema import MITO_FOLDER
78
89RULES_DIR_PATH : Final [str ] = os .path .join (MITO_FOLDER , 'rules' )
10+ RULES_METADATA_FILENAME : Final [str ] = '_metadata.json'
11+
12+
13+ def _sanitize_rule_name (rule_name : str ) -> str :
14+ """
15+ Sanitizes a rule name to prevent path traversal attacks.
16+ Raises ValueError if the rule name contains unsafe characters.
17+
18+ Args:
19+ rule_name: The rule name to sanitize
20+
21+ Returns:
22+ The sanitized rule name (with .md extension stripped if present)
23+
24+ Raises:
25+ ValueError: If the rule name contains path traversal sequences or other unsafe characters
26+ """
27+ if not rule_name :
28+ raise ValueError ("Rule name cannot be empty" )
29+
30+ # Strip .md extension if present
31+ if rule_name .endswith ('.md' ):
32+ rule_name = rule_name [:- 3 ]
33+
34+ # Check for path traversal sequences
35+ if '..' in rule_name or '/' in rule_name or '\\ ' in rule_name :
36+ raise ValueError (f"Rule name contains invalid characters: { rule_name } " )
37+
38+ # Check for absolute paths
39+ if os .path .isabs (rule_name ):
40+ raise ValueError (f"Rule name cannot be an absolute path: { rule_name } " )
41+
42+ # Check for null bytes or other control characters
43+ if '\x00 ' in rule_name :
44+ raise ValueError ("Rule name cannot contain null bytes" )
45+
46+ # Ensure it's a valid filename (no reserved characters on Windows)
47+ # Windows reserved: < > : " | ? *
48+ invalid_chars = set ('<>:|?*"' )
49+ if any (c in rule_name for c in invalid_chars ):
50+ raise ValueError (f"Rule name contains invalid filename characters: { rule_name } " )
51+
52+ return rule_name
53+
54+
55+ def _validate_rule_path (file_path : str , rule_name : str ) -> None :
56+ """
57+ Validates that a rule file path is within the rules directory.
58+ This provides defense-in-depth protection against path traversal attacks.
59+
60+ Args:
61+ file_path: The file path to validate
62+ rule_name: The rule name (for error messages)
63+
64+ Raises:
65+ ValueError: If the resolved path is outside RULES_DIR_PATH
66+ """
67+ resolved_path = os .path .abspath (file_path )
68+ rules_dir_abs = os .path .abspath (RULES_DIR_PATH )
69+ if not resolved_path .startswith (rules_dir_abs ):
70+ raise ValueError (f"Invalid rule name: { rule_name } " )
71+
72+
73+ def _get_metadata_path () -> str :
74+ return os .path .join (RULES_DIR_PATH , RULES_METADATA_FILENAME )
75+
76+
77+ def _load_metadata () -> dict [Any , Any ]:
78+ path = _get_metadata_path ()
79+ if not os .path .exists (path ):
80+ return {}
81+ try :
82+ with open (path , 'r' ) as f :
83+ return cast (dict [Any , Any ], json .load (f ))
84+ except (json .JSONDecodeError , OSError ):
85+ return {}
86+
87+
88+ def _save_metadata (metadata : dict ) -> None :
89+ if not os .path .exists (RULES_DIR_PATH ):
90+ os .makedirs (RULES_DIR_PATH )
91+ path = _get_metadata_path ()
92+ with open (path , 'w' ) as f :
93+ json .dump (metadata , f , indent = 2 )
94+
95+
96+ def get_rule_default (rule_name : str ) -> bool :
97+ """Returns whether the rule is marked as a default (auto-applied) rule."""
98+ if rule_name .endswith ('.md' ):
99+ rule_name = rule_name [:- 3 ]
100+ metadata = _load_metadata ()
101+ entry = metadata .get (rule_name , {})
102+ return bool (entry .get ('is_default' , False ))
103+
104+
105+ def set_rule_default (rule_name : str , is_default : bool ) -> None :
106+ """Sets whether the rule is a default (auto-applied) rule."""
107+ if rule_name .endswith ('.md' ):
108+ rule_name = rule_name [:- 3 ]
109+ metadata = _load_metadata ()
110+ metadata [rule_name ] = {** metadata .get (rule_name , {}), 'is_default' : is_default }
111+ _save_metadata (metadata )
112+
9113
10114def set_rules_file (rule_name : str , value : Any ) -> None :
11115 """
12116 Updates the value of a specific rule file in the rules directory
13117 """
118+ # Sanitize rule name to prevent path traversal
119+ rule_name = _sanitize_rule_name (rule_name )
120+
14121 # Ensure the directory exists
15122 if not os .path .exists (RULES_DIR_PATH ):
16123 os .makedirs (RULES_DIR_PATH )
17-
124+
18125 # Create the file path to the rule name as a .md file
19126 file_path = os .path .join (RULES_DIR_PATH , f"{ rule_name } .md" )
20127
128+ # Additional safety check: ensure the resolved path is still within RULES_DIR_PATH
129+ _validate_rule_path (file_path , rule_name )
130+
21131 with open (file_path , 'w+' ) as f :
22132 f .write (value )
133+
134+
135+ def delete_rule (rule_name : str ) -> None :
136+ """
137+ Deletes a rule file from the rules directory. Normalizes rule_name (strips .md).
138+ Metadata for this rule is removed by cleanup_rules_metadata().
139+ """
140+ # Sanitize rule name to prevent path traversal
141+ rule_name = _sanitize_rule_name (rule_name )
142+
143+ file_path = os .path .join (RULES_DIR_PATH , f"{ rule_name } .md" )
144+
145+ # Additional safety check: ensure the resolved path is still within RULES_DIR_PATH
146+ _validate_rule_path (file_path , rule_name )
23147
148+ if os .path .exists (file_path ):
149+ os .remove (file_path )
150+
24151
25152def get_rule (rule_name : str ) -> Optional [str ]:
26153 """
27154 Retrieves the value of a specific rule file from the rules directory
28155 """
29-
30- if rule_name .endswith ('.md' ):
31- rule_name = rule_name [:- 3 ]
156+ # Sanitize rule name to prevent path traversal
157+ rule_name = _sanitize_rule_name (rule_name )
32158
33159 file_path = os .path .join (RULES_DIR_PATH , f"{ rule_name } .md" )
34160
161+ # Additional safety check: ensure the resolved path is still within RULES_DIR_PATH
162+ _validate_rule_path (file_path , rule_name )
163+
35164 if not os .path .exists (file_path ):
36165 return None
37166
@@ -47,10 +176,45 @@ def get_all_rules() -> List[str]:
47176 if not os .path .exists (RULES_DIR_PATH ):
48177 os .makedirs (RULES_DIR_PATH )
49178 return [] # Return empty list if directory didn't exist
50-
179+
51180 try :
52181 return [f for f in os .listdir (RULES_DIR_PATH ) if f .endswith ('.md' )]
53182 except OSError as e :
54183 # Log the error if needed and return empty list
55184 print (f"Error reading rules directory: { e } " )
56185 return []
186+
187+
188+ def cleanup_rules_metadata () -> None :
189+ """
190+ Removes metadata entries for rules that no longer exist on disk (deleted or renamed).
191+ Call after rule create/update so metadata stays in sync with actual rule files.
192+ """
193+ current_files = get_all_rules ()
194+ current_rule_names = {f [:- 3 ] if f .endswith ('.md' ) else f for f in current_files }
195+ metadata = _load_metadata ()
196+ if not metadata :
197+ return
198+ keys_to_remove = [k for k in metadata if k not in current_rule_names ]
199+ if not keys_to_remove :
200+ return
201+ for k in keys_to_remove :
202+ del metadata [k ]
203+ _save_metadata (metadata )
204+
205+
206+ def get_default_rules_content () -> str :
207+ """
208+ Returns the concatenated content of all rules marked as default (auto-applied).
209+ Each rule is included as "Rule name:\n \n {content}". Returns empty string if no default rules.
210+ """
211+ rule_files = get_all_rules ()
212+ parts : List [str ] = []
213+ for f in rule_files :
214+ rule_name = f [:- 3 ] if f .endswith ('.md' ) else f
215+ if not get_rule_default (rule_name ):
216+ continue
217+ content = get_rule (rule_name )
218+ if content and content .strip ():
219+ parts .append (f"{ rule_name } :\n \n { content } " )
220+ return '\n \n ' .join (parts ) if parts else ""
0 commit comments