Skip to content

Commit c41800c

Browse files
committed
fix namespace continue bug
1 parent 8e11b77 commit c41800c

File tree

1 file changed

+75
-71
lines changed

1 file changed

+75
-71
lines changed

robusta_krr/core/integrations/kubernetes/__init__.py

Lines changed: 75 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -284,14 +284,15 @@ async def _list_namespaced_or_global_objects_batched(
284284
kind: KindLiteral,
285285
all_namespaces_request: Callable,
286286
namespaced_request: Callable,
287+
namespace: str,
287288
limit: Optional[int] = None,
288289
continue_ref: Optional[str] = None,
289290
) -> tuple[list[Any], Optional[str]]:
290291
logger.debug("Listing %s in %s with batching (limit=%d)", kind, self.cluster, limit)
291292
loop = asyncio.get_running_loop()
292293

293294
try:
294-
if self.namespaces == "*":
295+
if namespace == "*":
295296
requests = [
296297
loop.run_in_executor(
297298
self.executor,
@@ -307,16 +308,14 @@ async def _list_namespaced_or_global_objects_batched(
307308
requests = [
308309
loop.run_in_executor(
309310
self.executor,
310-
lambda ns=namespace: namespaced_request(
311-
namespace=ns,
311+
lambda: namespaced_request(
312+
namespace=namespace,
312313
watch=False,
313314
label_selector=settings.selector,
314315
limit=limit,
315316
_continue=continue_ref,
316317
),
317-
)
318-
for namespace in self.namespaces
319-
]
318+
) ]
320319

321320
gathered_results = await asyncio.gather(*requests)
322321

@@ -549,39 +548,40 @@ async def _list_all_jobs(self) -> list[K8sObjectData]:
549548
if not self._should_list_resource("Job"):
550549
return []
551550

551+
namespaces = self.namespaces if self.namespaces != "*" else ["*"]
552552
all_jobs = []
553553
try:
554-
continue_ref: Optional[str] = None
555554
batch_count = 0
556-
557-
while batch_count < settings.discovery_job_max_batches:
558-
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
559-
kind="Job",
560-
all_namespaces_request=self.batch.list_job_for_all_namespaces,
561-
namespaced_request=self.batch.list_namespaced_job,
562-
limit=settings.discovery_job_batch_size,
563-
continue_ref=continue_ref,
564-
)
565-
566-
batch_count += 1
567-
continue_ref = next_continue_ref
555+
for namespace in namespaces:
556+
continue_ref: Optional[str] = None
557+
while batch_count < settings.discovery_job_max_batches:
558+
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
559+
kind="Job",
560+
all_namespaces_request=self.batch.list_job_for_all_namespaces,
561+
namespaced_request=self.batch.list_namespaced_job,
562+
namespace=namespace,
563+
limit=settings.discovery_job_batch_size,
564+
continue_ref=continue_ref,
565+
)
566+
batch_count += 1
567+
continue_ref = next_continue_ref
568568

569-
# refreshed continue token
570-
if not jobs_batch and continue_ref:
571-
continue
572-
573-
for job in jobs_batch:
574-
if self._is_job_owned_by_cronjob(job):
575-
continue
576-
577-
if self._is_job_grouped(job):
569+
# refreshed continue token
570+
if not jobs_batch and continue_ref:
578571
continue
579-
580-
for container in job.spec.template.spec.containers:
581-
all_jobs.append(self.__build_scannable_object(job, container, "Job"))
582-
583-
if not continue_ref:
584-
break
572+
if not jobs_batch:
573+
# no more jobs to batch
574+
break
575+
576+
for job in jobs_batch:
577+
if self._is_job_owned_by_cronjob(job):
578+
continue
579+
if self._is_job_grouped(job):
580+
continue
581+
for container in job.spec.template.spec.containers:
582+
all_jobs.append(self.__build_scannable_object(job, container, "Job"))
583+
if not continue_ref:
584+
break
585585

586586
logger.debug("Found %d regular jobs", len(all_jobs))
587587
return all_jobs
@@ -617,47 +617,51 @@ async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
617617
grouped_jobs_template = {} # Store only ONE full job as template per group - needed for class K8sObjectData
618618
continue_ref: Optional[str] = None
619619
batch_count = 0
620-
620+
namespaces = self.namespaces if self.namespaces != "*" else ["*"]
621621
try:
622-
while batch_count < settings.discovery_job_max_batches:
623-
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
624-
kind="Job",
625-
all_namespaces_request=self.batch.list_job_for_all_namespaces,
626-
namespaced_request=self.batch.list_namespaced_job,
627-
limit=settings.discovery_job_batch_size,
628-
continue_ref=continue_ref,
629-
)
630-
631-
batch_count += 1
632-
continue_ref = next_continue_ref
622+
batch_count = 0
623+
for namespace in namespaces:
624+
continue_ref = None
625+
while batch_count < settings.discovery_job_max_batches:
626+
jobs_batch, next_continue_ref = await self._list_namespaced_or_global_objects_batched(
627+
kind="Job",
628+
all_namespaces_request=self.batch.list_job_for_all_namespaces,
629+
namespaced_request=self.batch.list_namespaced_job,
630+
namespace=namespace,
631+
limit=settings.discovery_job_batch_size,
632+
continue_ref=continue_ref,
633+
)
634+
635+
batch_count += 1
636+
continue_ref = next_continue_ref
633637

634-
# refreshed continue token
635-
if not jobs_batch and continue_ref:
636-
continue
637-
638-
for job in jobs_batch:
639-
if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
638+
# refreshed continue token
639+
if not jobs_batch and continue_ref:
640640
continue
641-
642-
for label_name in settings.job_grouping_labels:
643-
if label_name not in job.metadata.labels:
644-
continue
641+
if not jobs_batch:
642+
# no more jobs to batch
643+
break
645644

646-
# label_name is value of grouped job label
647-
label_value = job.metadata.labels[label_name]
648-
group_key = f"{label_name}={label_value}"
649-
lightweight_job = LightweightJobInfo(
650-
name=job.metadata.name,
651-
namespace=job.metadata.namespace
652-
)
653-
# Store lightweight job info only for grouped jobs
654-
grouped_jobs[group_key].append(lightweight_job)
655-
# Keep only ONE full job as template per group
656-
if group_key not in grouped_jobs_template:
657-
grouped_jobs_template[group_key] = job
658-
659-
if not continue_ref:
660-
break
645+
for job in jobs_batch:
646+
if not job.metadata.labels or self._is_job_owned_by_cronjob(job) or not self._is_job_grouped(job):
647+
continue
648+
for label_name in settings.job_grouping_labels:
649+
if label_name not in job.metadata.labels:
650+
continue
651+
# label_name is value of grouped job label
652+
label_value = job.metadata.labels[label_name]
653+
group_key = f"{label_name}={label_value}"
654+
lightweight_job = LightweightJobInfo(
655+
name=job.metadata.name,
656+
namespace=job.metadata.namespace
657+
)
658+
# Store lightweight job info only for grouped jobs
659+
grouped_jobs[group_key].append(lightweight_job)
660+
# Keep only ONE full job as template per group
661+
if group_key not in grouped_jobs_template:
662+
grouped_jobs_template[group_key] = job
663+
if not continue_ref:
664+
break
661665

662666
except Exception as e:
663667
logger.error(

0 commit comments

Comments
 (0)