Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
752ab1a
Add python profiling utilities
Eden-D-Zhang Oct 14, 2025
29436b6
Add profiling to query functions
Eden-D-Zhang Oct 14, 2025
4d19c81
Remove `CLP_ENABLE_PROFILING` environment variable check
Eden-D-Zhang Oct 14, 2025
b5bdcf1
Lint
Eden-D-Zhang Oct 14, 2025
02b29fb
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 14, 2025
6f65035
Clean up docstrings
Eden-D-Zhang Oct 14, 2025
c2be1f6
Remove __all__
Eden-D-Zhang Oct 15, 2025
a677177
Merge branch 'main' of https://github.com/Eden-D-Zhang/clp into pyins…
Eden-D-Zhang Oct 15, 2025
cc85f93
Remove unnecessary function
Eden-D-Zhang Oct 15, 2025
9810d31
Address review
Eden-D-Zhang Oct 16, 2025
b013429
Merge branch 'main' into pyinstrument_profile
Eden-D-Zhang Oct 16, 2025
a33a9c1
Address review
Eden-D-Zhang Oct 16, 2025
99d134b
Lint
Eden-D-Zhang Oct 16, 2025
61e8332
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 21, 2025
ab6ee4f
Upate lock file
Eden-D-Zhang Oct 21, 2025
cbb32de
Update dependencies and lock file
Eden-D-Zhang Oct 21, 2025
8447f12
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 21, 2025
159e645
Address review
Eden-D-Zhang Oct 21, 2025
4080df6
Delete file
Eden-D-Zhang Oct 21, 2025
7df8e09
Merge branch 'main' into pyinstrument_profile
Eden-D-Zhang Oct 21, 2025
4089eb0
Change constant name
Eden-D-Zhang Oct 21, 2025
11641e2
Merge branch 'pyinstrument_profile' of https://github.com/Eden-D-Zhan…
Eden-D-Zhang Oct 21, 2025
9437246
Change typing imports according to coderabbit
Eden-D-Zhang Oct 21, 2025
ada4073
Lint
Eden-D-Zhang Oct 21, 2025
e94d2cc
Add clp_logging logger, type annotations
Eden-D-Zhang Oct 23, 2025
8beeda2
Merge branch 'main' of https://github.com/y-scope/clp into pyinstrume…
Eden-D-Zhang Oct 23, 2025
e6dfbea
Lint
Eden-D-Zhang Oct 23, 2025
5501b7f
Merge branch 'pyinstrument_profile' into profile-setting
sitaowang1998 Oct 23, 2025
555592f
Add env for enable profiling
sitaowang1998 Oct 23, 2025
24d8213
Add default for CLP_ENABLE_PROFILING
sitaowang1998 Oct 23, 2025
3ec8d9f
Merge branch 'main' into profile-setting
sitaowang1998 Oct 24, 2025
e9cba89
Rename variable
sitaowang1998 Oct 24, 2025
344b018
Reorder env
sitaowang1998 Oct 24, 2025
1d9a7be
Merge branch 'main' into profile-setting
sitaowang1998 Oct 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ def _set_up_env_for_query_worker(self, num_workers: int) -> EnvVarsDict:
"CLP_QUERY_WORKER_LOGGING_LEVEL": self._clp_config.query_worker.logging_level,
}

# Profiling config
env_vars |= {
"CLP_QUERY_WORKER_ENABLE_PROFILING": str(
self._clp_config.query_worker.enable_profiling
).lower(),
}

# Resources
env_vars |= {
"CLP_QUERY_WORKER_CONCURRENCY": str(num_workers),
Expand Down
1 change: 1 addition & 0 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ class CompressionWorker(BaseModel):

class QueryWorker(BaseModel):
logging_level: LoggingLevel = "INFO"
enable_profiling: bool = False

Comment on lines 275 to 278
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

Add field description for clarity and ensure env plumbing is present.

Document the new config knob.

 class QueryWorker(BaseModel):
     logging_level: LoggingLevel = "INFO"
-    enable_profiling: bool = False
+    enable_profiling: bool = Field(
+        False,
+        description="Enable pyinstrument runtime profiling for query worker tasks."
+    )

Run to confirm the plumbing from config -> env -> container:


🏁 Script executed:

#!/bin/bash
# Verify config->env mapping for profiling
rg -nC2 -e 'CLP_QUERY_WORKER_ENABLE_PROFILING|CLP_ENABLE_PROFILING|enable_profiling' \
  --glob '!**/dist/**' --glob '!**/build/**'

Length of output: 2231


Add field description for documentation clarity.

The config → env plumbing is already correctly implemented and verified. Consider adding a Field description to document the profiling knob:

[optional_refactor_recommended]

 class QueryWorker(BaseModel):
     logging_level: LoggingLevel = "INFO"
-    enable_profiling: bool = False
+    enable_profiling: bool = Field(
+        False,
+        description="Enable pyinstrument runtime profiling for query worker tasks."
+    )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
class QueryWorker(BaseModel):
logging_level: LoggingLevel = "INFO"
enable_profiling: bool = False
class QueryWorker(BaseModel):
logging_level: LoggingLevel = "INFO"
enable_profiling: bool = Field(
False,
description="Enable pyinstrument runtime profiling for query worker tasks."
)
🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/clp_config.py around lines 272 to 275,
add a Field description to the enable_profiling attribute to document what the
profiling knob does; replace the bare bool annotation with a pydantic Field for
enable_profiling that includes a concise description (e.g., when True, enables
runtime profiling for query workers and controls output location/verbosity as
applicable) so the generated docs and env config are clear.


class Redis(BaseModel):
Expand Down
260 changes: 260 additions & 0 deletions components/clp-py-utils/clp_py_utils/profiling_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
"""
Profiling utilities for CLP query execution performance analysis.

This module provides lightweight profiling decorators using pyinstrument.

Profile outputs include:
- HTML files with interactive flame graphs and call trees.
- Text summaries showing call hierarchy and timing.
"""

import datetime
import functools
import inspect
import os
from collections.abc import Callable
from pathlib import Path
from typing import Any, TypeVar

from pyinstrument import Profiler

from clp_py_utils.clp_logging import get_logger

logger = get_logger("profiler")

F = TypeVar("F", bound=Callable[..., Any])

PROFILING_INTERVAL_SECONDS = 0.001
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Make sampling interval configurable via env.

Allows tuning overhead without code changes.

-PROFILING_INTERVAL_SECONDS = 0.001
+PROFILING_INTERVAL_SECONDS = float(os.getenv("CLP_PROFILING_INTERVAL_SECONDS", "0.001"))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
PROFILING_INTERVAL_SECONDS = 0.001
PROFILING_INTERVAL_SECONDS = float(os.getenv("CLP_PROFILING_INTERVAL_SECONDS", "0.001"))
🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around line 27, the
hard-coded PROFILING_INTERVAL_SECONDS = 0.001 should be made configurable via an
environment variable; change it to read an env var (e.g.
CLP_PROFILING_INTERVAL_SECONDS) with a fallback default of 0.001, parse it as a
float, validate it is > 0 (or else use the default), and ensure os is imported;
update any related tests or docs to mention the new env var name and default.



def profile(
section_name: str | None = None,
job_id_param: str = "job_id",
task_id_param: str = "task_id",
) -> Callable[[F], F]:
"""
Profiles function execution as decorator with automatic context extraction.

Output files are written to $CLP_LOGS_DIR/profiles/ (e.g., clp-package/var/log/query_worker/
profiles/).

:param section_name: Override for profile section name. If None, uses function name.
:param job_id_param: Parameter name to extract job_id from (default: "job_id").
Can use dot notation for attributes, e.g., "job.id".
:param task_id_param: Parameter name to extract task_id from (default: "task_id").
Can use dot notation for attributes, e.g., "task.id".
:return: Decorated function with profiling capabilities.
"""

def decorator(func: F) -> F:
name = section_name or func.__name__
is_async = inspect.iscoroutinefunction(func)

if is_async:

@functools.wraps(func)
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
if not _is_profiling_enabled():
return await func(*args, **kwargs)

# Profiling enabled: extract context and profile execution
job_id, task_id = _extract_context_from_args(
func, args, kwargs, job_id_param, task_id_param
)

profiler = Profiler(interval=PROFILING_INTERVAL_SECONDS)
try:
profiler.start()
except RuntimeError as e:
# Skip profiling this function to avoid conflicts
if "already a profiler running" in str(e):
logger.debug(
f"Skipping nested profiling for {name} "
f"(parent profiler already active)"
)
return await func(*args, **kwargs)
raise

try:
result = await func(*args, **kwargs)
return result
finally:
profiler.stop()
_save_profile(profiler, name, job_id, task_id)

return async_wrapper # type: ignore

@functools.wraps(func)
def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
if not _is_profiling_enabled():
return func(*args, **kwargs)

# Profiling enabled: extract context and profile execution
job_id, task_id = _extract_context_from_args(
func, args, kwargs, job_id_param, task_id_param
)

profiler = Profiler(interval=PROFILING_INTERVAL_SECONDS)
try:
profiler.start()
except RuntimeError as e:
# Skip profiling this function to avoid conflicts
if "already a profiler running" in str(e):
logger.debug(
f"Skipping nested profiling for {name} (parent profiler already active)"
)
return func(*args, **kwargs)
raise

try:
result = func(*args, **kwargs)
return result
finally:
profiler.stop()
_save_profile(profiler, name, job_id, task_id)

return sync_wrapper # type: ignore

return decorator


def _extract_context_from_args(
func: Callable,
args: tuple,
kwargs: dict,
job_id_param: str = "job_id",
task_id_param: str = "task_id",
) -> tuple[str, str]:
"""
Extracts job_id and task_id from function arguments.

:param func: The function being profiled.
:param args: Positional arguments passed to the function.
:param kwargs: Keyword arguments passed to the function.
:param job_id_param: Name/path of the parameter containing job_id (default: "job_id").
:param task_id_param: Name/path of the parameter containing task_id (default: "task_id").
:return: tuple of (job_id, task_id) as strings. Empty strings if not found.
"""
job_id = ""
task_id = ""

try:
# Get function signature
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())

def extract_value(param_spec: str) -> str:
"""Extract value from parameter, supporting dot notation for attributes."""
if not param_spec:
return ""

# Split on '.' to handle attribute access
parts = param_spec.split(".")
param_name = parts[0]

# Find the parameter value
value = None
if param_name in kwargs:
value = kwargs[param_name]
elif param_name in param_names:
idx = param_names.index(param_name)
if idx < len(args):
value = args[idx]

if value is None:
return ""

# Navigate through attributes if dot notation was used
for attr_name in parts[1:]:
if hasattr(value, attr_name):
value = getattr(value, attr_name)
else:
return ""

return str(value)

# Extract job_id and task_id
job_id = extract_value(job_id_param)
task_id = extract_value(task_id_param)

except Exception as e:
logger.debug(f"Failed to extract context from {func.__name__}: {e}")

return job_id, task_id


def _is_profiling_enabled() -> bool:
"""
Checks if profiling is enabled.

:return: If `CLP_ENABLE_PROFILING` environment variable is set to `true`.
"""
profiling_enabled = os.getenv("CLP_ENABLE_PROFILING")
if profiling_enabled is not None and profiling_enabled.lower() == "true":
return True
return False


def _save_profile(
profiler: Profiler, section_name: str, job_id: str = "", task_id: str = ""
) -> None:
"""
Saves profiler output to HTML and text formats. Generates .html and .txt files.

:param profiler: The pyinstrument Profiler object containing profiling data.
:param section_name: Name identifying this profiling section.
:param job_id: Optional job identifier for filename.
:param task_id: Optional task identifier for filename.
"""
try:
# Get the session for logging
session = profiler.last_session
if not session:
logger.debug(f"No profiling session for {section_name}")
return

duration = session.duration
sample_count = session.sample_count

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename_parts = [section_name]

if job_id:
filename_parts.append(f"job{job_id}")
if task_id:
filename_parts.append(f"task{task_id}")

filename_parts.append(timestamp)
base_filename = "_".join(filename_parts)

Comment on lines +219 to +229
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Prefer timezone-aware timestamps for filenames.

Ensures clarity across hosts and logs.

-        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
+        timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S_%fZ")

Confirm pyinstrument 5.1.x exposes session.sample_count (line 217) as used here.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S_%f")
filename_parts = [section_name]
if job_id:
filename_parts.append(f"job{job_id}")
if task_id:
filename_parts.append(f"task{task_id}")
filename_parts.append(timestamp)
base_filename = "_".join(filename_parts)
timestamp = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S_%fZ")
filename_parts = [section_name]
if job_id:
filename_parts.append(f"job{job_id}")
if task_id:
filename_parts.append(f"task{task_id}")
filename_parts.append(timestamp)
base_filename = "_".join(filename_parts)
🧰 Tools
🪛 Ruff (0.14.1)

219-219: datetime.datetime.now() called without a tz argument

(DTZ005)

🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 219 to
229, replace the naive datetime.now() filename timestamp with a timezone-aware
UTC timestamp (e.g.
datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d_%H%M%S_%fZ")) so
filenames clearly reflect UTC across hosts; keep the rest of filename_parts
logic unchanged. Also verify that pyinstrument 5.1.x exposes
session.sample_count before using it (if it does not, read the session API and
use the appropriate property/method or guard access with hasattr and fallback).

output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)

# Save HTML with interactive visualization
html_path = output_dir / f"{base_filename}.html"
with open(html_path, "w", encoding="utf-8") as f:
f.write(profiler.output_html())

# Save human-readable text summary with call hierarchy
txt_path = output_dir / f"{base_filename}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
# Header
Comment on lines +230 to +241
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard against missing CLP_LOGS_DIR (prevents crash).

Path(None) will raise when CLP_LOGS_DIR is unset. Provide a fallback and warn.

-        output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
-        output_dir.mkdir(exist_ok=True, parents=True)
+        logs_dir_env = os.getenv("CLP_LOGS_DIR")
+        base_dir = Path(logs_dir_env) if logs_dir_env else Path.cwd()
+        if not logs_dir_env:
+            logger.warning("CLP_LOGS_DIR not set; writing profiles under %s/profiles", base_dir)
+        output_dir = base_dir / "profiles"
+        output_dir.mkdir(exist_ok=True, parents=True)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
output_dir = Path(os.getenv("CLP_LOGS_DIR")) / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)
# Save HTML with interactive visualization
html_path = output_dir / f"{base_filename}.html"
with open(html_path, "w", encoding="utf-8") as f:
f.write(profiler.output_html())
# Save human-readable text summary with call hierarchy
txt_path = output_dir / f"{base_filename}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
# Header
logs_dir_env = os.getenv("CLP_LOGS_DIR")
base_dir = Path(logs_dir_env) if logs_dir_env else Path.cwd()
if not logs_dir_env:
logger.warning("CLP_LOGS_DIR not set; writing profiles under %s/profiles", base_dir)
output_dir = base_dir / "profiles"
output_dir.mkdir(exist_ok=True, parents=True)
# Save HTML with interactive visualization
html_path = output_dir / f"{base_filename}.html"
with open(html_path, "w", encoding="utf-8") as f:
f.write(profiler.output_html())
# Save human-readable text summary with call hierarchy
txt_path = output_dir / f"{base_filename}.txt"
with open(txt_path, "w", encoding="utf-8") as f:
# Header
🤖 Prompt for AI Agents
In components/clp-py-utils/clp_py_utils/profiling_utils.py around lines 230 to
241, the code assumes CLP_LOGS_DIR is set and calls
Path(os.getenv("CLP_LOGS_DIR")), which will raise if the env var is None; guard
by reading os.getenv("CLP_LOGS_DIR") into a variable, if it's falsy use a safe
fallback (e.g., tempfile.gettempdir() or Path.cwd()), log a warning using the
module logger or logging.warning that CLP_LOGS_DIR was unset and the fallback is
being used, then construct the Path from that fallback and continue to mkdir and
write files as before.

f.write("=" * 80 + "\n")
f.write(f"CLP Query Profiling Report (pyinstrument)\n")
f.write(f"Section: {section_name}\n")
if job_id:
f.write(f"Job ID: {job_id}\n")
if task_id:
f.write(f"Task ID: {task_id}\n")
f.write(f"Timestamp: {timestamp}\n")
f.write("=" * 80 + "\n\n")
f.write(profiler.output_text(unicode=True, color=False))

logger.info(
f"Profile saved: {section_name} "
f"(duration={duration:.6f}s, samples={sample_count}) "
f"HTML={html_path}, TXT={txt_path}"
)

except Exception as e:
logger.error(f"Failed to save profile for {section_name}: {e}", exc_info=True)
1 change: 1 addition & 0 deletions components/clp-py-utils/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ dependencies = [
"mariadb>=1.0.11,<1.1.dev0",
"mysql-connector-python>=9.4.0",
"pydantic>=2.12.3",
"pyinstrument>=5.1.1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Pin pyinstrument to major and consider optionality.

To avoid unexpected breaks on future major releases, pin the upper bound. If you want to keep profiling optional, pair this with a lazy import (see profiling_utils note).

Apply:

-    "pyinstrument>=5.1.1",
+    "pyinstrument>=5.1.1,<6",
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
"pyinstrument>=5.1.1",
"pyinstrument>=5.1.1,<6",

"python-Levenshtein>=0.27.1",
"PyYAML>=6.0.3",
"result>=0.17.0",
Expand Down
Loading
Loading