-
Notifications
You must be signed in to change notification settings - Fork 31
feat(auto-annotation): integrate YOLO auto-labeling and enhance data management #223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| executor_type = get_from_cfg(task_id, "executor_type") | ||
| if not WRAPPERS.get(executor_type).cancel(task_id): | ||
| raise APIException(ErrorCode.CANCEL_TASK_ERROR) | ||
| except Exception as e: |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
user-provided value
This path depends on a
user-provided value
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 5 days ago
In general, to fix uncontrolled path usage, you should normalize the path and verify that it stays within a designated safe root directory, and/or constrain the untrusted component (here, task_id) to a safe format. Simply calling os.path.exists on an untrusted path does not provide any security.
For this code, the least intrusive fix is:
- Introduce a constant base directory for flows, e.g.
FLOW_BASE_DIR = "/flow". - Implement a safe path resolution helper that:
- Joins the base directory with a relative path (such as
task_id/process.yaml). - Normalizes the result with
os.path.abspathoros.path.normpath. - Verifies that the normalized path is still inside the base directory (e.g. via
os.path.commonpath). - Optionally rejects absolute
task_idsegments outright.
- Joins the base directory with a relative path (such as
- Update
check_valid_pathso it can optionally enforce that paths are under a given base directory. For uses that should be under/flow, call it with that base. - Use this helper for both:
- The
config_pathcomputed inget_from_cfg. - The
config_pathinsubmit_task.
This removes redundant ad‑hoc string interpolation and ensures consistent validation.
- The
We must keep existing behavior (still read /flow/{task_id}/process.yaml when task_id is safe), but add the containment check and raise the same FILE_NOT_FOUND_ERROR / log errors when the resolved path is invalid or outside the allowed base. No new imports are strictly needed; os.path.commonpath and os.path.join are already available via import os.
-
Copy modified line R20 -
Copy modified line R86 -
Copy modified line R131 -
Copy modified lines R133-R141 -
Copy modified lines R145-R151 -
Copy modified lines R153-R158 -
Copy modified lines R160-R163
| @@ -17,6 +17,7 @@ | ||
|
|
||
| # 日志配置 | ||
| LOG_DIR = "/var/log/datamate/runtime" | ||
| FLOW_BASE_DIR = "/flow" | ||
| os.makedirs(LOG_DIR, exist_ok=True) | ||
| logger.add( | ||
| f"{LOG_DIR}/runtime.log", | ||
| @@ -82,7 +83,7 @@ | ||
|
|
||
| @app.post("/api/task/{task_id}/submit") | ||
| async def submit_task(task_id): | ||
| config_path = f"/flow/{task_id}/process.yaml" | ||
| config_path = get_flow_config_path(task_id) | ||
| logger.info("Start submitting job...") | ||
|
|
||
| dataset_path = get_from_cfg(task_id, "dataset_path") | ||
| @@ -127,17 +128,39 @@ | ||
| return success_json_info | ||
|
|
||
|
|
||
| def check_valid_path(file_path): | ||
| def check_valid_path(file_path, base_dir: str = None): | ||
| full_path = os.path.abspath(file_path) | ||
| if base_dir is not None: | ||
| base_dir_abs = os.path.abspath(base_dir) | ||
| try: | ||
| common = os.path.commonpath([base_dir_abs, full_path]) | ||
| except ValueError: | ||
| # Occurs if paths are on different drives (on Windows) or invalid | ||
| return False | ||
| if common != base_dir_abs: | ||
| return False | ||
| return os.path.exists(full_path) | ||
|
|
||
|
|
||
| def get_from_cfg(task_id, key): | ||
| config_path = f"/flow/{task_id}/process.yaml" | ||
| if not check_valid_path(config_path): | ||
| logger.error(f"config_path is not existed! please check this path.") | ||
| def get_flow_config_path(task_id: str) -> str: | ||
| """ | ||
| Build a safe absolute path to the flow configuration file for a task. | ||
| Ensures the resulting path stays within FLOW_BASE_DIR. | ||
| """ | ||
| # Disallow absolute task_id to avoid bypassing the base directory via join | ||
| if os.path.isabs(task_id): | ||
| raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR) | ||
| relative_path = os.path.join(task_id, "process.yaml") | ||
| full_path = os.path.abspath(os.path.join(FLOW_BASE_DIR, relative_path)) | ||
| if not check_valid_path(full_path, base_dir=FLOW_BASE_DIR): | ||
| logger.error(f"config_path is not existed or invalid! please check this path.") | ||
| raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR) | ||
| return full_path | ||
|
|
||
|
|
||
| def get_from_cfg(task_id, key): | ||
| config_path = get_flow_config_path(task_id) | ||
|
|
||
| with open(config_path, "r", encoding='utf-8') as f: | ||
| content = f.read() | ||
| cfg = yaml.safe_load(content) |
| return success_json_info | ||
|
|
||
|
|
||
| def check_valid_path(file_path): |
Check failure
Code scanning / CodeQL
Uncontrolled data used in path expression High
user-provided value
This path depends on a
user-provided value
Show autofix suggestion
Hide autofix suggestion
Copilot Autofix
AI 5 days ago
To fix this, the path derived from the untrusted task_id must be validated before being used with open() (and indirectly before being used by wrappers that likely read the same config). The goal is to ensure that regardless of the task_id value, the resulting config_path always points to a file within a known safe root directory, and that there is no way to traverse out of that directory using .., absolute paths, or similar tricks.
The best way to do this without changing existing functionality is:
- Define a constant safe root for flow configurations, e.g.
FLOW_ROOT = "/flow". - Replace direct f-string construction of
config_pathwithos.path.join(FLOW_ROOT, task_id, "process.yaml")followed byos.path.normpathto collapse..segments. - Verify that the normalized
config_pathstays withinFLOW_ROOTby checking that it starts withFLOW_ROOT + os.sepor exactly equalsFLOW_ROOT, and reject otherwise (raiseAPIException(ErrorCode.FILE_NOT_FOUND_ERROR)or a more specific error). - Reuse this safe construction in both
submit_task(whereconfig_pathis passed to wrappers) andget_from_cfg(whereconfig_pathis opened and read). - Optionally, further restrict
task_idto a safe filename pattern (e.g., alphanumerics,-,_) using a regex to reduce attack surface.
Concretely, in runtime/python-executor/datamate/operator_runtime.py:
- Add a module-level constant
FLOW_ROOT = "/flow"under the existingLOG_DIRdefinition. - Introduce a helper function, e.g.
build_config_path(task_id: str) -> str, that:- Joins
FLOW_ROOT,task_id, and"process.yaml". - Normalizes with
os.path.normpath. - Converts to an absolute path.
- Ensures the resulting path is under
FLOW_ROOTusing a prefix check. - Raises
APIException(ErrorCode.FILE_NOT_FOUND_ERROR)if validation fails.
- Joins
- Update
submit_taskto call this helper instead of constructingconfig_pathwith an f-string. - Update
get_from_cfgsimilarly to use the helper instead of its own f-string.
This centralizes the logic for safe path construction and ensures both call sites are protected against path traversal and similar issues.
-
Copy modified line R20 -
Copy modified line R86 -
Copy modified lines R136-R149 -
Copy modified line R151
| @@ -17,6 +17,7 @@ | ||
|
|
||
| # 日志配置 | ||
| LOG_DIR = "/var/log/datamate/runtime" | ||
| FLOW_ROOT = "/flow" | ||
| os.makedirs(LOG_DIR, exist_ok=True) | ||
| logger.add( | ||
| f"{LOG_DIR}/runtime.log", | ||
| @@ -82,7 +83,7 @@ | ||
|
|
||
| @app.post("/api/task/{task_id}/submit") | ||
| async def submit_task(task_id): | ||
| config_path = f"/flow/{task_id}/process.yaml" | ||
| config_path = build_config_path(task_id) | ||
| logger.info("Start submitting job...") | ||
|
|
||
| dataset_path = get_from_cfg(task_id, "dataset_path") | ||
| @@ -132,8 +133,22 @@ | ||
| return os.path.exists(full_path) | ||
|
|
||
|
|
||
| def build_config_path(task_id: str) -> str: | ||
| # Build a normalized, absolute config path under FLOW_ROOT and validate it. | ||
| raw_path = os.path.join(FLOW_ROOT, task_id, "process.yaml") | ||
| normalized_path = os.path.abspath(os.path.normpath(raw_path)) | ||
|
|
||
| # Ensure the resulting path stays within the FLOW_ROOT directory. | ||
| flow_root_abs = os.path.abspath(FLOW_ROOT) | ||
| if not (normalized_path == flow_root_abs or normalized_path.startswith(flow_root_abs + os.sep)): | ||
| logger.error(f"Invalid config path derived from task_id '{task_id}'.") | ||
| raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR) | ||
|
|
||
| return normalized_path | ||
|
|
||
|
|
||
| def get_from_cfg(task_id, key): | ||
| config_path = f"/flow/{task_id}/process.yaml" | ||
| config_path = build_config_path(task_id) | ||
| if not check_valid_path(config_path): | ||
| logger.error(f"config_path is not existed! please check this path.") | ||
| raise APIException(ErrorCode.FILE_NOT_FOUND_ERROR) |
✨ What’s included
This PR introduces a complete YOLO-based auto-annotation workflow and improves data management capabilities.
Auto Annotation (YOLO)
Data Management Enhancements
🎯 Why this change
🧪 How to test
📝 Notes