25 changes: 25 additions & 0 deletions README.md
@@ -352,6 +352,31 @@ python krr.py simple --selector 'app.kubernetes.io/instance in (robusta, ingress
```
</details>

<details>
<summary>Group jobs by specific labels</summary>

Group jobs that have specific labels into GroupedJob objects for consolidated resource recommendations. This is useful for batch jobs, data processing pipelines, or any workload where you want to analyze resource usage across multiple related jobs.

```sh
krr simple --job-grouping-labels app,team
```

This will:
- Group jobs that have either the `app` or `team` label (or both)
- Create GroupedJob objects with names such as `app=frontend` and `team=backend`
- Provide resource recommendations for the entire group instead of for individual jobs
- Exclude jobs with the specified labels from the regular Job listing

You can specify multiple labels separated by commas:

```sh
krr simple --job-grouping-labels app,team,environment
```

Each job is grouped under every grouping label it carries, so a job with the labels `app=api,team=backend` appears in both the `app=api` and `team=backend` groups.
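
For illustration, here is a minimal Python sketch of the grouping rule (illustration only, not krr's implementation): each job yields one `label=value` group key per grouping label it carries.

```python
# Sketch of the grouping rule described above (not krr code).
grouping_labels = ["app", "team"]
job_labels = {"app": "api", "team": "backend", "release": "v2"}

# One "label=value" group key per grouping label the job carries.
group_keys = [f"{name}={job_labels[name]}" for name in grouping_labels if name in job_labels]
print(group_keys)  # ['app=api', 'team=backend'] -> counted in both groups
```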

</details>

<details>
<summary>Override the kubectl context</summary>

2 changes: 2 additions & 0 deletions conftest.py
@@ -0,0 +1,2 @@
# Make sure pytest loads the asyncio plugin so `async def` tests run.
pytest_plugins = ("pytest_asyncio",)
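
For context, a minimal example of the kind of test this plugin registration enables (a hypothetical test, not part of this PR):

```python
# Hypothetical test showing what the pytest-asyncio plugin provides.
import asyncio

import pytest


@pytest.mark.asyncio
async def test_async_sleep_completes():
    # Without the asyncio plugin, pytest would collect this coroutine but never actually run it.
    assert await asyncio.sleep(0) is None
```
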
81 changes: 52 additions & 29 deletions poetry.lock


6 changes: 4 additions & 2 deletions pyproject.toml
@@ -30,7 +30,7 @@ kubernetes = "^26.1.0"
prometheus-api-client = "0.5.3"
numpy = ">=1.26.4,<1.27.0"
alive-progress = "^3.1.2"
prometrix = "^0.2.7"
prometrix = "0.2.5"
slack-sdk = "^3.21.3"
pandas = "2.2.2"
requests = ">2.32.4"
@@ -41,6 +41,7 @@ urllib3 = "^1.26.20"
setuptools = "^80.9.0"
zipp = "^3.19.1"
tenacity = "^9.0.0"
pyinstaller = "^5.9.0"



@@ -52,8 +53,9 @@ flake8 = "^6.0.0"
types-pyyaml = "^6.0.12.8"
types-cachetools = "^5.3.0.4"
types-requests = "^2.28.11.15"
pyinstaller = "^5.9.0"
pytest = "^7.2.2"
pytest-asyncio = ">=0.21,<0.24"
pyinstaller = "^5.9.0"

[build-system]
requires = ["poetry-core"]
2 changes: 1 addition & 1 deletion requirements.txt
@@ -27,7 +27,7 @@ packaging==24.0 ; python_version >= "3.9" and python_full_version < "3.13"
pandas==2.2.2 ; python_version >= "3.9" and python_full_version < "3.13"
pillow==10.3.0 ; python_version >= "3.9" and python_full_version < "3.13"
prometheus-api-client==0.5.3 ; python_version >= "3.9" and python_full_version < "3.13"
prometrix==0.2.7 ; python_version >= "3.9" and python_full_version < "3.13"
prometrix==0.2.5 ; python_version >= "3.9" and python_full_version < "3.13"
pyasn1-modules==0.3.0 ; python_version >= "3.9" and python_full_version < "3.13"
pyasn1==0.5.1 ; python_version >= "3.9" and python_full_version < "3.13"
pydantic==1.10.15 ; python_version >= "3.9" and python_full_version < "3.13"
91 changes: 87 additions & 4 deletions robusta_krr/core/integrations/kubernetes/__init__.py
@@ -107,6 +107,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
            self._list_all_daemon_set(),
            self._list_all_jobs(),
            self._list_all_cronjobs(),
            self._list_all_groupedjobs(),
        )

        return [
@@ -146,6 +147,22 @@ async def list_pods(self, object: K8sObjectData) -> list[PodData]:
            ]
            selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})"

        elif object.kind == "GroupedJob":
            if not hasattr(object._api_resource, '_label_filter') or not object._api_resource._label_filter:
                return []

            # Use the label+value filter to get pods
            ret: V1PodList = await loop.run_in_executor(
                self.executor,
                lambda: self.core.list_namespaced_pod(
                    namespace=object.namespace, label_selector=object._api_resource._label_filter
                ),
            )
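            # NOTE: _label_filter is the "label=value" group key assigned in _list_all_groupedjobs(),
            # so this call matches every pod in the namespace that carries that exact label.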

            # Apply the job grouping limit to pod results
            limited_pods = ret.items[:settings.job_grouping_limit]
            return [PodData(name=pod.metadata.name, deleted=False) for pod in limited_pods]

        else:
            if object.selector is None:
                return []
@@ -442,15 +459,24 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
        )

    def _list_all_jobs(self) -> list[K8sObjectData]:
        def filter_jobs(item):
            # Skip jobs owned by CronJobs
            if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
                return False

            # Skip jobs that have any of the grouping labels (they will be handled by GroupedJob)
            if settings.job_grouping_labels and item.metadata.labels:
                if any(label in item.metadata.labels for label in settings.job_grouping_labels):
                    return False

            return True

        return self._list_scannable_objects(
            kind="Job",
            all_namespaces_request=self.batch.list_job_for_all_namespaces,
            namespaced_request=self.batch.list_namespaced_job,
            extract_containers=lambda item: item.spec.template.spec.containers,
            # NOTE: If the job has ownerReference and it is a CronJob, then we should skip it
            filter_workflows=lambda item: not any(
                owner.kind == "CronJob" for owner in item.metadata.owner_references or []
            ),
            filter_workflows=filter_jobs,
        )

    def _list_all_cronjobs(self) -> list[K8sObjectData]:
@@ -461,6 +487,63 @@ def _list_all_cronjobs(self) -> list[K8sObjectData]:
            extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers,
        )

    async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
        """List all GroupedJob objects by grouping jobs with the specified labels."""
        if not settings.job_grouping_labels:
            logger.debug("No job grouping labels configured, skipping GroupedJob listing")
            return []

        if not self._should_list_resource("GroupedJob"):
            logger.debug("Skipping GroupedJob in cluster")
            return []

        logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}")

        # Get all jobs that have any of the grouping labels
        all_jobs = await self._list_namespaced_or_global_objects(
            kind="Job",
            all_namespaces_request=self.batch.list_job_for_all_namespaces,
            namespaced_request=self.batch.list_namespaced_job,
        )

        grouped_jobs = defaultdict(list)
        for job in all_jobs:
            if (job.metadata.labels and
                    not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):

                for label_name in settings.job_grouping_labels:
                    if label_name in job.metadata.labels:
                        label_value = job.metadata.labels[label_name]
                        group_key = f"{label_name}={label_value}"
                        grouped_jobs[group_key].append(job)

        result = []
        for group_name, jobs in grouped_jobs.items():
            jobs_by_namespace = defaultdict(list)
            for job in jobs:
                jobs_by_namespace[job.metadata.namespace].append(job)

            for namespace, namespace_jobs in jobs_by_namespace.items():
                limited_jobs = namespace_jobs[:settings.job_grouping_limit]
                template_job = limited_jobs[0]
                template_container = template_job.spec.template.spec.containers[0]

                grouped_job = self.__build_scannable_object(
                    item=template_job,
                    container=template_container,
                    kind="GroupedJob"
                )

                grouped_job.name = group_name
                grouped_job.namespace = namespace
                grouped_job._api_resource._grouped_jobs = limited_jobs
                grouped_job._api_resource._label_filter = group_name
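                # Keep the (limit-capped) member jobs and reuse the "label=value" key as the
                # label selector consumed by list_pods() for this GroupedJob.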

                result.append(grouped_job)

        logger.debug("Found %d GroupedJob groups", len(result))
        return result

    async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
        loop = asyncio.get_running_loop()
        res = await loop.run_in_executor(