Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
752ab1a
Add python profiling utilities
Eden-D-Zhang Oct 14, 2025
29436b6
Add profiling to query functions
Eden-D-Zhang Oct 14, 2025
4d19c81
Remove `CLP_ENABLE_PROFILING` environment variable check
Eden-D-Zhang Oct 14, 2025
b5bdcf1
Lint
Eden-D-Zhang Oct 14, 2025
02b29fb
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 14, 2025
6f65035
Clean up docstrings
Eden-D-Zhang Oct 14, 2025
c2be1f6
Remove __all__
Eden-D-Zhang Oct 15, 2025
a677177
Merge branch 'main' of https://github.com/Eden-D-Zhang/clp into pyins…
Eden-D-Zhang Oct 15, 2025
cc85f93
Remove unnecessary function
Eden-D-Zhang Oct 15, 2025
9810d31
Address review
Eden-D-Zhang Oct 16, 2025
b013429
Merge branch 'main' into pyinstrument_profile
Eden-D-Zhang Oct 16, 2025
a33a9c1
Address review
Eden-D-Zhang Oct 16, 2025
99d134b
Lint
Eden-D-Zhang Oct 16, 2025
61e8332
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 21, 2025
ab6ee4f
Update lock file
Eden-D-Zhang Oct 21, 2025
cbb32de
Update dependencies and lock file
Eden-D-Zhang Oct 21, 2025
8447f12
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 21, 2025
159e645
Address review
Eden-D-Zhang Oct 21, 2025
4080df6
Delete file
Eden-D-Zhang Oct 21, 2025
7df8e09
Merge branch 'main' into pyinstrument_profile
Eden-D-Zhang Oct 21, 2025
4089eb0
Change constant name
Eden-D-Zhang Oct 21, 2025
11641e2
Merge branch 'pyinstrument_profile' of https://github.com/Eden-D-Zhan…
Eden-D-Zhang Oct 21, 2025
9437246
Change typing imports according to coderabbit
Eden-D-Zhang Oct 21, 2025
ada4073
Lint
Eden-D-Zhang Oct 21, 2025
e94d2cc
Add clp_logging logger, type annotations
Eden-D-Zhang Oct 23, 2025
8beeda2
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 23, 2025
e6dfbea
Lint
Eden-D-Zhang Oct 23, 2025
022a7ab
Merge branch 'main' into pyinstrument_profile
Eden-D-Zhang Oct 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
263 changes: 263 additions & 0 deletions components/clp-py-utils/clp_py_utils/profiling_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
"""
Profiling utilities for CLP query execution performance analysis.
This module provides lightweight profiling decorators using pyinstrument.
Profile outputs include:
- HTML files with interactive flame graphs and call trees
- Text summaries showing call hierarchy and timing
"""

import datetime
import functools
import inspect
import logging
import os
from pathlib import Path
from typing import Any, Callable, Optional, Tuple, TypeVar

from pyinstrument import Profiler

logger = logging.getLogger(__name__)

F = TypeVar("F", bound=Callable[..., Any])

PROFILING_INTERVAL = 0.001


def profile(
    section_name: Optional[str] = None,
    job_id_param: Optional[str] = None,
    task_id_param: Optional[str] = None,
) -> Callable[[F], F]:
    """
    Decorator for profiling function execution with automatic context extraction.
    Output files are written to $CLP_LOGS_DIR/profiles/ (e.g., clp-package/var/log/query_worker/
    profiles/).
    :param section_name: Override for profile section name. If None, uses function name.
    :param job_id_param: Parameter name to extract job_id from (default: "job_id").
    Can use dot notation for attributes, e.g., "job.id"
    :param task_id_param: Parameter name to extract task_id from (default: "task_id").
    Can use dot notation for attributes, e.g., "task.id"
    :return: Decorated function with profiling capabilities
    """

    def decorator(func: F) -> F:
        name = section_name or func.__name__
        is_async = inspect.iscoroutinefunction(func)

        if is_async:

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                if not _is_profiling_enabled():
                    return await func(*args, **kwargs)

                # Profiling enabled: extract context and profile execution
                job_id, task_id = _extract_context_from_args(
                    func, args, kwargs, job_id_param, task_id_param
                )

                profiler = Profiler(interval=PROFILING_INTERVAL)
                try:
                    profiler.start()
                except RuntimeError as e:
                    # pyinstrument allows only one active profiler per thread; skip
                    # profiling this call rather than failing the wrapped function.
                    if "already a profiler running" in str(e):
                        # Parametric logging (not an f-string) so formatting is deferred
                        # until the record is actually emitted (Ruff G004).
                        logger.debug(
                            "Skipping nested profiling for %s (parent profiler already active)",
                            name,
                        )
                        return await func(*args, **kwargs)
                    raise

                try:
                    return await func(*args, **kwargs)
                finally:
                    # Always stop and persist the profile, even if the call raised.
                    profiler.stop()
                    _save_profile(profiler, name, job_id, task_id)

            return async_wrapper  # type: ignore

        else:

            @functools.wraps(func)
            def sync_wrapper(*args, **kwargs):
                if not _is_profiling_enabled():
                    return func(*args, **kwargs)

                # Profiling enabled: extract context and profile execution
                job_id, task_id = _extract_context_from_args(
                    func, args, kwargs, job_id_param, task_id_param
                )

                profiler = Profiler(interval=PROFILING_INTERVAL)
                try:
                    profiler.start()
                except RuntimeError as e:
                    # Skip profiling this function to avoid nested-profiler conflicts.
                    if "already a profiler running" in str(e):
                        logger.debug(
                            "Skipping nested profiling for %s (parent profiler already active)",
                            name,
                        )
                        return func(*args, **kwargs)
                    raise

                try:
                    return func(*args, **kwargs)
                finally:
                    # Always stop and persist the profile, even if the call raised.
                    profiler.stop()
                    _save_profile(profiler, name, job_id, task_id)

            return sync_wrapper  # type: ignore

    return decorator


def _extract_context_from_args(
func: Callable,
args: tuple,
kwargs: dict,
Comment on lines +123 to +124
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Use more precise type annotations for args and kwargs.

The args and kwargs parameters could use more specific generic types for better type safety.

Apply this diff:

 def _extract_context_from_args(
     func: Callable,
-    args: tuple,
-    kwargs: dict,
+    args: tuple[Any, ...],
+    kwargs: dict[str, Any],
     job_id_param: str = "job_id",
     task_id_param: str = "task_id",
 ) -> tuple[str, str]:
🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 122 to
123, the params args: tuple and kwargs: dict are too generic; change them to
precise generic typing (e.g., args: Tuple[Any, ...] and kwargs: Dict[str, Any]
or Mapping[str, Any]) and add the necessary typing imports (Any, Tuple, Dict or
Mapping) at the top of the file; update any related type hints or docstrings to
match the new annotations.

job_id_param: Optional[str] = None,
task_id_param: Optional[str] = None,
) -> Tuple[str, str]:
"""
Extract job_id and task_id from function arguments.
:param func: The function being profiled
:param args: Positional arguments passed to the function
:param kwargs: Keyword arguments passed to the function
:param job_id_param: Name/path of the parameter containing job_id (default: "job_id").
:param task_id_param: Name/path of the parameter containing task_id (default: "task_id").
:return: Tuple of (job_id, task_id) as strings. Empty strings if not found.
"""
job_id = ""
task_id = ""

try:
# Get function signature
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())

def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""

# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]

# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]

if value is None:
return ""

# Navigate through attributes if dot notation was used
for attr_name in parts[1:]:
if hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""

return str(value)

Comment on lines +146 to +175
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Optional: support dict-style context extraction

If callers pass dict-like objects, walk keys as well as attributes.

Apply this diff:

-            for attr_name in parts[1:]:
-                if hasattr(value, attr_name):
-                    value = getattr(value, attr_name)
-                else:
-                    return ""
+            for attr_name in parts[1:]:
+                if isinstance(value, dict) and attr_name in value:
+                    value = value[attr_name]
+                elif hasattr(value, attr_name):
+                    value = getattr(value, attr_name)
+                else:
+                    return ""
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""
# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]
# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]
if value is None:
return ""
# Navigate through attributes if dot notation was used
for attr_name in parts[1:]:
if hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""
return str(value)
def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""
# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]
# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]
if value is None:
return ""
# Navigate through attributes if dot notation was used
- for attr_name in parts[1:]:
- if hasattr(value, attr_name):
- value = getattr(value, attr_name)
- else:
for attr_name in parts[1:]:
if isinstance(value, dict) and attr_name in value:
value = value[attr_name]
elif hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""
return str(value)

# Extract job_id
job_id_key = job_id_param or "job_id"
job_id = extract_value(job_id_key)

# Extract task_id
task_id_key = task_id_param or "task_id"
task_id = extract_value(task_id_key)

except Exception as e:
logger.debug(f"Failed to extract context from {func.__name__}: {e}")

return job_id, task_id
Comment on lines +180 to +183
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Avoid blind except Exception; narrow to expected errors and fix logging style.

-    except Exception as e:
-        logger.debug(f"Failed to extract context from {func.__name__}: {e}")
+    except (AttributeError, KeyError, IndexError, ValueError, TypeError) as e:
+        logger.debug("Failed to extract context from %s: %s", func.__name__, e)
🧰 Tools
🪛 Ruff (0.14.1)

178-178: Do not catch blind exception: Exception

(BLE001)


179-179: Logging statement uses f-string

(G004)

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 178 to
181, replace the blind "except Exception" with a narrow exception handler for
the expected extraction errors (e.g., except (AttributeError, KeyError,
IndexError, ValueError) as e) and update the logging call to use structured
logging parameters and include exception info (for example logger.debug("Failed
to extract context from %s: %s", func.__name__, e, exc_info=True)) so only
relevant errors are caught and the log contains proper exception details.



def _is_profiling_enabled() -> bool:
"""
Check if profiling is enabled.
TODO: Add `CLPConfig` mechanism to enable/disable profiling for each component.
:return: False
"""
return False

Comment on lines 186 to 194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Enable profiling via env and make interval tunable

Avoid code edits to toggle profiling. Read from env and allow interval override.

Apply this diff:

-PROFILING_INTERVAL = 0.001
+PROFILING_INTERVAL = float(os.getenv("CLP_PROFILING_INTERVAL", "0.005"))
@@
-def _is_profiling_enabled() -> bool:
+def _is_profiling_enabled() -> bool:
@@
-    :return: False
+    :return: True if CLP_ENABLE_PROFILING is truthy, else False
@@
-    return False
+    return os.getenv("CLP_ENABLE_PROFILING", "").lower() in {"1", "true", "yes", "on"}

Also applies to: 25-25

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 191-199
(and also apply same change at line 25), replace the hardcoded return False with
logic that reads an environment variable to enable profiling (e.g.
CLP_PROFILING_ENABLED or CLP_PROFILING) and returns True for common truthy
values ("1","true","yes") and False otherwise; also make the profiling interval
tunable by reading an environment variable (e.g. CLP_PROFILING_INTERVAL_SECONDS)
elsewhere where the interval is used (or add a helper _get_profiling_interval()
that reads the env and falls back to the current default), parsing ints safely
and using a sensible default on parse failure.


def _save_profile(
    profiler: Profiler, section_name: str, job_id: str = "", task_id: str = ""
) -> None:
    """
    Save profiler output to HTML and text formats. Generates .html and .txt files.
    :param profiler: The pyinstrument Profiler object containing profiling data
    :param section_name: Name identifying this profiling section
    :param job_id: Optional job identifier for filename
    :param task_id: Optional task identifier for filename
    """
    try:
        # Get the session for logging
        session = profiler.last_session
        if not session:
            logger.debug("No profiling session for %s", section_name)
            return

        duration = session.duration
        sample_count = session.sample_count

        # Timezone-aware timestamp (Ruff DTZ005) so filenames sort consistently
        # across hosts regardless of local timezone.
        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S_%f")
        filename_parts = [section_name]

        if job_id:
            filename_parts.append(f"job{job_id}")
        if task_id:
            filename_parts.append(f"task{task_id}")

        filename_parts.append(timestamp)
        base_filename = "_".join(filename_parts)

        # Path(None) raises TypeError, so fall back safely when CLP_LOGS_DIR is unset
        # instead of crashing the profiled call.
        logs_dir = os.getenv("CLP_LOGS_DIR")
        if not logs_dir:
            logger.warning("CLP_LOGS_DIR is not set; writing profiles to CWD ./profiles")
            output_dir = Path.cwd() / "profiles"
        else:
            output_dir = Path(logs_dir) / "profiles"
        output_dir.mkdir(exist_ok=True, parents=True)

        # Save HTML with interactive visualization
        html_path = output_dir / f"{base_filename}.html"
        with open(html_path, "w", encoding="utf-8") as f:
            f.write(profiler.output_html())

        # Save human-readable text summary with call hierarchy
        txt_path = output_dir / f"{base_filename}.txt"
        with open(txt_path, "w", encoding="utf-8") as f:
            # Header
            f.write("=" * 80 + "\n")
            f.write("CLP Query Profiling Report (pyinstrument)\n")
            f.write(f"Section: {section_name}\n")
            if job_id:
                f.write(f"Job ID: {job_id}\n")
            if task_id:
                f.write(f"Task ID: {task_id}\n")
            f.write(f"Timestamp: {timestamp}\n")
            f.write("=" * 80 + "\n\n")
            f.write(profiler.output_text(unicode=True, color=False))

        # Parametric logging (Ruff G004) instead of f-strings.
        logger.info(
            "Profile saved: %s (duration=%.6fs, samples=%s) HTML=%s, TXT=%s",
            section_name,
            duration,
            sample_count,
            html_path,
            txt_path,
        )

    except Exception:
        # logger.exception records the traceback (replaces error(..., exc_info=True),
        # Ruff G201). Broad catch is deliberate: saving a profile must never crash
        # the profiled workload.
        logger.exception("Failed to save profile for %s", section_name)
1 change: 1 addition & 0 deletions components/clp-py-utils/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ boto3 = "^1.35.81"
mariadb = "~1.0.11"
mysql-connector-python = "^8.2.0"
pydantic = "^2.11.9"
pyinstrument = "^5.0.0"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we plan to include the profiling feature in the formal releases? if not, maybe we should put this into a

[dependency-groups]
dev = [
    ...
]

that said, the pyinstrument package is only ~270KB (https://pypi.org/project/pyinstrument/#files) so it shouldn't hurt to be bundled into the release.

python-dotenv = "^1.0.1"
python-Levenshtein = "~0.22"
sqlalchemy = "~2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
WorkerConfig,
)
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.profiling_utils import profile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Decorator usage looks correct; add explicit context mapping and env check

  • Order is right (@profile wraps the function that Celery registers).
  • Consider passing job_id_param="job_id", task_id_param="task_id" for clarity.
  • Ensure CLP_LOGS_DIR is set in worker env; otherwise saving profiles will fail once enabled (see _save_profile).

Also applies to: 183-184

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
around lines 17 and 183-184, update the @profile decorator usage to pass
explicit context mapping by adding job_id_param="job_id" and
task_id_param="task_id" so the profiler knows which function args map to
job/task IDs, and add an environment check (e.g., verify CLP_LOGS_DIR is present
and non-empty in os.environ) before attempting to save profiles in _save_profile
to avoid failures when the worker env is missing CLP_LOGS_DIR.

from clp_py_utils.s3_utils import (
generate_s3_virtual_hosted_style_url,
get_credential_env_vars,
Expand Down Expand Up @@ -179,6 +180,7 @@ def _make_command_and_env_vars(


@app.task(bind=True)
@profile(section_name="extract_stream_task")
def extract_stream(
self: Task,
job_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
WorkerConfig,
)
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.profiling_utils import profile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Good instrumentation; consider explicit job/task mapping and env readiness

  • Order of decorators is correct.
  • Optionally pass job_id_param="job_id", task_id_param="task_id" to be explicit.
  • Ensure CLP_LOGS_DIR exists in the search worker environment when profiling is enabled.

Also applies to: 169-170

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
around line 15 (and also at lines 169-170), the profile decorator is used but
not explicit about which args map to job and task and the code assumes
CLP_LOGS_DIR exists; update the profile decorator calls to include
job_id_param="job_id", task_id_param="task_id" for clarity, and add an
environment-readiness check that ensures CLP_LOGS_DIR exists (create the
directory if missing or raise a clear error) before profiling is enabled so
profiling can write logs reliably.

from clp_py_utils.s3_utils import generate_s3_virtual_hosted_style_url, get_credential_env_vars
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
Expand Down Expand Up @@ -165,6 +166,7 @@ def _make_command_and_env_vars(


@app.task(bind=True)
@profile(section_name="search_task")
def search(
self: Task,
job_id: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
)
from clp_py_utils.core import read_yaml_config_file
from clp_py_utils.decorators import exception_default_value
from clp_py_utils.profiling_utils import profile
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Scheduler profiling: sane targets; add guardrails for async sections and interval

  • Targets are sensible; job_id_param="job.id" is correct.
  • For long‑lived async functions (acquire_reducer_for_job, handle_finished_search_job), consider:
    • Using a coarser interval (e.g., 5–10ms) to reduce overhead/trace size.
    • Making interval and enablement configurable via env to avoid code changes when toggling.

Also applies to: 547-555, 569-611, 898-902

🤖 Prompt for AI Agents
In
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
around lines 49 (and also apply similar changes at 547-555, 569-611, 898-902),
the profiler is applied with defaults that are too fine-grained for long‑lived
async sections; update profiling to use a coarser sampling interval (e.g.,
5–10ms) and add guardrails: make both profiling enablement and interval
configurable via environment variables (e.g., JOB_SCHEDULER_PROFILING_ENABLED,
JOB_SCHEDULER_PROFILE_INTERVAL_MS) and wrap or conditionally apply the profiler
only when enabled so async functions like acquire_reducer_for_job and
handle_finished_search_job use the higher interval (or are skipped) to reduce
overhead and trace size.

from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.extract_stream_task import extract_stream
from job_orchestration.executor.query.fs_search_task import search
Expand Down Expand Up @@ -543,6 +544,7 @@ def get_task_group_for_job(
raise NotImplementedError(error_msg)


@profile(section_name="scheduler_dispatch_job", job_id_param="job.id")
def dispatch_query_job(
db_conn,
job: QueryJob,
Expand All @@ -564,6 +566,7 @@ def dispatch_query_job(
job.state = InternalJobState.RUNNING


@profile(section_name="scheduler_acquire_reducer", job_id_param="job.id")
async def acquire_reducer_for_job(job: SearchJob):
reducer_host: Optional[str] = None
reducer_port: Optional[int] = None
Expand Down Expand Up @@ -892,6 +895,7 @@ def found_max_num_latest_results(
return max_timestamp_in_remaining_archives <= min_timestamp_in_top_results


@profile(section_name="scheduler_handle_finished_search", job_id_param="job.id")
async def handle_finished_search_job(
db_conn, job: SearchJob, task_results: Optional[Any], results_cache_uri: str
) -> None:
Expand Down
Loading