Skip to content
634 changes: 634 additions & 0 deletions docs/HEARTBEAT_SYSTEM_README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion exe-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pyinstaller==6.12.0
pyinstaller==6.17.0
staticx @ git+https://github.com/Granulate/staticx.git@33eefdadc72832d5aa67c0792768c9e76afb746d; platform.machine == "x86_64"
606 changes: 606 additions & 0 deletions gprofiler/heartbeat.py

Large diffs are not rendered by default.

161 changes: 130 additions & 31 deletions gprofiler/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from granulate_utils.linux.ns import is_root, is_running_in_init_pid
from granulate_utils.linux.process import is_process_running
from granulate_utils.metadata.cloud import get_aws_execution_env
from psutil import NoSuchProcess, Process
from psutil import NoSuchProcess, Process, process_iter
from requests import RequestException, Timeout

from gprofiler import __version__
Expand All @@ -48,6 +48,7 @@
from gprofiler.diagnostics import log_diagnostics, set_diagnostics
from gprofiler.exceptions import APIError, NoProfilersEnabledError
from gprofiler.gprofiler_types import ProcessToProfileData, UserArgs, integers_list, positive_integer
from gprofiler.heartbeat import DynamicGProfilerManager, HeartbeatClient
from gprofiler.hw_metrics import HWMetricsMonitor, HWMetricsMonitorBase, NoopHWMetricsMonitor
from gprofiler.log import RemoteLogsHandler, initial_root_logger_setup
from gprofiler.merge import concatenate_from_external_file, concatenate_profiles, merge_profiles
Expand Down Expand Up @@ -167,6 +168,8 @@ def __init__(
profiling_mode=profiling_mode,
container_names_client=container_names_client,
processes_to_profile=processes_to_profile,
max_processes_per_profiler=user_args.get("max_processes_per_profiler", 0),
max_system_processes_for_system_profilers=user_args.get("max_system_processes_for_system_profilers", 0),
)
self.system_profiler, self.process_profilers = get_profilers(user_args, profiler_state=self._profiler_state)
self._usage_logger = usage_logger
Expand Down Expand Up @@ -279,8 +282,37 @@ def start(self) -> None:
self._system_metrics_monitor.start()
self._hw_metrics_monitor.start()

# Check if system should skip continuous profilers due to process count
skip_system_profilers = False
if self._profiler_state.max_system_processes_for_system_profilers > 0:
try:
total_processes = len(list(process_iter()))
if total_processes > self._profiler_state.max_system_processes_for_system_profilers:
skip_system_profilers = True
logger.warning(
f"Skipping system profilers (perf) - {total_processes} processes exceed threshold "
f"of {self._profiler_state.max_system_processes_for_system_profilers}. "
f"Runtime profilers (py-spy, Java, etc.) will continue normally."
)
else:
logger.debug(
f"System process count: {total_processes} "
f"(threshold: {self._profiler_state.max_system_processes_for_system_profilers})"
)
except Exception as e:
logger.warning(f"Could not count system processes, continuing with all profilers: {e}")

for prof in list(self.all_profilers):
try:
# Skip system profilers if threshold exceeded
if (
skip_system_profilers
and hasattr(prof, "_is_system_wide_profiler")
and prof._is_system_wide_profiler()
):
logger.info(f"Skipping {prof.__class__.__name__} due to high system process count")
continue

prof.start()
except Exception:
# the SystemProfiler is handled separately - let the user run with '--perf-mode none' if they
Expand Down Expand Up @@ -594,6 +626,25 @@ def parse_cmd_args() -> configargparse.Namespace:
help="Comma separated list of processes that will be filtered to profile,"
" given multiple times will append pids to one list",
)
parser.add_argument(
"--max-processes-runtime-profiler",
dest="max_processes_per_profiler",
type=positive_integer,
default=0,
help="Maximum number of processes to profile per runtime profiler (0=unlimited). "
"When exceeded, profiles only the top N processes by CPU usage. "
"Does not affect system-wide profilers (perf, eBPF). Default: %(default)s",
)
parser.add_argument(
"--skip-system-profilers-above",
dest="max_system_processes_for_system_profilers",
type=positive_integer,
default=0,
help="Skip system-wide profilers (perf only) when total system processes exceed this threshold (0=unlimited). "
"When exceeded, prevents perf profiler from starting to reduce resource usage on busy systems. "
"PyPerf has its own threshold via --python-skip-pyperf-profiler-above. "
"Runtime profilers (py-spy, Java, etc.) continue normally with --max-processes limiting. Default: %(default)s",
)
parser.add_argument(
"--rootless",
action="store_true",
Expand Down Expand Up @@ -861,6 +912,22 @@ def parse_cmd_args() -> configargparse.Namespace:
"The file modification indicates the last snapshot time.",
)

parser.add_argument(
"--enable-heartbeat-server",
action="store_true",
dest="enable_heartbeat_server",
default=False,
help="Enable heartbeat communication with server for dynamic profiling commands",
)

parser.add_argument(
"--heartbeat-interval",
type=positive_integer,
dest="heartbeat_interval",
default=30,
help="Interval in seconds for sending heartbeats to server (default: %(default)s)",
)

if is_linux() and not is_aarch64():
hw_metrics_options = parser.add_argument_group("hardware metrics")
hw_metrics_options.add_argument(
Expand Down Expand Up @@ -936,6 +1003,14 @@ def parse_cmd_args() -> configargparse.Namespace:
if args.profile_spawned_processes and args.pids_to_profile is not None:
parser.error("--pids is not allowed when profiling spawned processes")

if args.enable_heartbeat_server:
if not args.upload_results:
parser.error("--enable-heartbeat-server requires --upload-results to be enabled")
if not args.server_token:
parser.error("--enable-heartbeat-server requires --token to be provided")
if not args.service_name:
parser.error("--enable-heartbeat-server requires --service-name to be provided")

return args


Expand Down Expand Up @@ -1215,37 +1290,61 @@ def main() -> None:

ApplicationIdentifiers.init(enrichment_options)
set_diagnostics(args.diagnostics)
gprofiler = GProfiler(
output_dir=args.output_dir,
flamegraph=args.flamegraph,
rotating_output=args.rotating_output,
rootless=args.rootless,
profiler_api_client=profiler_api_client,
collect_metrics=args.collect_metrics,
collect_metadata=args.collect_metadata,
enrichment_options=enrichment_options,
state=state,
usage_logger=usage_logger,
user_args=args.__dict__,
duration=args.duration,
profile_api_version=args.profile_api_version,
profiling_mode=args.profiling_mode,
collect_hw_metrics=getattr(args, "collect_hw_metrics", False),
profile_spawned_processes=args.profile_spawned_processes,
remote_logs_handler=remote_logs_handler,
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
perfspect_path=perfspect_path,
perfspect_duration=getattr(args, "tool_perfspect_duration", 60),
verbose=args.verbose,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
gprofiler.run_continuous()

# Check if heartbeat server mode is enabled FIRST
if args.enable_heartbeat_server:
# Create heartbeat client
heartbeat_client = HeartbeatClient(
api_server=args.api_server,
service_name=args.service_name,
server_token=args.server_token,
verify=args.verify,
)

# Create dynamic profiler manager
manager = DynamicGProfilerManager(args, heartbeat_client)
manager.heartbeat_interval = args.heartbeat_interval

try:
logger.info("Starting heartbeat mode - waiting for server commands...")
manager.start_heartbeat_loop()
except KeyboardInterrupt:
logger.info("Received interrupt signal, stopping heartbeat mode...")
finally:
manager.stop()
else:
gprofiler.run_single()
# Normal profiling mode
gprofiler = GProfiler(
output_dir=args.output_dir,
flamegraph=args.flamegraph,
rotating_output=args.rotating_output,
rootless=args.rootless,
profiler_api_client=profiler_api_client,
collect_metrics=args.collect_metrics,
collect_metadata=args.collect_metadata,
enrichment_options=enrichment_options,
state=state,
usage_logger=usage_logger,
user_args=args.__dict__,
duration=args.duration,
profile_api_version=args.profile_api_version,
profiling_mode=args.profiling_mode,
collect_hw_metrics=getattr(args, "collect_hw_metrics", False),
profile_spawned_processes=args.profile_spawned_processes,
remote_logs_handler=remote_logs_handler,
controller_process=controller_process,
processes_to_profile=processes_to_profile,
external_metadata_path=external_metadata_path,
heartbeat_file_path=heartbeat_file_path,
perfspect_path=perfspect_path,
perfspect_duration=getattr(args, "tool_perfspect_duration", 60),
verbose=args.verbose,
)
logger.info("gProfiler initialized and ready to start profiling")
if args.continuous:
gprofiler.run_continuous()
else:
gprofiler.run_single()

except KeyboardInterrupt:
pass
Expand Down
2 changes: 2 additions & 0 deletions gprofiler/profiler_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class ProfilerState:
profiling_mode: str
container_names_client: Optional[ContainerNamesClient]
processes_to_profile: Optional[List[Process]]
max_processes_per_profiler: int
max_system_processes_for_system_profilers: int

def __post_init__(self) -> None:
self._temporary_dir = TemporaryDirectoryWithMode(dir=self.storage_dir, mode=0o755)
Expand Down
43 changes: 43 additions & 0 deletions gprofiler/profilers/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,30 @@ def add_highest_avg_depth_stacks_per_process(
action="store_false",
dest="perf_memory_restart",
),
ProfilerArgument(
"--perf-use-cgroups",
help="Use cgroup-based profiling instead of PID-based profiling for better reliability. "
"Profiles the top N cgroups by resource usage, avoiding crashes from invalid PIDs.",
action="store_true",
default=False,
dest="perf_use_cgroups",
),
ProfilerArgument(
"--perf-max-cgroups",
help="Maximum number of cgroups to profile when using --perf-use-cgroups. Default: %(default)s",
type=int,
default=50,
dest="perf_max_cgroups",
),
ProfilerArgument(
"--perf-max-docker-containers",
help="Maximum number of individual Docker containers to profile instead of the broad 'docker' cgroup. "
"When set, profiles the top N highest-resource individual containers rather than all containers together. "
"Set to 0 to use the broad 'docker' cgroup (default behavior). Default: %(default)s",
type=int,
default=0,
dest="perf_max_docker_containers",
),
],
disablement_help="Disable the global perf of processes,"
" and instead only concatenate runtime-specific profilers results",
Expand All @@ -138,6 +162,10 @@ class SystemProfiler(ProfilerBase):
versions of Go processes.
"""

def _is_system_wide_profiler(self) -> bool:
    """Return True: perf profiles the whole system, not individual processes.

    GProfiler.start() duck-types on this method (via hasattr) to decide
    whether to skip starting this profiler when the total system process
    count exceeds the --skip-system-profilers-above threshold; runtime
    (per-process) profilers do not define it and are never skipped.
    """
    return True

def __init__(
self,
frequency: int,
Expand All @@ -148,6 +176,9 @@ def __init__(
perf_inject: bool,
perf_node_attach: bool,
perf_memory_restart: bool,
perf_use_cgroups: bool = False,
perf_max_cgroups: int = 50,
perf_max_docker_containers: int = 0,
min_duration: int = 0,
):
super().__init__(frequency, duration, profiler_state, min_duration)
Expand All @@ -159,6 +190,12 @@ def __init__(
self._node_processes: List[Process] = []
self._node_processes_attached: List[Process] = []
self._perf_memory_restart = perf_memory_restart
self._perf_mode = perf_mode
self._perf_dwarf_stack_size = perf_dwarf_stack_size
self._perf_inject = perf_inject
self._perf_use_cgroups = perf_use_cgroups
self._perf_max_cgroups = perf_max_cgroups
self._perf_max_docker_containers = perf_max_docker_containers
switch_timeout_s = duration * 3 # allow gprofiler to be delayed up to 3 intervals before timing out.
extra_args = []
try:
Expand All @@ -184,6 +221,9 @@ def __init__(
extra_args=extra_args,
processes_to_profile=self._profiler_state.processes_to_profile,
switch_timeout_s=switch_timeout_s,
use_cgroups=self._perf_use_cgroups,
max_cgroups=self._perf_max_cgroups,
max_docker_containers=self._perf_max_docker_containers,
)
self._perfs.append(self._perf_fp)
else:
Expand All @@ -200,6 +240,9 @@ def __init__(
extra_args=extra_args,
processes_to_profile=self._profiler_state.processes_to_profile,
switch_timeout_s=switch_timeout_s,
use_cgroups=self._perf_use_cgroups,
max_cgroups=self._perf_max_cgroups,
max_docker_containers=self._perf_max_docker_containers,
)
self._perfs.append(self._perf_dwarf)
else:
Expand Down
Loading
Loading