
Commit 291e1e4

refactoring to batches

1 parent: 7afd7d0

2 files changed: 166 additions, 34 deletions


robusta_krr/core/integrations/kubernetes/__init__.py

Lines changed: 162 additions & 34 deletions
@@ -261,6 +261,81 @@ def _should_list_resource(self, resource: str) -> bool:
             return True
         return resource in settings.resources
 
+    async def _list_namespaced_or_global_objects_batched(
+        self,
+        kind: KindLiteral,
+        all_namespaces_request: Callable,
+        namespaced_request: Callable,
+        limit: Optional[int] = None,
+        continue_ref: Optional[str] = None,
+    ) -> tuple[list[Any], Optional[str]]:
+        logger.debug(f"Listing {kind}s in {self.cluster} with batching (limit={limit})")
+        loop = asyncio.get_running_loop()
+
+        try:
+            if self.namespaces == "*":
+                requests = [
+                    loop.run_in_executor(
+                        self.executor,
+                        lambda: all_namespaces_request(
+                            watch=False,
+                            label_selector=settings.selector,
+                            limit=limit,
+                            _continue=continue_ref,
+                        ),
+                    )
+                ]
+            else:
+                requests = [
+                    loop.run_in_executor(
+                        self.executor,
+                        lambda ns=namespace: namespaced_request(
+                            namespace=ns,
+                            watch=False,
+                            label_selector=settings.selector,
+                            limit=limit,
+                            _continue=continue_ref,
+                        ),
+                    )
+                    for namespace in self.namespaces
+                ]
+
+            result = [
+                item
+                for request_result in await asyncio.gather(*requests)
+                for item in request_result.items
+            ]
+
+            # Extract continue token from the first result
+            next_continue_ref = None
+            if requests:
+                first_result = await requests[0]
+                next_continue_ref = getattr(first_result.metadata, '_continue', None)
+
+            logger.debug(f"Found {len(result)} {kind} in {self.cluster}")
+            return result, next_continue_ref
+
+        except ApiException as e:
+            if e.status == 410 and e.body:
+                # Continue token expired, extract new token from error and continue
+                import json
+                try:
+                    error_body = json.loads(e.body)
+                    new_continue_token = error_body.get("metadata", {}).get("continue")
+                    if new_continue_token:
+                        logger.info(f"Continue token expired for {kind} listing. Continuing")
+                        # Retry with new continue token
+                        return await self._list_namespaced_or_global_objects_batched(
+                            kind=kind,
+                            all_namespaces_request=all_namespaces_request,
+                            namespaced_request=namespaced_request,
+                            limit=limit,
+                            continue_ref=new_continue_token,
+                        )
+                except (json.JSONDecodeError, KeyError):
+                    pass
+            raise
+
     async def _list_namespaced_or_global_objects(
         self,
         kind: KindLiteral,
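
For orientation, here is a minimal standalone sketch of the limit/_continue pagination pattern this method wraps, written against the official kubernetes Python client (assumes an installed client and a reachable kubeconfig; the batch size of 500 is illustrative, not the krr code):

    from kubernetes import client, config

    config.load_kube_config()
    batch = client.BatchV1Api()

    jobs, token = [], None
    while True:
        # Each call returns at most `limit` items plus a continue token.
        resp = batch.list_job_for_all_namespaces(watch=False, limit=500, _continue=token)
        jobs.extend(resp.items)
        token = resp.metadata._continue  # empty/None once the collection is exhausted
        if not token:
            break

    print(f"fetched {len(jobs)} jobs in batches of at most 500")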
@@ -458,26 +533,57 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
             extract_containers=lambda item: item.spec.template.spec.containers,
         )
 
-    def _list_all_jobs(self) -> list[K8sObjectData]:
-        def filter_jobs(item):
-            # Skip jobs owned by CronJobs
-            if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
-                return False
+
+    async def _list_all_jobs(self) -> list[K8sObjectData]:
+        """List all jobs using batched loading; batch size comes from settings.discovery_job_batch_size."""
+        if not self._should_list_resource("Job"):
+            return []
+
+        all_jobs = []
+        try:
+            continue_ref: Optional[str] = None
+            batch_count = 0
 
-            # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
-            if settings.job_grouping_labels and item.metadata.labels:
-                if any(label in item.metadata.labels for label in settings.job_grouping_labels):
-                    return False
+            while batch_count < settings.discovery_job_max_batches:
+                jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
+                    kind="Job",
+                    all_namespaces_request=self.batch.list_job_for_all_namespaces,
+                    namespaced_request=self.batch.list_namespaced_job,
+                    limit=settings.discovery_job_batch_size,
+                    continue_ref=continue_ref,
+                )
+
+                # Process jobs in this batch
+                for job in jobs_batch:
+                    # Skip jobs owned by CronJobs
+                    if any(owner.kind == "CronJob" for owner in job.metadata.owner_references or []):
+                        continue
+
+                    # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
+                    if settings.job_grouping_labels and job.metadata.labels:
+                        if any(label in job.metadata.labels for label in settings.job_grouping_labels):
+                            continue
+
+                    # Add regular jobs
+                    for container in job.spec.template.spec.containers:
+                        all_jobs.append(self.__build_scannable_object(job, container, "Job"))
+
+                batch_count += 1
+
+                # Check if we have more batches
+                continue_ref = next_continue_ref
+                if not continue_ref:
+                    break
 
-            return True
-
-        return self._list_scannable_objects(
-            kind="Job",
-            all_namespaces_request=self.batch.list_job_for_all_namespaces,
-            namespaced_request=self.batch.list_namespaced_job,
-            extract_containers=lambda item: item.spec.template.spec.containers,
-            filter_workflows=filter_jobs,
-        )
+            logger.debug("Found %d regular jobs", len(all_jobs))
+            return all_jobs
+
+        except Exception:
+            logger.error(
+                "Failed to run jobs discovery",
+                exc_info=True,
+            )
+            return all_jobs
 
     def _list_all_cronjobs(self) -> list[K8sObjectData]:
         return self._list_scannable_objects(
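
Note that _list_all_jobs changes here from a plain method to a coroutine, so its call sites must now await it. A hypothetical call site, since the surrounding discovery code is not part of this diff (the `loader` name is illustrative):

    # Hypothetical: gather job discovery alongside other workload listings.
    jobs, grouped = await asyncio.gather(
        loader._list_all_jobs(),
        loader._list_all_groupedjobs(),
    )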
@@ -497,25 +603,47 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
             logger.debug("Skipping GroupedJob in cluster")
             return []
 
-        logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")
-
-        # Get all jobs that have any of the grouping labels
-        all_jobs = await self._list_namespaced_or_global_objects(
-            kind="Job",
-            all_namespaces_request=self.batch.list_job_for_all_namespaces,
-            namespaced_request=self.batch.list_namespaced_job,
-        )
+        logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)
 
+        # Get all jobs that have any of the grouping labels using batched loading
         grouped_jobs = defaultdict(list)
-        for job in all_jobs:
-            if (job.metadata.labels and
-                not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
+        continue_ref: Optional[str] = None
+        batch_count = 0
+
+        try:
+            while batch_count < settings.discovery_job_max_batches:
+                jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
+                    kind="Job",
+                    all_namespaces_request=self.batch.list_job_for_all_namespaces,
+                    namespaced_request=self.batch.list_namespaced_job,
+                    limit=settings.discovery_job_batch_size,
+                    continue_ref=continue_ref,
+                )
+
+                # Process jobs in this batch immediately - only keep grouped jobs
+                for job in jobs_batch:
+                    if (job.metadata.labels and
+                            not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
+
+                        for label_name in settings.job_grouping_labels:
+                            if label_name in job.metadata.labels:
+                                label_value = job.metadata.labels[label_name]
+                                group_key = f"{label_name}={label_value}"
+                                grouped_jobs[group_key].append(job)
+                                break  # Only add to first matching group
 
-            for label_name in settings.job_grouping_labels:
-                if label_name in job.metadata.labels:
-                    label_value = job.metadata.labels[label_name]
-                    group_key = f"{label_name}={label_value}"
-                    grouped_jobs[group_key].append(job)
+                batch_count += 1
+
+                continue_ref = next_continue_ref
+                if not continue_ref:
+                    break
+
+        except Exception:
+            logger.error(
+                "Failed to run grouped jobs discovery",
+                exc_info=True,
+            )
+            raise
 
         result = []
         for group_name, jobs in grouped_jobs.items():
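
The break inside the label loop means each job lands in at most one group: the first grouping label that matches wins. A tiny self-contained illustration of that rule, using toy dicts in place of Kubernetes job objects:

    from collections import defaultdict

    grouping_labels = ["team", "app"]  # order matters: first match wins
    jobs = [
        {"team": "data", "app": "etl"},  # -> team=data (not app=etl)
        {"team": "data"},                # -> team=data
        {"app": "etl"},                  # -> app=etl
    ]

    grouped = defaultdict(list)
    for labels in jobs:
        for name in grouping_labels:
            if name in labels:
                grouped[f"{name}={labels[name]}"].append(labels)
                break

    print(dict(grouped))  # {'team=data': [...2 jobs...], 'app=etl': [...1 job...]}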

robusta_krr/core/models/config.py

Lines changed: 4 additions & 0 deletions
@@ -53,6 +53,10 @@ class Config(pd.BaseSettings):
     # Threading settings
     max_workers: int = pd.Field(6, ge=1)
 
+    # Discovery settings
+    discovery_job_batch_size: int = pd.Field(1000, ge=1, description="Batch size for Kubernetes job API calls")
+    discovery_job_max_batches: int = pd.Field(50, ge=1, description="Maximum number of job batches to process to prevent infinite loops")
+
     # Job grouping settings
     job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
     job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group")
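
Since Config extends pd.BaseSettings, the new knobs should be tunable like the existing ones; a sketch, assuming pydantic's default environment-variable mapping with no custom prefix (verify against the project's actual settings wiring):

    # Programmatic override (field names are from the diff above):
    config = Config(discovery_job_batch_size=500, discovery_job_max_batches=20)

    # Or via environment variables, if no env_prefix is configured:
    #   DISCOVERY_JOB_BATCH_SIZE=500 DISCOVERY_JOB_MAX_BATCHES=20 krr simple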
