Skip to content

Commit 9ce49cd

Browse files
[ROB-2447] Oomkill job fix (#484)
Co-authored-by: arik <[email protected]>
1 parent 599350f commit 9ce49cd

File tree

6 files changed

+670
-59
lines changed

6 files changed

+670
-59
lines changed

robusta_krr/core/integrations/kubernetes/__init__.py

Lines changed: 195 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@
2323
from robusta_krr.core.models.result import ResourceAllocations
2424
from robusta_krr.utils.object_like_dict import ObjectLikeDict
2525

26+
27+
class LightweightJobInfo:
    """Lightweight job object containing only the fields needed for GroupedJob processing.

    Holds just the job name and namespace instead of the full V1Job object so that
    thousands of grouped jobs can be kept in memory cheaply.
    """

    # __slots__ drops the per-instance __dict__ — this class exists specifically
    # to reduce memory when many jobs are retained, so make that saving real.
    __slots__ = ("name", "namespace")

    def __init__(self, name: str, namespace: str):
        self.name = name
        self.namespace = namespace

    def __repr__(self) -> str:
        return f"{type(self).__name__}(name={self.name!r}, namespace={self.namespace!r})"
2634
from . import config_patch as _
2735

2836
logger = logging.getLogger("krr")
@@ -261,6 +269,82 @@ def _should_list_resource(self, resource: str) -> bool:
261269
return True
262270
return resource in settings.resources
263271

272+
def _is_job_owned_by_cronjob(self, job: V1Job) -> bool:
    """Return True when the job has a CronJob among its owner references."""
    owners = job.metadata.owner_references or []
    for owner in owners:
        if owner.kind == "CronJob":
            return True
    return False
276+
def _is_job_grouped(self, job: V1Job) -> bool:
    """Return True when the job carries at least one of the configured grouping labels."""
    labels = job.metadata.labels
    if not settings.job_grouping_labels or not labels:
        return False
    for grouping_label in settings.job_grouping_labels:
        if grouping_label in labels:
            return True
    return False
282+
async def _list_namespaced_or_global_objects_batched(
    self,
    kind: KindLiteral,
    all_namespaces_request: Callable,
    namespaced_request: Callable,
    namespace: str,
    limit: Optional[int] = None,
    continue_ref: Optional[str] = None,
) -> tuple[list[Any], Optional[str]]:
    """Fetch one paginated batch of objects of the given kind.

    Args:
        kind: Resource kind being listed (used for logging only).
        all_namespaces_request: API call listing the resource across all namespaces.
        namespaced_request: API call listing the resource in one namespace.
        namespace: Target namespace, or "*" to list cluster-wide.
        limit: Maximum number of items to request in this batch.
        continue_ref: Kubernetes continue token from the previous batch, if any.

    Returns:
        A ``(items, next_continue_token)`` tuple. When a continue token has
        expired (HTTP 410) and the error body carries a fresh one, returns
        ``([], fresh_token)`` so the caller can retry; otherwise the
        ApiException is re-raised.
    """
    # `limit` may be None, so format with %s (the original %d would break
    # log-record formatting for the default call).
    logger.debug("Listing %s in %s with batching (limit=%s)", kind, self.cluster, limit)
    loop = asyncio.get_running_loop()

    try:
        # Exactly one request is issued per call; run it on the blocking-I/O
        # executor so the event loop is not stalled by the kubernetes client.
        if namespace == "*":
            request = loop.run_in_executor(
                self.executor,
                lambda: all_namespaces_request(
                    watch=False,
                    label_selector=settings.selector,
                    limit=limit,
                    _continue=continue_ref,
                ),
            )
        else:
            request = loop.run_in_executor(
                self.executor,
                lambda: namespaced_request(
                    namespace=namespace,
                    watch=False,
                    label_selector=settings.selector,
                    limit=limit,
                    _continue=continue_ref,
                ),
            )

        response = await request
        result = list(response.items)

        # The continue token lives on the list response metadata; it is None
        # on the final page.
        next_continue_ref = getattr(response.metadata, "_continue", None)
        return result, next_continue_ref

    except ApiException as e:
        if e.status == 410 and e.body:
            # Continue token expired; the API error body may carry a fresh one.
            import json

            try:
                error_body = json.loads(e.body)
                new_continue_token = error_body.get("metadata", {}).get("continue")
                if new_continue_token:
                    # Generic method: name the actual kind rather than hard-coding "jobs".
                    logger.info("Continue token expired for %s listing. Continuing", kind)
                    return [], new_continue_token
            except (json.JSONDecodeError, KeyError):
                pass
        raise
264348
async def _list_namespaced_or_global_objects(
265349
self,
266350
kind: KindLiteral,
@@ -458,26 +542,57 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
458542
extract_containers=lambda item: item.spec.template.spec.containers,
459543
)
460544

async def _list_all_jobs(self) -> list[K8sObjectData]:
    """List all jobs using batched loading with a configurable batch size.

    Jobs owned by CronJobs are skipped (they are covered by CronJob scanning),
    and jobs carrying any grouping label are skipped (they are handled by the
    GroupedJob workload type). Discovery is best-effort: on any failure the
    jobs collected so far are returned instead of raising.
    """
    if not self._should_list_resource("Job"):
        return []

    namespaces = self.namespaces if self.namespaces != "*" else ["*"]
    all_jobs: list[K8sObjectData] = []
    try:
        # batch_count is shared across namespaces so the total number of API
        # pages for the whole scan is bounded by discovery_job_max_batches.
        batch_count = 0
        for namespace in namespaces:
            continue_ref: Optional[str] = None
            while batch_count < settings.discovery_job_max_batches:
                jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
                    kind="Job",
                    all_namespaces_request=self.batch.list_job_for_all_namespaces,
                    namespaced_request=self.batch.list_namespaced_job,
                    namespace=namespace,
                    limit=settings.discovery_job_batch_size,
                    continue_ref=continue_ref,
                )
                continue_ref = next_continue_ref

                if not jobs_batch and continue_ref:
                    # An expired continue token was refreshed: count it as a
                    # (failed) batch so retries cannot loop forever.
                    batch_count += 1
                    continue
                if not jobs_batch:
                    # No more jobs; empty terminal pages are not counted.
                    break

                batch_count += 1
                for job in jobs_batch:
                    if self._is_job_owned_by_cronjob(job):
                        continue
                    if self._is_job_grouped(job):
                        continue
                    for container in job.spec.template.spec.containers:
                        all_jobs.append(self.__build_scannable_object(job, container, "Job"))
                if not continue_ref:
                    break

        logger.debug("Found %d regular jobs", len(all_jobs))
        return all_jobs

    except Exception:
        # Best-effort discovery: logger.exception records the traceback
        # (replaces logger.error(..., exc_info=True) with unused `e`).
        logger.exception("Failed to run jobs discovery")
        return all_jobs
481596

482597
def _list_all_cronjobs(self) -> list[K8sObjectData]:
483598
return self._list_scannable_objects(
@@ -497,42 +612,82 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
497612
logger.debug("Skipping GroupedJob in cluster")
498613
return []
499614

500-
logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")
501-
502-
# Get all jobs that have any of the grouping labels
503-
all_jobs = await self._list_namespaced_or_global_objects(
504-
kind="Job",
505-
all_namespaces_request=self.batch.list_job_for_all_namespaces,
506-
namespaced_request=self.batch.list_namespaced_job,
507-
)
615+
logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)
508616

509617
grouped_jobs = defaultdict(list)
510-
for job in all_jobs:
511-
if (job.metadata.labels and
512-
not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
513-
514-
for label_name in settings.job_grouping_labels:
515-
if label_name in job.metadata.labels:
516-
label_value = job.metadata.labels[label_name]
517-
group_key = f"{label_name}={label_value}"
518-
grouped_jobs[group_key].append(job)
618+
grouped_jobs_template = {} # Store only ONE full job as template per group - needed for class K8sObjectData
619+
continue_ref: Optional[str] = None
620+
batch_count = 0
621+
namespaces = self.namespaces if self.namespaces != "*" else ["*"]
622+
try:
623+
batch_count = 0
624+
for namespace in namespaces:
625+
continue_ref = None
626+
while batch_count < settings.discovery_job_max_batches:
627+
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
628+
kind="Job",
629+
all_namespaces_request=self.batch.list_job_for_all_namespaces,
630+
namespaced_request=self.batch.list_namespaced_job,
631+
namespace=namespace,
632+
limit=settings.discovery_job_batch_size,
633+
continue_ref=continue_ref,
634+
)
635+
636+
continue_ref = next_continue_ref
637+
638+
if not jobs_batch and continue_ref:
639+
# refreshed continue token, count error batches
640+
batch_count += 1
641+
continue
642+
if not jobs_batch:
643+
# no more jobs to batch do not count empty batches
644+
break
645+
646+
batch_count += 1
647+
for job in jobs_batch:
648+
if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
649+
continue
650+
for label_name in settings.job_grouping_labels:
651+
if label_name not in job.metadata.labels:
652+
continue
653+
# label_name is value of grouped job label
654+
label_value = job.metadata.labels[label_name]
655+
group_key = f"{label_name}={label_value}"
656+
lightweight_job = LightweightJobInfo(
657+
name=job.metadata.name,
658+
namespace=job.metadata.namespace
659+
)
660+
# Store lightweight job info only for grouped jobs
661+
grouped_jobs[group_key].append(lightweight_job)
662+
# Keep only ONE full job as template per group
663+
if group_key not in grouped_jobs_template:
664+
grouped_jobs_template[group_key] = job
665+
if not continue_ref:
666+
break
667+
668+
except Exception as e:
669+
logger.error(
670+
"Failed to run grouped jobs discovery",
671+
exc_info=True,
672+
)
673+
raise
519674

520675
result = []
521676
for group_name, jobs in grouped_jobs.items():
677+
template_job = grouped_jobs_template[group_name]
678+
522679
jobs_by_namespace = defaultdict(list)
523680
for job in jobs:
524-
jobs_by_namespace[job.metadata.namespace].append(job)
681+
jobs_by_namespace[job.namespace].append(job)
525682

526683
for namespace, namespace_jobs in jobs_by_namespace.items():
527684
limited_jobs = namespace_jobs[:settings.job_grouping_limit]
528685

529686
container_names = set()
530-
for job in limited_jobs:
531-
for container in job.spec.template.spec.containers:
532-
container_names.add(container.name)
687+
for container in template_job.spec.template.spec.containers:
688+
container_names.add(container.name)
533689

534690
for container_name in container_names:
535-
template_job = limited_jobs[0]
536691
template_container = None
537692
for container in template_job.spec.template.spec.containers:
538693
if container.name == container_name:

robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD
351351
del jobs
352352
elif object.kind == "GroupedJob":
353353
if hasattr(object._api_resource, '_grouped_jobs'):
354-
pod_owners = [job.metadata.name for job in object._api_resource._grouped_jobs]
354+
pod_owners = [job.name for job in object._api_resource._grouped_jobs]
355355
pod_owner_kind = "Job"
356356
else:
357357
pod_owners = [object.name]

robusta_krr/core/models/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ class Config(pd.BaseSettings):
5353
# Threading settings
5454
max_workers: int = pd.Field(6, ge=1)
5555

56+
# Discovery settings
57+
discovery_job_batch_size: int = pd.Field(5000, ge=1, description="Batch size for Kubernetes job API calls")
58+
discovery_job_max_batches: int = pd.Field(100, ge=1, description="Maximum number of job batches to process to prevent infinite loops")
59+
5660
# Job grouping settings
5761
job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
5862
job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group")

robusta_krr/main.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,18 @@ def run_strategy(
232232
help="Maximum number of jobs/pods to query per GroupedJob group (default: 500).",
233233
rich_help_panel="Job Grouping Settings",
234234
),
235+
discovery_job_batch_size: int = typer.Option(
236+
5000,
237+
"--discovery-job-batch-size",
238+
help="Batch size for Kubernetes job API calls (default: 5000).",
239+
rich_help_panel="Job Discovery Settings",
240+
),
241+
discovery_job_max_batches: int = typer.Option(
242+
100,
243+
"--discovery-job-max-batches",
244+
help="Maximum number of job batches to process to prevent infinite loops (default: 100).",
245+
rich_help_panel="Job Discovery Settings",
246+
),
235247
format: str = typer.Option(
236248
"table",
237249
"--formatter",
@@ -371,6 +383,8 @@ def run_strategy(
371383
max_workers=max_workers,
372384
job_grouping_labels=job_grouping_labels,
373385
job_grouping_limit=job_grouping_limit,
386+
discovery_job_batch_size=discovery_job_batch_size,
387+
discovery_job_max_batches=discovery_job_max_batches,
374388
format=format,
375389
show_cluster_name=show_cluster_name,
376390
verbose=verbose,

0 commit comments

Comments
 (0)