diff --git a/README.md b/README.md index aacba08c..d54dabeb 100644 --- a/README.md +++ b/README.md @@ -352,6 +352,43 @@ python krr.py simple --selector 'app.kubernetes.io/instance in (robusta, ingress ``` +<details>
+<summary>Group jobs by specific labels</summary> + +Group jobs that have specific labels into GroupedJob objects for consolidated resource recommendations. This is useful for batch jobs, data processing pipelines, or any workload where you want to analyze resource usage across multiple related jobs. + +```sh +krr simple --job-grouping-labels app,team +``` + +This will: +- Group jobs that have either `app` or `team` labels (or both) +- Create GroupedJob objects with names like `app=frontend`, `team=backend`, etc. +- Provide resource recommendations for the entire group instead of individual jobs +- Exclude jobs with the specified labels from the regular Job listing + +You can specify multiple labels separated by commas: + +```sh +krr simple --job-grouping-labels app,team,environment +``` + +Each job is grouped under every grouping label it carries, so a job with `app=api,team=backend` will appear in both the `app=api` and `team=backend` groups (see the sketch at the end of this section). + +### Limiting how many jobs are included per group + +Use `--job-grouping-limit <N>` to cap how many jobs are included **per group** (useful when there are many historical jobs). + +```sh +krr simple --job-grouping-labels app,team --job-grouping-limit 3 +``` + +- Each label group will include at most **N** jobs (e.g., the first 3 returned by the API). +- Other matching jobs beyond the limit are ignored for that group. +- If not specified, the default limit is **500** jobs per group. +
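To make the fan-out and limit semantics concrete, here is a minimal, self-contained Python sketch. The job names, labels, and limit are illustrative only; the real implementation applies the same logic to `V1Job` objects returned by the Kubernetes API (see `_list_all_groupedjobs` in this PR).

```python
from collections import defaultdict

# Hypothetical jobs, represented as (name, labels) pairs for illustration.
jobs = [
    ("job-1", {"app": "api", "team": "backend"}),
    ("job-2", {"app": "api"}),
    ("job-3", {"team": "backend"}),
]
grouping_labels = ["app", "team"]  # --job-grouping-labels app,team
limit = 2                          # --job-grouping-limit 2

groups = defaultdict(list)
for name, labels in jobs:
    # A job joins one group per grouping label it carries, so job-1
    # lands in both "app=api" and "team=backend".
    for label in grouping_labels:
        if label in labels:
            groups[f"{label}={labels[label]}"].append(name)

# The limit caps each group independently; jobs beyond it are ignored.
capped = {key: names[:limit] for key, names in groups.items()}
print(capped)
# {'app=api': ['job-1', 'job-2'], 'team=backend': ['job-1', 'job-3']}
```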
+</details>
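Under the hood, each group key such as `app=frontend` doubles as a Kubernetes label selector when pods are fetched for the group (the `list_pods` GroupedJob branch further down in this diff). A minimal sketch with the official Python client, assuming kubeconfig access; `list_group_pods` and its arguments are hypothetical names for illustration:

```python
from kubernetes import client, config

def list_group_pods(namespace: str, group_key: str, limit: int) -> list[str]:
    """Return pod names whose labels match the group key (e.g. 'app=frontend'),
    truncated to the per-group limit, mirroring the GroupedJob pod lookup."""
    config.load_kube_config()  # use config.load_incluster_config() inside a cluster
    core = client.CoreV1Api()
    pods = core.list_namespaced_pod(namespace=namespace, label_selector=group_key)
    return [pod.metadata.name for pod in pods.items[:limit]]

# Illustrative call: up to 500 pods labelled app=frontend in the default namespace.
print(list_group_pods("default", "app=frontend", 500))
```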
Override the kubectl context diff --git a/conftest.py b/conftest.py new file mode 100644 index 00000000..e5f7baf6 --- /dev/null +++ b/conftest.py @@ -0,0 +1,2 @@ +# Make sure pytest loads the asyncio plugin so `async def` tests run. +pytest_plugins = ("pytest_asyncio",) diff --git a/poetry.lock b/poetry.lock index aba6add0..2dee0b43 100644 --- a/poetry.lock +++ b/poetry.lock @@ -28,6 +28,18 @@ files = [ about-time = "4.2.1" grapheme = "0.6.0" +[[package]] +name = "altgraph" +version = "0.17.4" +description = "Python graph (network) package" +optional = false +python-versions = "*" +groups = ["main", "dev"] +files = [ + {file = "altgraph-0.17.4-py2.py3-none-any.whl", hash = "sha256:642743b4750de17e655e6711601b077bc6598dbfa3ba5fa2b2a35ce12b508dff"}, + {file = "altgraph-0.17.4.tar.gz", hash = "sha256:1b5afbb98f6c4dcadb2e2ae6ab9fa994bbb8c1d75f4fa96d340f9437ae454406"}, +] + [[package]] name = "black" version = "23.12.1" @@ -552,6 +564,31 @@ files = [ {file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"}, ] +[[package]] +name = "importlib-metadata" +version = "8.7.0" +description = "Read metadata from Python packages" +optional = false +python-versions = ">=3.9" +groups = ["main", "dev"] +markers = "python_version == \"3.9\"" +files = [ + {file = "importlib_metadata-8.7.0-py3-none-any.whl", hash = "sha256:e5dd1551894c77868a30651cef00984d50e1002d06942a7101d34870c5f02afd"}, + {file = "importlib_metadata-8.7.0.tar.gz", hash = "sha256:d13b81ad223b890aa16c5471f2ac3056cf76c5f10f82d6f9292f0b415f389000"}, +] + +[package.dependencies] +zipp = ">=3.20" + +[package.extras] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] +cover = ["pytest-cov"] +doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +enabler = ["pytest-enabler (>=2.2)"] +perf = ["ipython"] +test = ["flufl.flake8", "importlib_resources (>=1.3) ; python_version < \"3.9\"", "jaraco.test (>=5.4)", "packaging", "pyfakefs", "pytest (>=6,!=8.1.*)", "pytest-perf (>=0.9.2)"] +type = ["pytest-mypy"] + [[package]] name = "importlib-resources" version = "6.3.0" @@ -752,6 +789,22 @@ websocket-client = ">=0.32.0,<0.40.0 || >0.40.0,<0.41.dev0 || >=0.43.dev0" [package.extras] adal = ["adal (>=1.0.2)"] +[[package]] +name = "macholib" +version = "1.16.3" +description = "Mach-O header analysis and editing" +optional = false +python-versions = "*" +groups = ["main", "dev"] +markers = "sys_platform == \"darwin\"" +files = [ + {file = "macholib-1.16.3-py2.py3-none-any.whl", hash = "sha256:0e315d7583d38b8c77e815b1ecbdbf504a8258d8b3e17b61165c6feb60d18f2c"}, + {file = "macholib-1.16.3.tar.gz", hash = "sha256:07ae9e15e8e4cd9a788013d81f5908b3609aa76f9b1421bae9c4d7606ec86a30"}, +] + +[package.dependencies] +altgraph = ">=0.17" + [[package]] name = "matplotlib" version = "3.8.3" @@ -1035,6 +1088,19 @@ files = [ {file = "pathspec-0.12.1.tar.gz", hash = "sha256:a482d51503a1ab33b1c67a6c3813a26953dbdc71c31dacaef9a838c4e29f5712"}, ] +[[package]] +name = "pefile" +version = "2024.8.26" +description = "Python PE parsing module" +optional = false +python-versions = ">=3.6.0" +groups = ["main", "dev"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "pefile-2024.8.26-py3-none-any.whl", hash = "sha256:76f8b485dcd3b1bb8166f1128d395fa3d87af26360c2358fb75b80019b957c6f"}, + {file = "pefile-2024.8.26.tar.gz", hash = "sha256:3ff6c5d8b43e8c37bb6e6dd5085658d658a7a0bdcd20b6a07b1fcfc1c4e9d632"}, +] + [[package]] 
name = "pillow" version = "10.3.0" @@ -1176,14 +1242,14 @@ requests = "*" [[package]] name = "prometrix" -version = "0.2.7" +version = "0.2.5" description = "A Python Prometheus client for all Prometheus instances." optional = false python-versions = "<4.0,>=3.9" groups = ["main"] files = [ - {file = "prometrix-0.2.7-py3-none-any.whl", hash = "sha256:9df81fae71f288478a208e3e974332217ec56da04a233f89d16ca214d23975be"}, - {file = "prometrix-0.2.7.tar.gz", hash = "sha256:123f072631b12a65be02fa5de90a8245e69263749d961910f96b09b1714b29af"}, + {file = "prometrix-0.2.5-py3-none-any.whl", hash = "sha256:82fcd249d24ae2e3c5ce2903abba91383d44f8c48590c418c5345d834d2f37e0"}, + {file = "prometrix-0.2.5.tar.gz", hash = "sha256:6d3dddc9763bdc328a8df3b4679e0befcab6325a82281576a3717f483c566d5a"}, ] [package.dependencies] @@ -1318,6 +1384,57 @@ files = [ plugins = ["importlib-metadata ; python_version < \"3.8\""] windows-terminal = ["colorama (>=0.4.6)"] +[[package]] +name = "pyinstaller" +version = "5.13.2" +description = "PyInstaller bundles a Python application and all its dependencies into a single package." +optional = false +python-versions = "<3.13,>=3.7" +groups = ["main", "dev"] +files = [ + {file = "pyinstaller-5.13.2-py3-none-macosx_10_13_universal2.whl", hash = "sha256:16cbd66b59a37f4ee59373a003608d15df180a0d9eb1a29ff3bfbfae64b23d0f"}, + {file = "pyinstaller-5.13.2-py3-none-manylinux2014_aarch64.whl", hash = "sha256:8f6dd0e797ae7efdd79226f78f35eb6a4981db16c13325e962a83395c0ec7420"}, + {file = "pyinstaller-5.13.2-py3-none-manylinux2014_i686.whl", hash = "sha256:65133ed89467edb2862036b35d7c5ebd381670412e1e4361215e289c786dd4e6"}, + {file = "pyinstaller-5.13.2-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:7d51734423685ab2a4324ab2981d9781b203dcae42839161a9ee98bfeaabdade"}, + {file = "pyinstaller-5.13.2-py3-none-manylinux2014_s390x.whl", hash = "sha256:2c2fe9c52cb4577a3ac39626b84cf16cf30c2792f785502661286184f162ae0d"}, + {file = "pyinstaller-5.13.2-py3-none-manylinux2014_x86_64.whl", hash = "sha256:c63ef6133eefe36c4b2f4daf4cfea3d6412ece2ca218f77aaf967e52a95ac9b8"}, + {file = "pyinstaller-5.13.2-py3-none-musllinux_1_1_aarch64.whl", hash = "sha256:aadafb6f213549a5906829bb252e586e2cf72a7fbdb5731810695e6516f0ab30"}, + {file = "pyinstaller-5.13.2-py3-none-musllinux_1_1_x86_64.whl", hash = "sha256:b2e1c7f5cceb5e9800927ddd51acf9cc78fbaa9e79e822c48b0ee52d9ce3c892"}, + {file = "pyinstaller-5.13.2-py3-none-win32.whl", hash = "sha256:421cd24f26144f19b66d3868b49ed673176765f92fa9f7914cd2158d25b6d17e"}, + {file = "pyinstaller-5.13.2-py3-none-win_amd64.whl", hash = "sha256:ddcc2b36052a70052479a9e5da1af067b4496f43686ca3cdda99f8367d0627e4"}, + {file = "pyinstaller-5.13.2-py3-none-win_arm64.whl", hash = "sha256:27cd64e7cc6b74c5b1066cbf47d75f940b71356166031deb9778a2579bb874c6"}, + {file = "pyinstaller-5.13.2.tar.gz", hash = "sha256:c8e5d3489c3a7cc5f8401c2d1f48a70e588f9967e391c3b06ddac1f685f8d5d2"}, +] + +[package.dependencies] +altgraph = "*" +macholib = {version = ">=1.8", markers = "sys_platform == \"darwin\""} +pefile = {version = ">=2022.5.30", markers = "sys_platform == \"win32\""} +pyinstaller-hooks-contrib = ">=2021.4" +pywin32-ctypes = {version = ">=0.2.1", markers = "sys_platform == \"win32\""} +setuptools = ">=42.0.0" + +[package.extras] +encryption = ["tinyaes (>=1.0.0)"] +hook-testing = ["execnet (>=1.5.0)", "psutil", "pytest (>=2.7.3)"] + +[[package]] +name = "pyinstaller-hooks-contrib" +version = "2025.9" +description = "Community maintained hooks for PyInstaller" +optional = false +python-versions = 
">=3.8" +groups = ["main", "dev"] +files = [ + {file = "pyinstaller_hooks_contrib-2025.9-py3-none-any.whl", hash = "sha256:ccbfaa49399ef6b18486a165810155e5a8d4c59b41f20dc5da81af7482aaf038"}, + {file = "pyinstaller_hooks_contrib-2025.9.tar.gz", hash = "sha256:56e972bdaad4e9af767ed47d132362d162112260cbe488c9da7fee01f228a5a6"}, +] + +[package.dependencies] +importlib_metadata = {version = ">=4.6", markers = "python_version < \"3.10\""} +packaging = ">=22.0" +setuptools = ">=42.0.0" + [[package]] name = "pyparsing" version = "3.1.2" @@ -1356,6 +1473,25 @@ tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""} [package.extras] testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"] +[[package]] +name = "pytest-asyncio" +version = "0.23.8" +description = "Pytest support for asyncio" +optional = false +python-versions = ">=3.8" +groups = ["dev"] +files = [ + {file = "pytest_asyncio-0.23.8-py3-none-any.whl", hash = "sha256:50265d892689a5faefb84df80819d1ecef566eb3549cf915dfb33569359d1ce2"}, + {file = "pytest_asyncio-0.23.8.tar.gz", hash = "sha256:759b10b33a6dc61cce40a8bd5205e302978bbbcc00e279a8b61d9a6a3c82e4d3"}, +] + +[package.dependencies] +pytest = ">=7.0.0,<9" + +[package.extras] +docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"] +testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] + [[package]] name = "python-dateutil" version = "2.9.0.post0" @@ -1383,6 +1519,19 @@ files = [ {file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"}, ] +[[package]] +name = "pywin32-ctypes" +version = "0.2.3" +description = "A (partial) reimplementation of pywin32 using ctypes/cffi" +optional = false +python-versions = ">=3.6" +groups = ["main", "dev"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "pywin32-ctypes-0.2.3.tar.gz", hash = "sha256:d162dc04946d704503b2edc4d55f3dba5c1d539ead017afa00142c38b9885755"}, + {file = "pywin32_ctypes-0.2.3-py3-none-any.whl", hash = "sha256:8a1513379d709975552d202d942d9837758905c8d01eb82b8bcc30918929e7b8"}, +] + [[package]] name = "pyyaml" version = "6.0.1" @@ -1646,7 +1795,7 @@ version = "80.9.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false python-versions = ">=3.9" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "setuptools-80.9.0-py3-none-any.whl", hash = "sha256:062d34222ad13e0cc312a4c02d73f059e86a4acbfbdea8f8f76b28c99f306922"}, {file = "setuptools-80.9.0.tar.gz", hash = "sha256:f36b47402ecde768dbfafc46e8e4207b4360c654f1f3bb84475f0a28628fb19c"}, @@ -1886,11 +2035,12 @@ version = "3.20.2" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.8" -groups = ["main"] +groups = ["main", "dev"] files = [ {file = "zipp-3.20.2-py3-none-any.whl", hash = "sha256:a817ac80d6cf4b23bf7f2828b7cabf326f15a001bea8b1f9b49631780ba28350"}, {file = "zipp-3.20.2.tar.gz", hash = "sha256:bc9eb26f4506fda01b81bcde0ca78103b6e62f991b381fec825435c836edbc29"}, ] +markers = {dev = "python_version == \"3.9\""} [package.extras] check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\""] @@ -1903,4 +2053,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.9,<=3.12.9" -content-hash = "e6fd6f18166a59e2f1d53ae4773f08542f9a52eedb0fd7e4c90fe09967388595" +content-hash = "c3ceb51ceb3892bb0e99588df2510ecab9edb70503622177275ef00ebc823553" diff 
--git a/pyproject.toml b/pyproject.toml index 57c47117..22bec1d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,7 +30,7 @@ kubernetes = "^26.1.0" prometheus-api-client = "0.5.3" numpy = ">=1.26.4,<1.27.0" alive-progress = "^3.1.2" -prometrix = "^0.2.7" +prometrix = "0.2.5" slack-sdk = "^3.21.3" pandas = "2.2.2" requests = ">2.32.4" @@ -41,6 +41,7 @@ urllib3 = "^1.26.20" setuptools = "^80.9.0" zipp = "^3.19.1" tenacity = "^9.0.0" +pyinstaller = "^5.9.0" @@ -53,6 +54,8 @@ types-pyyaml = "^6.0.12.8" types-cachetools = "^5.3.0.4" types-requests = "^2.28.11.15" pytest = "^7.2.2" +pytest-asyncio = ">=0.21,<0.24" +pyinstaller = "^5.9.0" [build-system] requires = ["poetry-core"] diff --git a/requirements.txt b/requirements.txt index 30c55fa3..b695788a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,7 @@ packaging==24.0 ; python_version >= "3.9" and python_full_version < "3.13" pandas==2.2.2 ; python_version >= "3.9" and python_full_version < "3.13" pillow==10.3.0 ; python_version >= "3.9" and python_full_version < "3.13" prometheus-api-client==0.5.3 ; python_version >= "3.9" and python_full_version < "3.13" -prometrix==0.2.7 ; python_version >= "3.9" and python_full_version < "3.13" +prometrix==0.2.5 ; python_version >= "3.9" and python_full_version < "3.13" pyasn1-modules==0.3.0 ; python_version >= "3.9" and python_full_version < "3.13" pyasn1==0.5.1 ; python_version >= "3.9" and python_full_version < "3.13" pydantic==1.10.15 ; python_version >= "3.9" and python_full_version < "3.13" @@ -54,3 +54,4 @@ tzlocal==5.2 ; python_version >= "3.9" and python_full_version < "3.13" urllib3==1.26.20 ; python_version >= "3.9" and python_full_version < "3.13" websocket-client==1.7.0 ; python_version >= "3.9" and python_full_version < "3.13" zipp==3.20.2 ; python_version >= "3.9" and python_full_version < "3.13" +pytest-asyncio==0.23.7 ; python_version >= "3.9" and python_full_version < "3.13" diff --git a/robusta_krr/core/integrations/kubernetes/__init__.py b/robusta_krr/core/integrations/kubernetes/__init__.py index 8fd5ab96..76f3af96 100644 --- a/robusta_krr/core/integrations/kubernetes/__init__.py +++ b/robusta_krr/core/integrations/kubernetes/__init__.py @@ -107,6 +107,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]: self._list_all_daemon_set(), self._list_all_jobs(), self._list_all_cronjobs(), + self._list_all_groupedjobs(), ) return [ @@ -146,6 +147,22 @@ async def list_pods(self, object: K8sObjectData) -> list[PodData]: ] selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})" + elif object.kind == "GroupedJob": + if not hasattr(object._api_resource, '_label_filter') or not object._api_resource._label_filter: + return [] + + # Use the label+value filter to get pods + ret: V1PodList = await loop.run_in_executor( + self.executor, + lambda: self.core.list_namespaced_pod( + namespace=object.namespace, label_selector=object._api_resource._label_filter + ), + ) + + # Apply the job grouping limit to pod results + limited_pods = ret.items[:settings.job_grouping_limit] + return [PodData(name=pod.metadata.name, deleted=False) for pod in limited_pods] + else: if object.selector is None: return [] @@ -442,15 +459,24 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]: ) def _list_all_jobs(self) -> list[K8sObjectData]: + def filter_jobs(item): + # Skip jobs owned by CronJobs + if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []): + return False + + # Skip jobs that have any of the grouping labels (they will 
be handled by GroupedJob) + if settings.job_grouping_labels and item.metadata.labels: + if any(label in item.metadata.labels for label in settings.job_grouping_labels): + return False + + return True + return self._list_scannable_objects( kind="Job", all_namespaces_request=self.batch.list_job_for_all_namespaces, namespaced_request=self.batch.list_namespaced_job, extract_containers=lambda item: item.spec.template.spec.containers, - # NOTE: If the job has ownerReference and it is a CronJob, then we should skip it - filter_workflows=lambda item: not any( - owner.kind == "CronJob" for owner in item.metadata.owner_references or [] - ), + filter_workflows=filter_jobs, ) def _list_all_cronjobs(self) -> list[K8sObjectData]: @@ -461,6 +487,75 @@ def _list_all_cronjobs(self) -> list[K8sObjectData]: extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers, ) + async def _list_all_groupedjobs(self) -> list[K8sObjectData]: + """List all GroupedJob objects by grouping jobs with the specified labels.""" + if not settings.job_grouping_labels: + logger.debug("No job grouping labels configured, skipping GroupedJob listing") + return [] + + if not self._should_list_resource("GroupedJob"): + logger.debug("Skipping GroupedJob in cluster") + return [] + + logger.debug(f"Listing GroupedJobs with grouping labels: {settings.job_grouping_labels}") + + # Get all jobs that have any of the grouping labels + all_jobs = await self._list_namespaced_or_global_objects( + kind="Job", + all_namespaces_request=self.batch.list_job_for_all_namespaces, + namespaced_request=self.batch.list_namespaced_job, + ) + + grouped_jobs = defaultdict(list) + for job in all_jobs: + if (job.metadata.labels and + not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])): + + for label_name in settings.job_grouping_labels: + if label_name in job.metadata.labels: + label_value = job.metadata.labels[label_name] + group_key = f"{label_name}={label_value}" + grouped_jobs[group_key].append(job) + + result = [] + for group_name, jobs in grouped_jobs.items(): + jobs_by_namespace = defaultdict(list) + for job in jobs: + jobs_by_namespace[job.metadata.namespace].append(job) + + for namespace, namespace_jobs in jobs_by_namespace.items(): + limited_jobs = namespace_jobs[:settings.job_grouping_limit] + + container_names = set() + for job in limited_jobs: + for container in job.spec.template.spec.containers: + container_names.add(container.name) + + for container_name in container_names: + template_job = limited_jobs[0] + template_container = None + for container in template_job.spec.template.spec.containers: + if container.name == container_name: + template_container = container + break + + if template_container: + grouped_job = self.__build_scannable_object( + item=template_job, + container=template_container, + kind="GroupedJob" + ) + + grouped_job.name = group_name + grouped_job.namespace = namespace + grouped_job._api_resource._grouped_jobs = limited_jobs + grouped_job._api_resource._label_filter = group_name + + result.append(grouped_job) + + logger.debug("Found %d GroupedJob groups", len(result)) + return result + async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]: loop = asyncio.get_running_loop() res = await loop.run_in_executor( diff --git a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py index d795add6..2cab35a4 100644 --- 
a/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py +++ b/robusta_krr/core/integrations/prometheus/metrics_service/prometheus_metrics_service.py @@ -21,6 +21,8 @@ from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config from .base_metric_service import MetricsService +PROM_REFRESH_CREDS_SEC = int(os.environ.get("PROM_REFRESH_CREDS_SEC", "600")) # 10 minutes + logger = logging.getLogger("krr") @@ -104,8 +106,21 @@ def __init__( headers |= {"Authorization": self.auth_header} elif not settings.inside_cluster and self.api_client is not None: self.api_client.update_params_for_auth(headers, {}, ["BearerToken"]) - self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self) - self.prometheus = get_custom_prometheus_connect(self.prom_config) + self.headers = headers + self.prom_config = None + self.prometheus = None + self._last_init_at = None + self.get_prometheus() + + def get_prometheus(self): + now = datetime.utcnow() + if (not self.prometheus + or not self._last_init_at + or now - self._last_init_at >= timedelta(seconds=PROM_REFRESH_CREDS_SEC)): + self.prom_config = generate_prometheus_config(url=self.url, headers=self.headers, metrics_service=self) # type: ignore + self.prometheus = get_custom_prometheus_connect(self.prom_config) + self._last_init_at = now + return self.prometheus def check_connection(self): """ @@ -113,14 +128,14 @@ def check_connection(self): Raises: PrometheusNotFound: If the connection to Prometheus cannot be established. """ - self.prometheus.check_prometheus_connection() + self.get_prometheus().check_prometheus_connection() @retry(wait=wait_random(min=2, max=10), stop=stop_after_attempt(5)) async def query(self, query: str) -> dict: loop = asyncio.get_running_loop() return await loop.run_in_executor( self.executor, - lambda: self.prometheus.safe_custom_query(query=query)["result"], + lambda: self.get_prometheus().safe_custom_query(query=query)["result"], ) @retry(wait=wait_random(min=2, max=10), stop=stop_after_attempt(5)) @@ -128,7 +143,7 @@ async def query_range(self, query: str, start: datetime, end: datetime, step: ti loop = asyncio.get_running_loop() return await loop.run_in_executor( self.executor, - lambda: self.prometheus.safe_custom_query_range( + lambda: self.get_prometheus().safe_custom_query_range( query=query, start_time=start, end_time=end, step=f"{step.seconds}s" )["result"], ) @@ -155,7 +170,7 @@ def validate_cluster_name(self): def get_cluster_names(self) -> Optional[List[str]]: try: - return self.prometheus.get_label_values(label_name=settings.prometheus_label) + return self.get_prometheus().get_label_values(label_name=settings.prometheus_label) except PrometheusApiClientException: logger.error("Labels api not present on prometheus client") return [] @@ -194,7 +209,7 @@ async def gather_data( """ logger.debug(f"Gathering {LoaderClass.__name__} metric for {object}") try: - metric_loader = LoaderClass(self.prometheus, self.name(), self.executor) + metric_loader = LoaderClass(self.get_prometheus(), self.name(), self.executor) data = await metric_loader.load_data(object, period, step) except Exception: logger.exception("Failed to gather resource history data for %s", object) @@ -334,6 +349,13 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD pod_owner_kind = "Job" del jobs + elif object.kind == "GroupedJob": + if hasattr(object._api_resource, '_grouped_jobs'): + pod_owners = [job.metadata.name for job in 
object._api_resource._grouped_jobs] + pod_owner_kind = "Job" + else: + pod_owners = [object.name] + pod_owner_kind = object.kind else: pod_owners = [object.name] pod_owner_kind = object.kind diff --git a/robusta_krr/core/models/config.py b/robusta_krr/core/models/config.py index 27ce56c6..62616c34 100644 --- a/robusta_krr/core/models/config.py +++ b/robusta_krr/core/models/config.py @@ -52,6 +52,10 @@ class Config(pd.BaseSettings): # Threading settings max_workers: int = pd.Field(6, ge=1) + + # Job grouping settings + job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type") + job_grouping_limit: int = pd.Field(500, ge=1, description="Maximum number of jobs/pods to query per GroupedJob group") # Logging Settings format: str @@ -130,6 +134,15 @@ def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str # So this will preserve the big and small letters of the resource return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v] + @pd.validator("job_grouping_labels", pre=True) + def validate_job_grouping_labels(cls, v: Union[list[str], str, None]) -> Union[list[str], None]: + if v is None: + return None + if isinstance(v, str): + # Split comma-separated string into list + return [label.strip() for label in v.split(',')] + return v + def create_strategy(self) -> AnyStrategy: StrategyType = AnyStrategy.find(self.strategy) StrategySettingsType = StrategyType.get_settings_type() diff --git a/robusta_krr/core/models/objects.py b/robusta_krr/core/models/objects.py index 71ba0ca2..286e98b7 100644 --- a/robusta_krr/core/models/objects.py +++ b/robusta_krr/core/models/objects.py @@ -8,7 +8,7 @@ from robusta_krr.utils.batched import batched from kubernetes.client.models import V1LabelSelector -KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig", "StrimziPodSet"] +KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig", "StrimziPodSet", "GroupedJob"] class PodData(pd.BaseModel): diff --git a/robusta_krr/main.py b/robusta_krr/main.py index b29d7e10..8604b20b 100644 --- a/robusta_krr/main.py +++ b/robusta_krr/main.py @@ -220,6 +220,18 @@ def run_strategy( help="Max workers to use for async requests.", rich_help_panel="Threading Settings", ), + job_grouping_labels: Optional[str] = typer.Option( + None, + "--job-grouping-labels", + help="Label name(s) to use for grouping jobs into GroupedJob workload type. 
Can be a single label or comma-separated labels (e.g., 'app,team').", + rich_help_panel="Job Grouping Settings", + ), + job_grouping_limit: int = typer.Option( + 500, + "--job-grouping-limit", + help="Maximum number of jobs/pods to query per GroupedJob group (default: 500).", + rich_help_panel="Job Grouping Settings", + ), format: str = typer.Option( "table", "--formatter", @@ -357,6 +369,8 @@ def run_strategy( coralogix_token=coralogix_token, openshift=openshift, max_workers=max_workers, + job_grouping_labels=job_grouping_labels, + job_grouping_limit=job_grouping_limit, format=format, show_cluster_name=show_cluster_name, verbose=verbose, diff --git a/tests/test_grouped_jobs.py b/tests/test_grouped_jobs.py new file mode 100644 index 00000000..283f6fa9 --- /dev/null +++ b/tests/test_grouped_jobs.py @@ -0,0 +1,242 @@ +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from robusta_krr.core.integrations.kubernetes import ClusterLoader +from robusta_krr.core.models.config import Config + + +@pytest.fixture +def mock_config(): + """Mock config with job grouping settings""" + config = MagicMock(spec=Config) + config.job_grouping_labels = ["app", "team"] + config.job_grouping_limit = 3 # Small limit for testing + config.max_workers = 4 + config.get_kube_client = MagicMock() + config.resources = "*" + return config + + +@pytest.fixture +def mock_kubernetes_loader(mock_config): + """Create a ClusterLoader instance with mocked dependencies""" + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + loader = ClusterLoader() + loader.batch = MagicMock() + loader.core = MagicMock() + loader.executor = MagicMock() + loader._ClusterLoader__hpa_list = {} # type: ignore # needed for mock + return loader + + +def create_mock_job(name: str, namespace: str, labels: dict): + """Create a mock V1Job object""" + job = MagicMock() + job.metadata.name = name + job.metadata.namespace = namespace + job.metadata.labels = labels + job.metadata.owner_references = [] + + # Create a mock container with a proper name + container = MagicMock() + container.name = "main-container" + job.spec.template.spec.containers = [container] + return job + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_with_limit(mock_kubernetes_loader, mock_config): + """Test that _list_all_groupedjobs respects the job_grouping_limit""" + + # Create mock jobs - more than the limit (3) + mock_jobs = [ + create_mock_job("job-1", "default", {"app": "frontend"}), + create_mock_job("job-2", "default", {"app": "frontend"}), + create_mock_job("job-3", "default", {"app": "frontend"}), + create_mock_job("job-4", "default", {"app": "frontend"}), # This should be excluded + create_mock_job("job-5", "default", {"app": "frontend"}), # This should be excluded + create_mock_job("job-6", "default", {"app": "backend"}), + create_mock_job("job-7", "default", {"app": "backend"}), + create_mock_job("job-8", "default", {"app": "backend"}), + create_mock_job("job-9", "default", {"app": "backend"}), # This should be excluded + ] + + # Mock the _list_namespaced_or_global_objects method + mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + + # Mock the __build_scannable_object method + def mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj.container = container.name # match the real builder + obj._api_resource = MagicMock() + return obj + + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object + + # Patch the settings to use our mock config + with 
patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + # Call the method + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # Verify we got 2 objects (1 frontend + 1 backend, one per unique container name) + assert len(result) == 2 + + # Group results by name to verify grouping + frontend_objects = [g for g in result if g.name == "app=frontend"] + backend_objects = [g for g in result if g.name == "app=backend"] + + # Verify we got 1 frontend object (one per unique container name) + assert len(frontend_objects) == 1 + assert frontend_objects[0].namespace == "default" + assert frontend_objects[0].container == "main-container" + + # Verify we got 1 backend object (one per unique container name) + assert len(backend_objects) == 1 + assert backend_objects[0].namespace == "default" + assert backend_objects[0].container == "main-container" + + # Verify all objects in each group have the same grouped_jobs list + frontend_grouped_jobs = frontend_objects[0]._api_resource._grouped_jobs + assert len(frontend_grouped_jobs) == 3 + assert frontend_grouped_jobs[0].metadata.name == "job-1" + assert frontend_grouped_jobs[1].metadata.name == "job-2" + assert frontend_grouped_jobs[2].metadata.name == "job-3" + + backend_grouped_jobs = backend_objects[0]._api_resource._grouped_jobs + assert len(backend_grouped_jobs) == 3 + assert backend_grouped_jobs[0].metadata.name == "job-6" + assert backend_grouped_jobs[1].metadata.name == "job-7" + assert backend_grouped_jobs[2].metadata.name == "job-8" + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_with_different_namespaces(mock_kubernetes_loader, mock_config): + """Test that GroupedJob objects are created separately for different namespaces""" + + # Create mock jobs in different namespaces + mock_jobs = [ + create_mock_job("job-1", "namespace-1", {"app": "frontend"}), + create_mock_job("job-2", "namespace-1", {"app": "frontend"}), + create_mock_job("job-3", "namespace-2", {"app": "frontend"}), + create_mock_job("job-4", "namespace-2", {"app": "frontend"}), + ] + + mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + + def mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj._api_resource = MagicMock() + return obj + + mock_kubernetes_loader._KubernetesLoader__build_scannable_object = mock_build_scannable_object + + # Patch the settings to use our mock config + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + # Call the method + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # Verify we got 2 objects (1 per namespace, one per unique container name) + assert len(result) == 2 + + # Group results by namespace + ns1_objects = [g for g in result if g.namespace == "namespace-1"] + ns2_objects = [g for g in result if g.namespace == "namespace-2"] + + # Check namespace-1 objects + assert len(ns1_objects) == 1 + assert ns1_objects[0].name == "app=frontend" + assert ns1_objects[0].container == "main-container" + assert len(ns1_objects[0]._api_resource._grouped_jobs) == 2 + + # Check namespace-2 objects + assert len(ns2_objects) == 1 + assert ns2_objects[0].name == "app=frontend" + assert ns2_objects[0].container == "main-container" + assert len(ns2_objects[0]._api_resource._grouped_jobs) == 2 + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_with_cronjob_owner_reference(mock_kubernetes_loader, mock_config): + """Test that jobs with CronJob owner references are excluded""" + + # Create mock jobs - one 
with CronJob owner, one without + mock_jobs = [ + create_mock_job("job-1", "default", {"app": "frontend"}), + create_mock_job("job-2", "default", {"app": "frontend"}), + ] + + # Add CronJob owner reference to the second job + mock_jobs[1].metadata.owner_references = [MagicMock(kind="CronJob")] + + mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + + def mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj.container = container.name # match the real builder + obj._api_resource = MagicMock() + return obj + + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object + + # Patch the settings to use our mock config + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + # Call the method + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # Verify we got 1 object (only the job without CronJob owner) + assert len(result) == 1 + obj = result[0] + assert obj.name == "app=frontend" + assert len(obj._api_resource._grouped_jobs) == 1 + assert obj._api_resource._grouped_jobs[0].metadata.name == "job-1" + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_no_grouping_labels(mock_kubernetes_loader): + """Test that no GroupedJob objects are created when no grouping labels are configured""" + + # Mock config with no grouping labels + mock_config_no_labels = MagicMock(spec=Config) + mock_config_no_labels.job_grouping_labels = None + + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config_no_labels): + result = await mock_kubernetes_loader._list_all_groupedjobs() + assert len(result) == 0 + + +@pytest.mark.asyncio +async def test_list_all_groupedjobs_multiple_labels(mock_kubernetes_loader, mock_config): + """Test that jobs with different grouping labels create separate groups""" + + # Create mock jobs with different labels + mock_jobs = [ + create_mock_job("job-1", "default", {"app": "frontend"}), + create_mock_job("job-2", "default", {"team": "backend"}), + create_mock_job("job-3", "default", {"app": "api"}), + ] + + mock_kubernetes_loader._list_namespaced_or_global_objects = AsyncMock(return_value=mock_jobs) + + def mock_build_scannable_object(item, container, kind): + obj = MagicMock() + obj.container = container.name # match the real builder + obj._api_resource = MagicMock() + return obj + + mock_kubernetes_loader._ClusterLoader__build_scannable_object = mock_build_scannable_object + + # Patch the settings to use our mock config + with patch("robusta_krr.core.integrations.kubernetes.settings", mock_config): + # Call the method + result = await mock_kubernetes_loader._list_all_groupedjobs() + + # Verify we got 3 objects (one for each label+value combination, one per unique container name) + assert len(result) == 3 + + group_names = {g.name for g in result} + assert "app=frontend" in group_names + assert "team=backend" in group_names + assert "app=api" in group_names + + # Verify all objects have the same container name + assert all(obj.container == "main-container" for obj in result)
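The `prometheus_metrics_service.py` change above follows a time-based re-initialization pattern: instead of building the Prometheus client once in `__init__`, every call site goes through `get_prometheus()`, which rebuilds the client after `PROM_REFRESH_CREDS_SEC` so short-lived credentials are refreshed. A minimal standalone sketch of that pattern; `TTLCachedClient` and its `factory` argument are hypothetical stand-ins for the real `generate_prometheus_config` + `get_custom_prometheus_connect` calls:

```python
import os
from datetime import datetime, timedelta
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")

# Refresh interval, overridable via environment, mirroring PROM_REFRESH_CREDS_SEC.
REFRESH_SEC = int(os.environ.get("PROM_REFRESH_CREDS_SEC", "600"))


class TTLCachedClient(Generic[T]):
    """Rebuild an expensive client once the cached instance is older than the TTL."""

    def __init__(self, factory: Callable[[], T], ttl_seconds: int = REFRESH_SEC) -> None:
        self._factory = factory
        self._ttl = timedelta(seconds=ttl_seconds)
        self._client: Optional[T] = None
        self._built_at: Optional[datetime] = None

    def get(self) -> T:
        now = datetime.utcnow()
        # Recreate on first use or once the TTL elapses, so short-lived
        # credentials (e.g. refreshed bearer tokens) are picked up.
        if self._client is None or self._built_at is None or now - self._built_at >= self._ttl:
            self._client = self._factory()
            self._built_at = now
        return self._client


# Usage sketch: call sites ask the wrapper instead of caching the client themselves,
# which is what the diff does by routing check_connection, query, query_range, and
# get_cluster_names through get_prometheus().
prom = TTLCachedClient(lambda: object())  # illustrative factory
client = prom.get()
```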