Skip to content

Commit 0ee1c88

Browse files
committed
grouped_job_working
1 parent ac7f5a6 commit 0ee1c88

File tree

5 files changed

+122
-5
lines changed

5 files changed

+122
-5
lines changed

robusta_krr/core/integrations/kubernetes/__init__.py

Lines changed: 101 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
107107
self._list_all_daemon_set(),
108108
self._list_all_jobs(),
109109
self._list_all_cronjobs(),
110+
self._list_all_groupedjobs(),
110111
)
111112

112113
return [
@@ -146,6 +147,22 @@ async def list_pods(self, object: K8sObjectData) -> list[PodData]:
146147
]
147148
selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})"
148149

150+
elif object.kind == "GroupedJob":
151+
# For GroupedJob, we need to get pods using the label+value filter
152+
if not hasattr(object._api_resource, '_label_filters') or not object._api_resource._label_filters:
153+
return []
154+
155+
# Use the label+value filter to get pods
156+
label_selector = ",".join(object._api_resource._label_filters)
157+
ret: V1PodList = await loop.run_in_executor(
158+
self.executor,
159+
lambda: self.core.list_namespaced_pod(
160+
namespace=object.namespace, label_selector=label_selector
161+
),
162+
)
163+
164+
return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]
165+
149166
else:
150167
if object.selector is None:
151168
return []
@@ -442,15 +459,24 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
442459
)
443460

444461
def _list_all_jobs(self) -> list[K8sObjectData]:
    """List standalone Jobs as scannable objects.

    A job is excluded when:

    * one of its owner references has kind ``CronJob``, or
    * it carries any label named in ``settings.job_grouping_labels`` *and*
      the GroupedJob kind is actually being listed.

    The second condition used to ignore whether GroupedJob listing was
    enabled, so labelled jobs vanished from the results entirely whenever
    grouping labels were configured but GroupedJob was not a listed resource.

    Returns:
        Whatever ``self._list_scannable_objects`` returns for kind ``"Job"``
        with the filter above applied.
    """
    grouping_labels = settings.job_grouping_labels or []
    # Only defer labelled jobs to GroupedJob when that kind will really be
    # produced; otherwise keep reporting them as plain Jobs.
    defer_to_grouped_jobs = bool(grouping_labels) and self._should_list_resource("GroupedJob")

    def filter_jobs(item):
        # Skip jobs owned by CronJobs
        if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
            return False

        # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
        if defer_to_grouped_jobs and item.metadata.labels:
            if any(label in item.metadata.labels for label in grouping_labels):
                return False

        return True

    return self._list_scannable_objects(
        kind="Job",
        all_namespaces_request=self.batch.list_job_for_all_namespaces,
        namespaced_request=self.batch.list_namespaced_job,
        extract_containers=lambda item: item.spec.template.spec.containers,
        filter_workflows=filter_jobs,
    )
455481

456482
def _list_all_cronjobs(self) -> list[K8sObjectData]:
@@ -461,6 +487,77 @@ def _list_all_cronjobs(self) -> list[K8sObjectData]:
461487
extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers,
462488
)
463489

490+
async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
    """List all GroupedJob objects by grouping jobs with the specified labels.

    Every job that is not owned by a CronJob and carries one of
    ``settings.job_grouping_labels`` is placed in the group identified by
    (namespace, label name, label value). A job carrying several grouping
    labels lands in several groups (OR logic).

    For each group one object of kind ``"GroupedJob"`` is built from the
    group's first job and that job's first container. Its name is
    ``"<namespace>/<label_name>=<label_value>"``. Two attributes are attached
    to its ``_api_resource`` for later pod listing:

    * ``_grouped_jobs``: all jobs in the group
    * ``_label_filters``: ``["<label_name>=<label_value>"]``

    Returns:
        The GroupedJob objects, or an empty list when no grouping labels are
        configured or GroupedJob is not a listed resource.
    """
    # Local import so this method stays self-contained.
    import copy

    if not settings.job_grouping_labels:
        logger.debug("No job grouping labels configured, skipping GroupedJob listing")
        return []

    if not self._should_list_resource("GroupedJob"):
        logger.debug("Skipping GroupedJob in cluster")
        return []

    logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")

    # Get all jobs; the ones without grouping labels are ignored below
    all_jobs = await self._list_namespaced_or_global_objects(
        kind="Job",
        all_namespaces_request=self.batch.list_job_for_all_namespaces,
        namespaced_request=self.batch.list_namespaced_job,
    )

    # Group jobs by (namespace, label name, label value). Keeping the key as a
    # tuple avoids formatting it into a string and parsing it back later.
    grouped_jobs = defaultdict(list)
    for job in all_jobs:
        labels = job.metadata.labels
        if not labels:
            continue
        if any(owner.kind == "CronJob" for owner in job.metadata.owner_references or []):
            continue

        for label_name in settings.job_grouping_labels:
            if label_name in labels:
                group_key = (job.metadata.namespace, label_name, labels[label_name])
                grouped_jobs[group_key].append(job)

    # Create GroupedJob objects
    result = []
    for (namespace, label_name, label_value), jobs in grouped_jobs.items():
        label_filter = f"{label_name}={label_value}"
        group_name = f"{namespace}/{label_filter}"

        # The first job's first container is the template for the group
        containers = jobs[0].spec.template.spec.containers
        if not containers:
            # Nothing to build a recommendation from; skip instead of raising
            logger.warning(f"Skipping GroupedJob {group_name}: template job has no containers")
            continue

        # NOTE(review): presumably `_api_resource` is the `item` passed to the
        # builder - confirm. A shallow copy guarantees the per-group attributes
        # set below are not shared when one job heads several groups.
        template_job = copy.copy(jobs[0])

        grouped_job = self.__build_scannable_object(
            item=template_job,
            container=containers[0],
            kind="GroupedJob",
        )

        # Override the name to be the group name
        grouped_job.name = group_name
        grouped_job.namespace = namespace

        # Store all jobs in the group for later pod listing
        grouped_job._api_resource._grouped_jobs = jobs

        # Store the label=value selector for pod listing
        grouped_job._api_resource._label_filters = [label_filter]

        result.append(grouped_job)

    logger.debug(f"Found {len(result)} GroupedJob groups")
    return result
560+
464561
async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
465562
loop = asyncio.get_running_loop()
466563
res = await loop.run_in_executor(

robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def __init__(
107107
elif not settings.inside_cluster and self.api_client is not None:
108108
self.api_client.update_params_for_auth(headers, {}, ["BearerToken"])
109109
self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self)
110+
self.prometheus = None
110111
self.get_prometheus()
111112

112113
def get_prometheus(self):

robusta_krr/core/models/config.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ class Config(pd.BaseSettings):
5252

5353
# Threading settings
5454
max_workers: int = pd.Field(6, ge=1)
55+
56+
# Job grouping settings
57+
job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")
5558

5659
# Logging Settings
5760
format: str
@@ -130,6 +133,15 @@ def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str
130133
# So this will preserve the big and small letters of the resource
131134
return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v]
132135

136+
@pd.validator("job_grouping_labels", pre=True)
137+
def validate_job_grouping_labels(cls, v: Union[list[str], str, None]) -> Union[list[str], None]:
    """Normalize the ``job_grouping_labels`` setting.

    Args:
        v: ``None``, a comma-separated string of label names, or any other
            value (for example an already-split list).

    Returns:
        ``None`` when ``v`` is ``None`` or a string containing no label names.
        For a string, the list of label names with surrounding whitespace
        stripped and blank entries removed. Any other value is returned
        unchanged.
    """
    if v is None:
        return None
    if isinstance(v, str):
        # Split the comma-separated string and drop blank entries, so that
        # "" or a trailing comma cannot enable grouping on an empty label name.
        labels = [label.strip() for label in v.split(",")]
        labels = [label for label in labels if label]
        return labels or None
    return v
144+
133145
def create_strategy(self) -> AnyStrategy:
134146
StrategyType = AnyStrategy.find(self.strategy)
135147
StrategySettingsType = StrategyType.get_settings_type()

robusta_krr/core/models/objects.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from robusta_krr.utils.batched import batched
99
from kubernetes.client.models import V1LabelSelector
1010

11-
KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig", "StrimziPodSet"]
11+
KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig", "StrimziPodSet", "GroupedJob"]
1212

1313

1414
class PodData(pd.BaseModel):

robusta_krr/main.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,12 @@ def run_strategy(
220220
help="Max workers to use for async requests.",
221221
rich_help_panel="Threading Settings",
222222
),
223+
job_grouping_labels: Optional[str] = typer.Option(
224+
None,
225+
"--job-grouping-labels",
226+
help="Label name(s) to use for grouping jobs into GroupedJob workload type. Can be a single label or comma-separated labels (e.g., 'app,team').",
227+
rich_help_panel="Job Grouping Settings",
228+
),
223229
format: str = typer.Option(
224230
"table",
225231
"--formatter",
@@ -357,6 +363,7 @@ def run_strategy(
357363
coralogix_token=coralogix_token,
358364
openshift=openshift,
359365
max_workers=max_workers,
366+
job_grouping_labels=job_grouping_labels,
360367
format=format,
361368
show_cluster_name=show_cluster_name,
362369
verbose=verbose,

0 commit comments

Comments
 (0)