diff --git a/docs/preprocess.md b/docs/preprocess.md index 318abb1..afaa97f 100644 --- a/docs/preprocess.md +++ b/docs/preprocess.md @@ -4,7 +4,8 @@ ### Attributes Omitted - **UUID** - **Nodes**: NodesList have more specific information -- **Preempted**: Status have more valid information +- **Preempted**: Contains unreliable data. Use Status column instead (PREEMPT for + unfinished, COMPLETE/FAILED/etc. for finished preempted jobs). - **EndTime**: Can be calculated from StartTime and Elapsed ### Options for Including or Omitting Jobs @@ -12,6 +13,7 @@ - If `GPUType` is null, the value will be filled with `["cpu"]` - If `GPUs` is null or is 0, the value will be 0. - **Keeping jobs where the status is "Failed" or "Cancelled"** +- **Keeping jobs where the QOS is customized (not normal, long, or short)** ### Records Omitted If: - `Elapsed` is less than the minimum threshold diff --git a/notebooks/Efficiency Analysis.ipynb b/notebooks/Efficiency Analysis.ipynb index 1d07964..c308329 100644 --- a/notebooks/Efficiency Analysis.ipynb +++ b/notebooks/Efficiency Analysis.ipynb @@ -1,614 +1,618 @@ { - "cells": [ - { - "cell_type": "markdown", - "id": "0", - "metadata": {}, - "source": [ - "# [Efficiency Analysis](#toc0_)\n", - "This notebook demonstrates the use of `EfficiencyAnalysis` class in `src/analysis/efficiency_analysis.py` for analyzing the efficiency of jobs, users, and PI groups." 
- ] - }, - { - "cell_type": "markdown", - "id": "1", - "metadata": {}, - "source": [ - "**Table of contents** \n", - "- [Efficiency Analysis](#toc1_) \n", - " - [Setup](#toc1_1_) \n", - " - [Example: Analyze workload efficiency of GPU users who set no VRAM constraints and used 0 GB of VRAM](#toc1_2_) \n", - " - [Job Efficiency Metrics](#toc1_2_1_) \n", - " - [Find most inefficient jobs with no VRAM constraints based on `vram_hours`](#toc1_2_1_1_) \n", - " - [User Efficiency Metrics](#toc1_2_2_) \n", - " - [Find Inefficient Users based on `expected_value_alloc_vram_efficiency`](#toc1_2_2_1_) \n", - " - [Find Inefficient Users based on `vram_hours`](#toc1_2_2_2_) \n", - " - [PI Group Efficiency Metrics](#toc1_2_3_) \n", - " - [Find Inefficient PIs based on `vram_hours`](#toc1_2_3_1_) \n", - " - [Example: Analyze all jobs with no VRAM constraints](#toc1_3_) \n", - " - [Job Efficiency Metrics](#toc1_3_1_) \n", - " - [Problem with duplicate JobIDs](#toc1_3_1_1_) \n", - " - [Top users with most number of jobs that have no VRAM constraints](#toc1_3_1_2_) \n", - " - [Find inefficient jobs with no VRAM Constraints based on `alloc_vram_efficiency_score`](#toc1_3_1_3_) \n", - "\n", - "\n", - "" - ] - }, - { - "cell_type": "markdown", - "id": "2", - "metadata": {}, - "source": [ - "## [Setup](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3", - "metadata": {}, - "outputs": [], - "source": [ - "# Import required modules\n", - "import sys\n", - "from pathlib import Path\n", - "import pandas as pd\n", - "import matplotlib.pyplot as plt\n", - "import seaborn as sns" - ] - }, - { - "cell_type": "markdown", - "id": "4", - "metadata": {}, - "source": [ - "Jupyter server should be run at the notebook directory, so the output of the following cell would be the project root:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5", - "metadata": {}, - "outputs": [], - "source": [ - "project_root = str(Path.cwd().resolve().parent)\n", 
- "print(f\"Project root: {project_root}\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6", - "metadata": {}, - "outputs": [], - "source": [ - "# Add project root to sys.path for module imports\n", - "if project_root not in sys.path:\n", - " sys.path.insert(0, project_root)\n", - "\n", - "from src.analysis import efficiency_analysis as ea\n", - "from src.visualization import JobsWithMetricsVisualizer, UsersWithMetricsVisualizer\n", - "\n", - "# Automatically reload modules before executing code\n", - "# This is useful for development to see changes without restarting the kernel.\n", - "%load_ext autoreload\n", - "# Reload all modules imported with %aimport every time before executing the Python code typed.\n", - "%autoreload 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "7", - "metadata": {}, - "outputs": [], - "source": [ - "# Load the jobs DataFrame from DuckDB\n", - "preprocessed_jobs_df = ea.load_preprocessed_jobs_dataframe_from_duckdb(\n", - " db_path=\"../data/slurm_data.db\",\n", - " table_name=\"Jobs\",\n", - ")\n", - "display(preprocessed_jobs_df.head(10))\n", - "print(preprocessed_jobs_df.shape)" - ] - }, - { - "cell_type": "markdown", - "id": "8", - "metadata": {}, - "source": [ - "## [Example: Analyze workload efficiency of GPU users who set no VRAM constraints and used 0 GB of VRAM](#toc0_)\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "9", - "metadata": {}, - "outputs": [], - "source": [ - "efficiency_analysis = ea.EfficiencyAnalysis(jobs_df=preprocessed_jobs_df)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "10", - "metadata": {}, - "outputs": [], - "source": [ - "filtered_jobs = efficiency_analysis.filter_jobs_for_analysis(\n", - " vram_constraint_filter=pd.NA, # No VRAM constraints\n", - " gpu_mem_usage_filter=0, # Used 0 GB of VRAM\n", - ")\n", - "filtered_jobs" - ] - }, - { - "cell_type": "markdown", - "id": "11", - "metadata": {}, - 
"source": [ - "Generate all metrics:" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "12", - "metadata": {}, - "outputs": [], - "source": [ - "metrics_dict = efficiency_analysis.calculate_all_efficiency_metrics(filtered_jobs)\n", - "\n", - "jobs_with_metrics = metrics_dict[\"jobs_with_efficiency_metrics\"]\n", - "users_with_metrics = metrics_dict[\"users_with_efficiency_metrics\"]\n", - "pi_accounts_with_metrics = metrics_dict[\"pi_accounts_with_efficiency_metrics\"]" - ] - }, - { - "cell_type": "markdown", - "id": "13", - "metadata": {}, - "source": [ - "### [Job Efficiency Metrics](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "14", - "metadata": {}, - "outputs": [], - "source": [ - "# Set option to display all columns\n", - "pd.set_option(\"display.max_columns\", None)\n", - "# Display the DataFrame\n", - "display(jobs_with_metrics.head(10))\n", - "# To revert to default settings (optional)\n", - "pd.reset_option(\"display.max_columns\")\n", - "\n", - "print(f\"Jobs found: {len(jobs_with_metrics)}\")" - ] - }, - { - "cell_type": "markdown", - "id": "15", - "metadata": {}, - "source": [ - "#### [Find most inefficient jobs with no VRAM constraints based on `vram_hours`](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "16", - "metadata": {}, - "outputs": [], - "source": [ - "inefficient_jobs_vram_hours = efficiency_analysis.sort_and_filter_records_with_metrics(\n", - " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.JOBS,\n", - " sorting_key=\"vram_hours\",\n", - " ascending=False, # Sort by vram_hours in descending order\n", - " filter_criteria={\n", - " \"vram_hours\": {\"min\": 80 * 24, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient jobs\n", - " },\n", - ")\n", - "# Display top inefficient users by VRAM-hours\n", - "print(\"\\nTop inefficient Jobs by VRAM-hours:\")\n", - "display(inefficient_jobs_vram_hours.head(10))\n", - "\n", - "# Plot top 
inefficient jobs by VRAM-hours, with VRAM-hours as labels\n", - "jobs_with_metrics_visualizer = JobsWithMetricsVisualizer(inefficient_jobs_vram_hours.head(20))\n", - "jobs_with_metrics_visualizer.visualize(\n", - " column=\"vram_hours\",\n", - " bar_label_columns=[\"vram_hours\", \"job_hours\"],\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "17", - "metadata": {}, - "source": [ - "### [User Efficiency Metrics](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "18", - "metadata": {}, - "outputs": [], - "source": [ - "users_with_metrics" - ] - }, - { - "cell_type": "markdown", - "id": "19", - "metadata": {}, - "source": [ - "#### [Find Inefficient Users based on `expected_value_alloc_vram_efficiency`](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "20", - "metadata": {}, - "outputs": [], - "source": [ - "inefficient_users_alloc_vram_eff = efficiency_analysis.sort_and_filter_records_with_metrics(\n", - " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.USERS,\n", - " sorting_key=\"expected_value_alloc_vram_efficiency\",\n", - " ascending=True, # we want to find users with low efficiency\n", - " filter_criteria={\n", - " \"expected_value_alloc_vram_efficiency\": {\"max\": 0.3, \"inclusive\": True},\n", - " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a user\n", - " },\n", - ")\n", - "print(\"\\nTop inefficient users by allocated vram efficiency:\")\n", - "display(inefficient_users_alloc_vram_eff.head(20))\n", - "\n", - "# Plot top inefficient users by allocated vram efficiency, with allocated vram efficiency as labels\n", - "users_with_metrics_visualizer = UsersWithMetricsVisualizer(inefficient_users_alloc_vram_eff.head(20))\n", - "users_with_metrics_visualizer.visualize(\n", - " column=\"expected_value_alloc_vram_efficiency\",\n", - " bar_label_columns=[\"expected_value_alloc_vram_efficiency\", \"user_job_hours\"],\n", - " figsize=(8, 10),\n", - ")" - ] 
- }, - { - "cell_type": "code", - "execution_count": null, - "id": "21", - "metadata": {}, - "outputs": [], - "source": [ - "inefficient_users = efficiency_analysis.sort_and_filter_records_with_metrics(\n", - " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.USERS,\n", - " sorting_key=\"expected_value_alloc_vram_efficiency\",\n", - " ascending=True, # we want to find users with low efficiency\n", - " filter_criteria={\n", - " \"expected_value_alloc_vram_efficiency\": {\"max\": 0.3, \"inclusive\": True},\n", - " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a user\n", - " },\n", - ")\n", - "\n", - "# Display top inefficient users by job count\n", - "print(\"\\nTop inefficient users by allocated vram efficiency:\")\n", - "display(inefficient_users.head(10))\n", - "\n", - "\n", - "# Plot top inefficient users by GPU hours, with efficiency as labels\n", - "top_users = inefficient_users.head(10)\n", - "\n", - "plt.figure(figsize=(8, 5))\n", - "barplot = sns.barplot(y=top_users[\"User\"], x=top_users[\"user_job_hours\"], orient=\"h\")\n", - "plt.xlabel(\"Job Hours\")\n", - "plt.ylabel(\"User\")\n", - "plt.title(\"Top 10 Inefficient Users by Allocated VRAM Efficiency Contribution\")\n", - "\n", - "# Annotate bars with expected_value_alloc_vram_efficiency, keeping text fully inside the plot's right spine\n", - "ax = barplot\n", - "xmax = top_users[\"user_job_hours\"].max()\n", - "# Add headroom for annotation space (20% extra)\n", - "xlim = xmax * 1.20 if xmax > 0 else 1\n", - "ax.set_xlim(0, xlim)\n", - "\n", - "# Calculate annotation x-position: place at 98% of xlim or just left of the right spine, whichever is smaller\n", - "for i, (job_hours, efficiency) in enumerate(\n", - " zip(\n", - " top_users[\"user_job_hours\"],\n", - " top_users[\"expected_value_alloc_vram_efficiency\"],\n", - " strict=True,\n", - " )\n", - "):\n", - " # Place annotation at min(job_hours + 2% of xlim, 98% of xlim)\n", - " xpos = min(job_hours + xlim * 
0.02, xlim * 0.98)\n", - " # If bar is very close to right spine, nudge annotation left to avoid overlap\n", - " if xpos > xlim * 0.96:\n", - " xpos = xlim * 0.96\n", - " ax.text(xpos, i, f\"Eff: {efficiency:.2f}\", va=\"center\", ha=\"left\", fontsize=10, color=\"black\", clip_on=True)\n", - "\n", - "plt.tight_layout()\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "id": "22", - "metadata": {}, - "source": [ - "#### [Find Inefficient Users based on `vram_hours`](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "23", - "metadata": {}, - "outputs": [], - "source": [ - "inefficient_users_vram_hours = efficiency_analysis.find_inefficient_users_by_vram_hours(\n", - " vram_hours_filter={\"min\": 200, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient users\n", - " min_jobs=5, # Minimum number of jobs to consider a user\n", - ")\n", - "# Display top inefficient users by VRAM-hours\n", - "print(\"\\nTop inefficient users by VRAM-hours:\")\n", - "display(inefficient_users_vram_hours.head(20))\n", - "\n", - "\n", - "# Plot top inefficient users by VRAM-hours, with VRAM-hours as labels\n", - "users_with_metrics_visualizer = UsersWithMetricsVisualizer(inefficient_users_vram_hours.head(20))\n", - "users_with_metrics_visualizer.visualize(\n", - " column=\"vram_hours\", bar_label_columns=[\"vram_hours\", \"user_job_hours\"], figsize=(8, 10)\n", - ")" - ] - }, - { - "cell_type": "markdown", - "id": "24", - "metadata": {}, - "source": [ - "### [PI Group Efficiency Metrics](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "25", - "metadata": {}, - "outputs": [], - "source": [ - "pi_accounts_with_metrics" - ] - }, - { - "cell_type": "markdown", - "id": "26", - "metadata": {}, - "source": [ - "#### [Find Inefficient PIs based on `vram_hours`](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "27", - "metadata": {}, - "outputs": [], - "source": [ - 
"inefficient_pis_vram_hours = efficiency_analysis.sort_and_filter_records_with_metrics(\n", - " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.PI_GROUPS,\n", - " sorting_key=\"pi_acc_vram_hours\",\n", - " ascending=False,\n", - " filter_criteria={\n", - " \"pi_acc_vram_hours\": {\"min\": 200, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient users\n", - " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a PI account\n", - " },\n", - ")\n", - "# Display top inefficient users by VRAM-hours\n", - "print(\"\\nTop inefficient PI Groups by VRAM-hours:\")\n", - "display(inefficient_pis_vram_hours.head(20))\n", - "\n", - "top_pi_accounts = inefficient_pis_vram_hours.head(20)\n", - "\n", - "# Plot top inefficient users by VRAM-hours, with VRAM-hours as labels\n", - "plt.figure(figsize=(8, 8))\n", - "barplot = sns.barplot(\n", - " y=top_pi_accounts[\"pi_account\"],\n", - " x=top_pi_accounts[\"pi_acc_vram_hours\"],\n", - " order=top_pi_accounts[\"pi_account\"].tolist(), # Only show present values\n", - " orient=\"h\",\n", - ")\n", - "plt.xlabel(\"VRAM-Hours\")\n", - "plt.ylabel(\"PI Account\")\n", - "plt.title(\"Top Inefficient PI Accounts by VRAM-Hours\")\n", - "# Annotate bars with gpu_hours, keeping text fully inside the plot's right spine\n", - "ax = barplot\n", - "xmax = top_pi_accounts[\"pi_acc_vram_hours\"].max()\n", - "# Add headroom for annotation space (20% extra)\n", - "xlim = xmax * 1.6 if xmax > 0 else 1\n", - "ax.set_xlim(0, xlim)\n", - "# Calculate annotation x-position: place at 98% of xlim or just left of the right spine, whichever is smaller\n", - "for i, (vram_hours, pi_acc_job_hours) in enumerate(\n", - " zip(\n", - " top_pi_accounts[\"pi_acc_vram_hours\"],\n", - " top_pi_accounts[\"pi_acc_job_hours\"],\n", - " strict=True,\n", - " )\n", - "):\n", - " # Place annotation at min(vram_hours + 2% of xlim, 98% of xlim)\n", - " xpos = min(vram_hours + xlim * 0.02, xlim * 0.98)\n", - " ax.text(\n", - 
" xpos,\n", - " i,\n", - " f\"VRAM-Hours: {vram_hours:.2f}\\n Job Hours: {pi_acc_job_hours:.2f}\",\n", - " va=\"center\",\n", - " ha=\"left\",\n", - " fontsize=10,\n", - " color=\"black\",\n", - " clip_on=True,\n", - " )\n", - "plt.tight_layout()\n", - "plt.show()" - ] - }, - { - "cell_type": "markdown", - "id": "28", - "metadata": {}, - "source": [ - "## [Example: Analyze all jobs with no VRAM constraints](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "29", - "metadata": {}, - "outputs": [], - "source": [ - "# Filter jobs where no VRAM constraint was set but a GPU was allocated\n", - "no_vram_constraint_efficiency_analysis = ea.EfficiencyAnalysis(jobs_df=preprocessed_jobs_df)\n", - "all_no_vram_constraint_jobs = no_vram_constraint_efficiency_analysis.filter_jobs_for_analysis(\n", - " vram_constraint_filter={\"min\": 0, \"inclusive\": False}, # No VRAM constraints\n", - " gpu_count_filter={\"min\": 1, \"inclusive\": True}, # At least one GPU allocated\n", - " gpu_mem_usage_filter={\"min\": 0, \"inclusive\": False}, # Used more than 0 GiB of VRAM\n", - ")\n", - "\n", - "display(all_no_vram_constraint_jobs.head(10))\n", - "print(all_no_vram_constraint_jobs.shape)" - ] - }, - { - "cell_type": "markdown", - "id": "30", - "metadata": {}, - "source": [ - "### [Job Efficiency Metrics](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "31", - "metadata": {}, - "outputs": [], - "source": [ - "no_vram_constraint_jobs_with_metrics = no_vram_constraint_efficiency_analysis.calculate_job_efficiency_metrics(\n", - " all_no_vram_constraint_jobs\n", - ")\n", - "\n", - "# Set option to display all columns\n", - "pd.set_option(\"display.max_columns\", None)\n", - "# Display the DataFrame\n", - "display(no_vram_constraint_jobs_with_metrics.head(10))\n", - "# To revert to default settings (optional)\n", - "pd.reset_option(\"display.max_columns\")\n", - "print(f\"Jobs found: {len(no_vram_constraint_jobs_with_metrics)}\")" - 
] - }, - { - "cell_type": "markdown", - "id": "32", - "metadata": {}, - "source": [ - "#### [Problem with duplicate JobIDs](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "33", - "metadata": {}, - "outputs": [], - "source": [ - "# select jobs with specific job id\n", - "pd.set_option(\"display.max_columns\", None)\n", - "# Display the DataFrame\n", - "display(no_vram_constraint_jobs_with_metrics[no_vram_constraint_jobs_with_metrics[\"JobID\"] == 24374463])\n", - "pd.reset_option(\"display.max_columns\")" - ] - }, - { - "cell_type": "markdown", - "id": "34", - "metadata": {}, - "source": [ - "#### [Top users with most number of jobs that have no VRAM constraints](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "35", - "metadata": {}, - "outputs": [], - "source": [ - "# Plot top users by number of jobs with no VRAM constraints\n", - "if not all_no_vram_constraint_jobs.empty:\n", - " plt.figure(figsize=(10, 5))\n", - " user_counts = all_no_vram_constraint_jobs[\"User\"].value_counts().head(20)\n", - " sns.barplot(x=user_counts.values, y=user_counts.index, orient=\"h\")\n", - " plt.xlabel(\"Number of Jobs\")\n", - " plt.ylabel(\"User\")\n", - " plt.title(\"Top 20 Users: Jobs with no VRAM Constraints\")\n", - " plt.tight_layout()\n", - " plt.show()\n", - "else:\n", - " print(\"No jobs found without VRAM constraints.\")" - ] - }, - { - "cell_type": "markdown", - "id": "36", - "metadata": {}, - "source": [ - "#### [Find inefficient jobs with no VRAM Constraints based on `alloc_vram_efficiency_score`](#toc0_)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "37", - "metadata": {}, - "outputs": [], - "source": [ - "low_alloc_vram_score_jobs = no_vram_constraint_efficiency_analysis.sort_and_filter_records_with_metrics(\n", - " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.JOBS,\n", - " sorting_key=\"alloc_vram_efficiency_score\",\n", - " ascending=True, # Sort by 
alloc_vram_efficiency_score in ascending order\n", - " filter_criteria={\n", - " \"alloc_vram_efficiency_score\": {\"max\": -10, \"inclusive\": True}, # score threshold\n", - " },\n", - ")\n", - "# Display top inefficient users by alloc_vram_efficiency_score\n", - "print(\"\\nTop inefficient Jobs by allocated VRAM efficiency score:\")\n", - "\n", - "display(low_alloc_vram_score_jobs.head(20))\n", - "\n", - "jobs_with_metrics_visualizer = JobsWithMetricsVisualizer(low_alloc_vram_score_jobs.head(20))\n", - "jobs_with_metrics_visualizer.visualize(\n", - " column=\"alloc_vram_efficiency_score\",\n", - " bar_label_columns=[\"alloc_vram_efficiency_score\", \"job_hours\"],\n", - " figsize=(10, 12),\n", - ")" - ] - } - ], - "metadata": {}, - "nbformat": 4, - "nbformat_minor": 5 + "cells": [ + { + "cell_type": "markdown", + "id": "0", + "metadata": {}, + "source": [ + "# [Efficiency Analysis](#toc0_)\n", + "This notebook demonstrates the use of `EfficiencyAnalysis` class in `src/analysis/efficiency_analysis.py` for analyzing the efficiency of jobs, users, and PI groups." 
+ ] + }, + { + "cell_type": "markdown", + "id": "1", + "metadata": {}, + "source": [ + "**Table of contents** \n", + "- [Efficiency Analysis](#toc1_) \n", + " - [Setup](#toc1_1_) \n", + " - [Example: Analyze workload efficiency of GPU users who set no VRAM constraints and used 0 GB of VRAM](#toc1_2_) \n", + " - [Job Efficiency Metrics](#toc1_2_1_) \n", + " - [Find most inefficient jobs with no VRAM constraints based on `vram_hours`](#toc1_2_1_1_) \n", + " - [User Efficiency Metrics](#toc1_2_2_) \n", + " - [Find Inefficient Users based on `expected_value_alloc_vram_efficiency`](#toc1_2_2_1_) \n", + " - [Find Inefficient Users based on `vram_hours`](#toc1_2_2_2_) \n", + " - [PI Group Efficiency Metrics](#toc1_2_3_) \n", + " - [Find Inefficient PIs based on `vram_hours`](#toc1_2_3_1_) \n", + " - [Example: Analyze all jobs with no VRAM constraints](#toc1_3_) \n", + " - [Job Efficiency Metrics](#toc1_3_1_) \n", + " - [Problem with duplicate JobIDs](#toc1_3_1_1_) \n", + " - [Top users with most number of jobs that have no VRAM constraints](#toc1_3_1_2_) \n", + " - [Find inefficient jobs with no VRAM Constraints based on `alloc_vram_efficiency_score`](#toc1_3_1_3_) \n", + "\n", + "\n", + "" + ] + }, + { + "cell_type": "markdown", + "id": "2", + "metadata": {}, + "source": [ + "## [Setup](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "# Import required modules\n", + "import sys\n", + "from pathlib import Path\n", + "import pandas as pd\n", + "import matplotlib.pyplot as plt\n", + "import seaborn as sns" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "Jupyter server should be run at the notebook directory, so the output of the following cell would be the project root:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "project_root = str(Path.cwd().resolve().parent)\n", 
+ "print(f\"Project root: {project_root}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "# Add project root to sys.path for module imports\n", + "if project_root not in sys.path:\n", + " sys.path.insert(0, project_root)\n", + "\n", + "from src.analysis import efficiency_analysis as ea\n", + "from src.visualization import JobsWithMetricsVisualizer, UsersWithMetricsVisualizer\n", + "from src.utilities import load_and_preprocess_jobs\n", + "# Automatically reload modules before executing code\n", + "# This is useful for development to see changes without restarting the kernel.\n", + "%load_ext autoreload\n", + "# Reload all modules imported with %aimport every time before executing the Python code typed.\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "# Load and preprocess the jobs DataFrame from DuckDB\n", + "preprocessed_jobs_df = load_and_preprocess_jobs(\n", + " db_path=\"../data/slurm_data.db\",\n", + " table_name=\"Jobs\",\n", + ")\n", + "display(preprocessed_jobs_df.head(10))\n", + "print(preprocessed_jobs_df.shape)" + ] + }, + { + "cell_type": "markdown", + "id": "8", + "metadata": {}, + "source": [ + "## [Example: Analyze workload efficiency of GPU users who set no VRAM constraints and used 0 GB of VRAM](#toc0_)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "efficiency_analysis = ea.EfficiencyAnalysis(jobs_df=preprocessed_jobs_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "filtered_jobs = efficiency_analysis.filter_jobs_for_analysis(\n", + " vram_constraint_filter=pd.NA, # No VRAM constraints\n", + " gpu_mem_usage_filter=0, # Used 0 GB of VRAM\n", + ")\n", + "filtered_jobs" + ] + }, + { + "cell_type": "markdown", + "id": 
"11", + "metadata": {}, + "source": [ + "Generate all metrics:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "metrics_dict = efficiency_analysis.calculate_all_efficiency_metrics(filtered_jobs)\n", + "\n", + "jobs_with_metrics = metrics_dict[\"jobs_with_efficiency_metrics\"]\n", + "users_with_metrics = metrics_dict[\"users_with_efficiency_metrics\"]\n", + "pi_accounts_with_metrics = metrics_dict[\"pi_accounts_with_efficiency_metrics\"]" + ] + }, + { + "cell_type": "markdown", + "id": "13", + "metadata": {}, + "source": [ + "### [Job Efficiency Metrics](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "# Set option to display all columns\n", + "pd.set_option(\"display.max_columns\", None)\n", + "# Display the DataFrame\n", + "display(jobs_with_metrics.head(10))\n", + "# To revert to default settings (optional)\n", + "pd.reset_option(\"display.max_columns\")\n", + "\n", + "print(f\"Jobs found: {len(jobs_with_metrics)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "15", + "metadata": {}, + "source": [ + "#### [Find most inefficient jobs with no VRAM constraints based on `vram_hours`](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [], + "source": [ + "inefficient_jobs_vram_hours = efficiency_analysis.sort_and_filter_records_with_metrics(\n", + " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.JOBS,\n", + " sorting_key=\"vram_hours\",\n", + " ascending=False, # Sort by vram_hours in descending order\n", + " filter_criteria={\n", + " \"vram_hours\": {\"min\": 80 * 24, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient jobs\n", + " },\n", + ")\n", + "# Display top inefficient users by VRAM-hours\n", + "print(\"\\nTop inefficient Jobs by VRAM-hours:\")\n", + "display(inefficient_jobs_vram_hours.head(10))\n", 
+ "\n", + "# Plot top inefficient jobs by VRAM-hours, with VRAM-hours as labels\n", + "jobs_with_metrics_visualizer = JobsWithMetricsVisualizer(inefficient_jobs_vram_hours.head(20))\n", + "jobs_with_metrics_visualizer.visualize(\n", + " column=\"vram_hours\",\n", + " bar_label_columns=[\"vram_hours\", \"job_hours\"],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "17", + "metadata": {}, + "source": [ + "### [User Efficiency Metrics](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "18", + "metadata": {}, + "outputs": [], + "source": [ + "users_with_metrics" + ] + }, + { + "cell_type": "markdown", + "id": "19", + "metadata": {}, + "source": [ + "#### [Find Inefficient Users based on `expected_value_alloc_vram_efficiency`](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "20", + "metadata": {}, + "outputs": [], + "source": [ + "inefficient_users_alloc_vram_eff = efficiency_analysis.sort_and_filter_records_with_metrics(\n", + " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.USERS,\n", + " sorting_key=\"expected_value_alloc_vram_efficiency\",\n", + " ascending=True, # we want to find users with low efficiency\n", + " filter_criteria={\n", + " \"expected_value_alloc_vram_efficiency\": {\"max\": 0.3, \"inclusive\": True},\n", + " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a user\n", + " },\n", + ")\n", + "print(\"\\nTop inefficient users by allocated vram efficiency:\")\n", + "display(inefficient_users_alloc_vram_eff.head(20))\n", + "\n", + "# Plot top inefficient users by allocated vram efficiency, with allocated vram efficiency as labels\n", + "users_with_metrics_visualizer = UsersWithMetricsVisualizer(inefficient_users_alloc_vram_eff.head(20))\n", + "users_with_metrics_visualizer.visualize(\n", + " column=\"expected_value_alloc_vram_efficiency\",\n", + " bar_label_columns=[\"expected_value_alloc_vram_efficiency\", \"user_job_hours\"],\n", + " 
figsize=(8, 10),\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21", + "metadata": {}, + "outputs": [], + "source": [ + "inefficient_users = efficiency_analysis.sort_and_filter_records_with_metrics(\n", + " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.USERS,\n", + " sorting_key=\"expected_value_alloc_vram_efficiency\",\n", + " ascending=True, # we want to find users with low efficiency\n", + " filter_criteria={\n", + " \"expected_value_alloc_vram_efficiency\": {\"max\": 0.3, \"inclusive\": True},\n", + " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a user\n", + " },\n", + ")\n", + "\n", + "# Display top inefficient users by job count\n", + "print(\"\\nTop inefficient users by allocated vram efficiency:\")\n", + "display(inefficient_users.head(10))\n", + "\n", + "\n", + "# Plot top inefficient users by GPU hours, with efficiency as labels\n", + "top_users = inefficient_users.head(10)\n", + "\n", + "plt.figure(figsize=(8, 5))\n", + "barplot = sns.barplot(y=top_users[\"User\"], x=top_users[\"user_job_hours\"], orient=\"h\")\n", + "plt.xlabel(\"Job Hours\")\n", + "plt.ylabel(\"User\")\n", + "plt.title(\"Top 10 Inefficient Users by Allocated VRAM Efficiency Contribution\")\n", + "\n", + "# Annotate bars with expected_value_alloc_vram_efficiency, keeping text fully inside the plot's right spine\n", + "ax = barplot\n", + "xmax = top_users[\"user_job_hours\"].max()\n", + "# Add headroom for annotation space (20% extra)\n", + "xlim = xmax * 1.20 if xmax > 0 else 1\n", + "ax.set_xlim(0, xlim)\n", + "\n", + "# Calculate annotation x-position: place at 98% of xlim or just left of the right spine, whichever is smaller\n", + "for i, (job_hours, efficiency) in enumerate(\n", + " zip(\n", + " top_users[\"user_job_hours\"],\n", + " top_users[\"expected_value_alloc_vram_efficiency\"],\n", + " strict=True,\n", + " )\n", + "):\n", + " # Place annotation at min(job_hours + 2% of xlim, 98% of xlim)\n", + " 
xpos = min(job_hours + xlim * 0.02, xlim * 0.98)\n", + " # If bar is very close to right spine, nudge annotation left to avoid overlap\n", + " if xpos > xlim * 0.96:\n", + " xpos = xlim * 0.96\n", + " ax.text(xpos, i, f\"Eff: {efficiency:.2f}\", va=\"center\", ha=\"left\", fontsize=10, color=\"black\", clip_on=True)\n", + "\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "22", + "metadata": {}, + "source": [ + "#### [Find Inefficient Users based on `vram_hours`](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "23", + "metadata": {}, + "outputs": [], + "source": [ + "inefficient_users_vram_hours = efficiency_analysis.find_inefficient_users_by_vram_hours(\n", + " vram_hours_filter={\"min\": 200, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient users\n", + " min_jobs=5, # Minimum number of jobs to consider a user\n", + ")\n", + "# Display top inefficient users by VRAM-hours\n", + "print(\"\\nTop inefficient users by VRAM-hours:\")\n", + "display(inefficient_users_vram_hours.head(20))\n", + "\n", + "\n", + "# Plot top inefficient users by VRAM-hours, with VRAM-hours as labels\n", + "users_with_metrics_visualizer = UsersWithMetricsVisualizer(inefficient_users_vram_hours.head(20))\n", + "users_with_metrics_visualizer.visualize(\n", + " column=\"vram_hours\", bar_label_columns=[\"vram_hours\", \"user_job_hours\"], figsize=(8, 10)\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "24", + "metadata": {}, + "source": [ + "### [PI Group Efficiency Metrics](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [ + "pi_accounts_with_metrics" + ] + }, + { + "cell_type": "markdown", + "id": "26", + "metadata": {}, + "source": [ + "#### [Find Inefficient PIs based on `vram_hours`](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": 
[], + "source": [ + "inefficient_pis_vram_hours = efficiency_analysis.sort_and_filter_records_with_metrics(\n", + " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.PI_GROUPS,\n", + " sorting_key=\"pi_acc_vram_hours\",\n", + " ascending=False,\n", + " filter_criteria={\n", + " \"pi_acc_vram_hours\": {\"min\": 200, \"inclusive\": True}, # VRAM-hours threshold for identifying inefficient users\n", + " \"job_count\": {\"min\": 5, \"inclusive\": True}, # Minimum number of jobs to consider a PI account\n", + " },\n", + ")\n", + "# Display top inefficient users by VRAM-hours\n", + "print(\"\\nTop inefficient PI Groups by VRAM-hours:\")\n", + "display(inefficient_pis_vram_hours.head(20))\n", + "\n", + "top_pi_accounts = inefficient_pis_vram_hours.head(20)\n", + "\n", + "# Plot top inefficient users by VRAM-hours, with VRAM-hours as labels\n", + "plt.figure(figsize=(8, 8))\n", + "barplot = sns.barplot(\n", + " y=top_pi_accounts[\"pi_account\"],\n", + " x=top_pi_accounts[\"pi_acc_vram_hours\"],\n", + " order=top_pi_accounts[\"pi_account\"].tolist(), # Only show present values\n", + " orient=\"h\",\n", + ")\n", + "plt.xlabel(\"VRAM-Hours\")\n", + "plt.ylabel(\"PI Account\")\n", + "plt.title(\"Top Inefficient PI Accounts by VRAM-Hours\")\n", + "# Annotate bars with gpu_hours, keeping text fully inside the plot's right spine\n", + "ax = barplot\n", + "xmax = top_pi_accounts[\"pi_acc_vram_hours\"].max()\n", + "# Add headroom for annotation space (20% extra)\n", + "xlim = xmax * 1.6 if xmax > 0 else 1\n", + "ax.set_xlim(0, xlim)\n", + "# Calculate annotation x-position: place at 98% of xlim or just left of the right spine, whichever is smaller\n", + "for i, (vram_hours, pi_acc_job_hours) in enumerate(\n", + " zip(\n", + " top_pi_accounts[\"pi_acc_vram_hours\"],\n", + " top_pi_accounts[\"pi_acc_job_hours\"],\n", + " strict=True,\n", + " )\n", + "):\n", + " # Place annotation at min(vram_hours + 2% of xlim, 98% of xlim)\n", + " xpos = min(vram_hours + xlim * 0.02, xlim * 0.98)\n", 
+ " ax.text(\n", + " xpos,\n", + " i,\n", + " f\"VRAM-Hours: {vram_hours:.2f}\\n Job Hours: {pi_acc_job_hours:.2f}\",\n", + " va=\"center\",\n", + " ha=\"left\",\n", + " fontsize=10,\n", + " color=\"black\",\n", + " clip_on=True,\n", + " )\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "markdown", + "id": "28", + "metadata": {}, + "source": [ + "## [Example: Analyze all jobs with no VRAM constraints](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29", + "metadata": {}, + "outputs": [], + "source": [ + "# Filter jobs where no VRAM constraint was set but a GPU was allocated\n", + "no_vram_constraint_efficiency_analysis = ea.EfficiencyAnalysis(jobs_df=preprocessed_jobs_df)\n", + "all_no_vram_constraint_jobs = no_vram_constraint_efficiency_analysis.filter_jobs_for_analysis(\n", + " vram_constraint_filter={\"min\": 0, \"inclusive\": False}, # No VRAM constraints\n", + " gpu_count_filter={\"min\": 1, \"inclusive\": True}, # At least one GPU allocated\n", + " gpu_mem_usage_filter={\"min\": 0, \"inclusive\": False}, # Used more than 0 GiB of VRAM\n", + ")\n", + "\n", + "display(all_no_vram_constraint_jobs.head(10))\n", + "print(all_no_vram_constraint_jobs.shape)" + ] + }, + { + "cell_type": "markdown", + "id": "30", + "metadata": {}, + "source": [ + "### [Job Efficiency Metrics](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31", + "metadata": {}, + "outputs": [], + "source": [ + "no_vram_constraint_jobs_with_metrics = no_vram_constraint_efficiency_analysis.calculate_job_efficiency_metrics(\n", + " all_no_vram_constraint_jobs\n", + ")\n", + "\n", + "# Set option to display all columns\n", + "pd.set_option(\"display.max_columns\", None)\n", + "# Display the DataFrame\n", + "display(no_vram_constraint_jobs_with_metrics.head(10))\n", + "# To revert to default settings (optional)\n", + "pd.reset_option(\"display.max_columns\")\n", + "print(f\"Jobs found: 
{len(no_vram_constraint_jobs_with_metrics)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "32", + "metadata": {}, + "source": [ + "#### [Problem with duplicate JobIDs](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33", + "metadata": {}, + "outputs": [], + "source": [ + "# select jobs with specific job id\n", + "pd.set_option(\"display.max_columns\", None)\n", + "# Display the DataFrame\n", + "display(no_vram_constraint_jobs_with_metrics[no_vram_constraint_jobs_with_metrics[\"JobID\"] == 24374463])\n", + "pd.reset_option(\"display.max_columns\")" + ] + }, + { + "cell_type": "markdown", + "id": "34", + "metadata": {}, + "source": [ + "#### [Top users with most number of jobs that have no VRAM constraints](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "35", + "metadata": {}, + "outputs": [], + "source": [ + "# Plot top users by number of jobs with no VRAM constraints\n", + "if not all_no_vram_constraint_jobs.empty:\n", + " plt.figure(figsize=(10, 5))\n", + " user_counts = all_no_vram_constraint_jobs[\"User\"].value_counts().head(20)\n", + " sns.barplot(x=user_counts.values, y=user_counts.index, orient=\"h\")\n", + " plt.xlabel(\"Number of Jobs\")\n", + " plt.ylabel(\"User\")\n", + " plt.title(\"Top 20 Users: Jobs with no VRAM Constraints\")\n", + " plt.tight_layout()\n", + " plt.show()\n", + "else:\n", + " print(\"No jobs found without VRAM constraints.\")" + ] + }, + { + "cell_type": "markdown", + "id": "36", + "metadata": {}, + "source": [ + "#### [Find inefficient jobs with no VRAM Constraints based on `alloc_vram_efficiency_score`](#toc0_)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "37", + "metadata": {}, + "outputs": [], + "source": [ + "low_alloc_vram_score_jobs = no_vram_constraint_efficiency_analysis.sort_and_filter_records_with_metrics(\n", + " metrics_df_name_enum=ea.MetricsDataFrameNameEnum.JOBS,\n", + " sorting_key=\"alloc_vram_efficiency_score\",\n", + 
" ascending=True, # Sort by alloc_vram_efficiency_score in ascending order\n", + " filter_criteria={\n", + " \"alloc_vram_efficiency_score\": {\"max\": -10, \"inclusive\": True}, # score threshold\n", + " },\n", + ")\n", + "# Display top inefficient users by alloc_vram_efficiency_score\n", + "print(\"\\nTop inefficient Jobs by allocated VRAM efficiency score:\")\n", + "\n", + "display(low_alloc_vram_score_jobs.head(20))\n", + "\n", + "jobs_with_metrics_visualizer = JobsWithMetricsVisualizer(low_alloc_vram_score_jobs.head(20))\n", + "jobs_with_metrics_visualizer.visualize(\n", + " column=\"alloc_vram_efficiency_score\",\n", + " bar_label_columns=[\"alloc_vram_efficiency_score\", \"job_hours\"],\n", + " figsize=(10, 12),\n", + ")" + ] + } + ], + "metadata": { + "language_info": { + "name": "python" + } + }, + "nbformat": 4, + "nbformat_minor": 5 } diff --git a/src/analysis/__init__.py b/src/analysis/__init__.py index 6515e0f..403d021 100644 --- a/src/analysis/__init__.py +++ b/src/analysis/__init__.py @@ -1,4 +1 @@ from .efficiency_analysis import EfficiencyAnalysis as EfficiencyAnalysis -from .efficiency_analysis import ( - load_preprocessed_jobs_dataframe_from_duckdb as load_preprocessed_jobs_dataframe_from_duckdb, -) diff --git a/src/analysis/efficiency_analysis.py b/src/analysis/efficiency_analysis.py index fd6f120..879fde6 100644 --- a/src/analysis/efficiency_analysis.py +++ b/src/analysis/efficiency_analysis.py @@ -4,53 +4,12 @@ The aim is to identify potential inefficiencies in GPU usage and notify users or PIs about these issues. 
""" -from pathlib import Path from typing import cast import numpy as np import pandas as pd - from src.config.constants import DEFAULT_MIN_ELAPSED_SECONDS from src.config.enum_constants import FilterTypeEnum, MetricsDataFrameNameEnum -from src.database import DatabaseConnection -from src.preprocess.preprocess import preprocess_data - - -def load_preprocessed_jobs_dataframe_from_duckdb( - db_path: str | Path, - table_name: str = "Jobs", - sample_size: int | None = None, - random_state: pd._typing.RandomState | None = None, -) -> pd.DataFrame: - """ - Load jobs DataFrame from a DuckDB database and preprocess it. - - Args: - db_path (str or Path): Path to the DuckDB database. - table_name (str, optional): Table name to query. Defaults to 'Jobs'. - sample_size (int, optional): Number of rows to sample from the DataFrame. Defaults to None (no sampling). - random_state (pd._typing.RandomState, optional): Random state for reproducibility. Defaults to None. - - Returns: - pd.DataFrame: DataFrame containing the table data. - - Raises: - RuntimeError: If the jobs DataFrame cannot be loaded from the database. 
- """ - if isinstance(db_path, Path): - db_path = db_path.resolve() - try: - db = DatabaseConnection(str(db_path)) - - jobs_df = db.fetch_all_jobs(table_name=table_name) - processed_data = preprocess_data( - jobs_df, min_elapsed_seconds=0, include_failed_cancelled_jobs=False, include_cpu_only_jobs=False - ) - if sample_size is not None: - processed_data = processed_data.sample(n=sample_size, random_state=random_state) - return processed_data - except Exception as e: - raise RuntimeError(f"Failed to load jobs DataFrame: {e}") from e class EfficiencyAnalysis: diff --git a/src/config/enum_constants.py b/src/config/enum_constants.py index 584cefa..de7b886 100644 --- a/src/config/enum_constants.py +++ b/src/config/enum_constants.py @@ -172,3 +172,90 @@ class PreprocessingErrorTypeEnum(Enum): UNKNOWN_GPU_TYPE = "Unknown GPU Type" NO_VALID_NODES = "No Valid Nodes" GPU_TYPE_NULL = "GPU Type is Null" + + +@unique +class OptionalColumnsEnum(Enum): + """ + An enumeration representing optional columns used for filtering in preprocess code. + + Attributes: + STATUS: Job status column. + ACCOUNT: Account column. + QOS: Quality of Service column. + ARRAY_ID: Position in job array. + JOB_NAME: Name of job. + IS_ARRAY: Indicator if job is part of an array. + INTERACTIVE: Indicator if job was interactive. + USER: Unity user. + EXIT_CODE: Job exit code. + TIME_LIMIT: Job time limit (seconds). + GPU_COMPUTE_USAGE: GPU compute usage (pct). + CPUS: Number of CPUs. + MEMORY: Job allocated memory (bytes). + CPU_MEM_USAGE: CPU memory usage column. + CPU_COMPUTE_USAGE: CPU compute usage (pct). 
+ """ + + STATUS = "Status" + ACCOUNT = "Account" + QOS = "QOS" + ARRAY_ID = "ArrayID" + JOB_NAME = "JobName" + IS_ARRAY = "IsArray" + INTERACTIVE = "Interactive" + USER = "User" + EXIT_CODE = "ExitCode" + TIME_LIMIT = "TimeLimit" + GPU_COMPUTE_USAGE = "GPUComputeUsage" + CPUS = "CPUs" + MEMORY = "Memory" + CPU_MEM_USAGE = "CPUMemUsage" + CPU_COMPUTE_USAGE = "CPUComputeUsage" + + +@unique +class RequiredColumnsEnum(Enum): + """ + An enumeration representing required columns that must be present in the dataframe. + + Attributes: + GPU_TYPE: GPU type column. + CONSTRAINTS: Job constraints column. + START_TIME: Job start time column. + SUBMIT_TIME: Job submit time column. + NODE_LIST: Node list column. + GPUS: Number of GPUs column. + GPU_MEM_USAGE: GPU memory usage column. + PARTITION: Partition column. + ELAPSED: Job elapsed time column. + """ + + JOB_ID = "JobID" + GPU_TYPE = "GPUType" + CONSTRAINTS = "Constraints" + START_TIME = "StartTime" + SUBMIT_TIME = "SubmitTime" + NODE_LIST = "NodeList" + GPUS = "GPUs" + GPU_MEM_USAGE = "GPUMemUsage" + ELAPSED = "Elapsed" + PARTITION = "Partition" + + +@unique +class ExcludedColumnsEnum(Enum): + """ + An enumeration representing columns that should be omitted during preprocessing. + + Attributes: + UUID: Unique identifier column. + END_TIME: Job end time column. + NODES: Number of nodes column. + PREEMPTED: Job preemption status column. 
+ """ + + UUID = "UUID" + END_TIME = "EndTime" + NODES = "Nodes" + PREEMPTED = "Preempted" diff --git a/src/database/database_connection.py b/src/database/database_connection.py index 750d171..848493d 100644 --- a/src/database/database_connection.py +++ b/src/database/database_connection.py @@ -36,12 +36,8 @@ def disconnect(self) -> None: if self.connection is not None: self.connection.close() self.connection = None - - def __del__(self) -> None: - """Ensure the connection is closed when the object is deleted.""" - self.disconnect() - if os.getenv("RUN_ENV") != "TEST": - print(f"Disconnected from {self.db_url}") + if os.getenv("RUN_ENV") != "TEST": + print(f"Disconnected from {self.db_url}") def is_connected(self) -> bool: """ diff --git a/src/preprocess/preprocess.py b/src/preprocess/preprocess.py index 9c69ed5..7a8f86b 100644 --- a/src/preprocess/preprocess.py +++ b/src/preprocess/preprocess.py @@ -20,6 +20,9 @@ QOSEnum, PartitionTypeEnum, PreprocessingErrorTypeEnum, + OptionalColumnsEnum, + RequiredColumnsEnum, + ExcludedColumnsEnum, ) from ..config.remote_config import PartitionInfoFetcher from ..config.paths import PREPROCESSING_ERRORS_LOG_FILE @@ -618,18 +621,238 @@ def _fill_missing(res: pd.DataFrame, include_cpu_only_jobs: bool) -> None: # all NaN values are np.nan # fill default values for specific columns - res.loc[:, "ArrayID"] = res["ArrayID"].fillna(-1) - res.loc[:, "Interactive"] = res["Interactive"].fillna("non-interactive") - res.loc[:, "Constraints"] = ( - res["Constraints"].fillna("").apply(lambda x: [] if isinstance(x, str) and x == "" else list(x)) - ) + fill_map = { + "ArrayID": lambda col: col.fillna(-1), + "Interactive": lambda col: col.fillna("non-interactive"), + "Constraints": lambda col: col.fillna("").apply(lambda x: [] if isinstance(x, str) and x == "" else list(x)), + "GPUs": lambda col: col.fillna(0), + } + res.loc[:, "GPUType"] = res.apply( lambda row: _safe_apply_function( _validate_gpu_type, row["GPUType"], include_cpu_only_jobs, 
job_id=row["JobID"], idx=row.name ), axis=1, ) - res.loc[:, "GPUs"] = res["GPUs"].fillna(0) + + for col, fill_func in fill_map.items(): + if col in res.columns: + res.loc[:, col] = fill_func(res[col]) + + +def _validate_columns_and_filter_records( + data: pd.DataFrame, + min_elapsed_seconds: int, + include_failed_cancelled_jobs: bool, + include_cpu_only_jobs: bool, + include_custom_qos_jobs: bool, +) -> pd.DataFrame: + """ + Validate required columns and filter records based on specified criteria. + + This function performs two main operations: + 1. Validates that all required columns are present and warns about missing optional columns + 2. Applies filtering conditions to remove unwanted records based on various criteria + + Args: + data (pd.DataFrame): The input dataframe to validate and filter. + min_elapsed_seconds (int): Minimum elapsed time in seconds to keep a job record. + include_failed_cancelled_jobs (bool): Whether to include jobs with status FAILED or CANCELLED. + include_cpu_only_jobs (bool): Whether to include jobs that do not use GPUs (CPU-only jobs). + include_custom_qos_jobs (bool): Whether to include entries with custom qos values. + + Returns: + pd.DataFrame: The validated and filtered dataframe. + + Raises: + KeyError: If any columns in RequiredColumnsEnum do not exist in the dataframe. + + Notes: + # Handling missing columns logic: + - columns in REQUIRED_COLUMNS are columns that are must-have for basic metrics calculation. + - columns in OPTIONAL_COLUMNS are columns that are involved in preprocessing logics. + - For any columns in REQUIRED_COLUMNS that do not exist, a KeyError will be raised. + - For any columns in OPTIONAL_COLUMNS but not in REQUIRED_COLUMNS, a warning will be raised. 
+ - _fill_missing, records filtering, and type conversion logic will happen only if columns involved exist + + """ + qos_values = set([member.value for member in QOSEnum]) + exist_column_set = set(data.columns.to_list()) + + # Ensure required columns are present + for required_col in RequiredColumnsEnum: + if required_col.value not in exist_column_set: + raise KeyError(f"Column {required_col.value} does not exist in dataframe.") + + # raise warnings if optional columns are not present + for optional_col in OptionalColumnsEnum: + if optional_col.value not in exist_column_set: + warnings.warn( + ( + f"Column '{optional_col.value}' is missing from the dataframe. " + "This may impact filtering operations and downstream processing." + ), + UserWarning, + stacklevel=2, + ) + + # filtering records + mask = pd.Series([True] * len(data), index=data.index) + + # Get partition info for GPU filtering + partition_info = PartitionInfoFetcher().get_info() + gpu_partitions = [p["name"] for p in partition_info if p["type"] == PartitionTypeEnum.GPU.value] + + filter_conditions = { + "Elapsed": lambda df: df["Elapsed"] >= min_elapsed_seconds, + "Account": lambda df: df["Account"] != AdminsAccountEnum.ROOT.value, + "Partition": lambda df: (df["Partition"] != AdminPartitionEnum.BUILDING.value) + & (include_cpu_only_jobs | df["Partition"].isin(gpu_partitions)), + "QOS": lambda df: (df["QOS"] != QOSEnum.UPDATES.value) + & (include_custom_qos_jobs | df["QOS"].isin(qos_values)), + "Status": lambda df: include_failed_cancelled_jobs + | ((df["Status"] != StatusEnum.FAILED.value) & (df["Status"] != StatusEnum.CANCELLED.value)), + } + + for col, func in filter_conditions.items(): + if col not in exist_column_set: + continue + mask &= func(data) + + return data[mask].copy() + + +def _cast_type_and_add_columns(data: pd.DataFrame) -> None: + """ + Cast existing columns to appropriate data types and add derived metrics as new columns. 
+ + Handles both empty and non-empty dataframes by applying type casting to existing columns + and either adding empty columns with correct dtypes or calculating actual derived values. + + Raises a warning if the dataframe is empty after preprocessing operations. + + Args: + data (pd.DataFrame): The dataframe to modify. Must contain the required columns for processing. + + Returns: + None: The function modifies the DataFrame in place. + + Warnings: + UserWarning: If the dataframe is empty after filtering and preprocessing operations. + """ + exist_column_set = set(data.columns.to_list()) + + if data.empty: + # Raise warning for empty dataframe + warnings.warn("Dataframe results from database and filtering is empty.", UserWarning, stacklevel=3) + + # Type casting for columns involving time + time_columns = ["StartTime", "SubmitTime"] + for col in time_columns: + if col not in exist_column_set: + continue + data[col] = pd.to_datetime(data[col], errors="coerce") + + duration_columns = ["TimeLimit", "Elapsed"] + for col in duration_columns: + if col not in exist_column_set: + continue + target_col = data[col] * 60 if col == "TimeLimit" else data[col] + data[col] = pd.to_timedelta(target_col, unit="s", errors="coerce") + + # Convert columns to categorical + for col, enum_obj in ATTRIBUTE_CATEGORIES.items(): + if col not in exist_column_set: + continue + enum_values = [e.value for e in enum_obj] + unique_values = data[col].unique().tolist() + all_categories = list(set(enum_values) | set(unique_values)) + data[col] = pd.Categorical(data[col], categories=all_categories, ordered=False) + + if data.empty: + # Add new columns with correct types for empty dataframe + data["Queued"] = pd.Series([], dtype="timedelta64[ns]") + data["vram_constraint"] = pd.Series([], dtype=pd.Int64Dtype()) + data["partition_constraint"] = pd.Series([], dtype=pd.Int64Dtype()) + data["requested_vram"] = pd.Series([], dtype=pd.Int64Dtype()) + data["allocated_vram"] = pd.Series([], 
dtype=pd.Int64Dtype()) + # Only add user_jobs/account_jobs if columns exist + if "User" in data.columns: + data["user_jobs"] = pd.Series([], dtype=pd.Int64Dtype()) + if "Account" in data.columns: + data["account_jobs"] = pd.Series([], dtype=pd.Int64Dtype()) + else: + # Calculate queue time + data.loc[:, "Queued"] = data["StartTime"] - data["SubmitTime"] + + # Apply all metrics using the single safe function + data.loc[:, "vram_constraint"] = data.apply( + lambda row: _safe_apply_function( + _get_vram_constraint, row["Constraints"], row["GPUs"], job_id=row["JobID"], idx=row.name + ), + axis=1, + ).astype(pd.Int64Dtype()) + + data.loc[:, "partition_constraint"] = data.apply( + lambda row: _safe_apply_function( + _get_partition_constraint, row["Partition"], row["GPUs"], job_id=row["JobID"], idx=row.name + ), + axis=1, + ).astype(pd.Int64Dtype()) + + data.loc[:, "requested_vram"] = data.apply( + lambda row: _safe_apply_function( + _get_requested_vram, + row["vram_constraint"], + row["partition_constraint"], + job_id=row["JobID"], + idx=row.name, + ), + axis=1, + ).astype(pd.Int64Dtype()) + + data.loc[:, "allocated_vram"] = data.apply( + lambda row: _safe_apply_function( + _get_approx_allocated_vram, + row["GPUType"], + row["NodeList"], + row["GPUs"], + row["GPUMemUsage"], + job_id=row["JobID"], + idx=row.name, + ), + axis=1, + ) + + if error_indices: + data = data.drop(index=list(error_indices)).reset_index(drop=True) + + # Add derived columns for user_jobs and account_jobs only if the source columns exist + if "User" in exist_column_set: + data.loc[:, "user_jobs"] = data.groupby("User", observed=True)["User"].transform("size") + if "Account" in exist_column_set: + data.loc[:, "account_jobs"] = data.groupby("Account", observed=True)["Account"].transform("size") + + +def _check_for_infinity_values(data: pd.DataFrame) -> None: + """ + Check for infinity values in memory usage columns and raise warnings if found. 
+ + Args: + data (pd.DataFrame): The dataframe to check for infinity values. + + Returns: + None: The function only raises warnings if infinity values are found. + """ + mem_usage_columns = ["CPUMemUsage", "GPUMemUsage"] + exist_column_set = set(data.columns.to_list()) + for col_name in mem_usage_columns: + if col_name not in exist_column_set: + continue + filtered = data[data[col_name] == np.inf].copy() + if len(filtered) > 0: + message = f"Some entries in {col_name} having infinity values. This may be caused by an overflow." + warnings.warn(message=message, stacklevel=2, category=UserWarning) def _write_preprocessing_error_logs(preprocessing_error_logs: list[dict]) -> None: @@ -682,20 +905,24 @@ def preprocess_data( min_elapsed_seconds: int = DEFAULT_MIN_ELAPSED_SECONDS, include_failed_cancelled_jobs: bool = False, include_cpu_only_jobs: bool = False, + include_custom_qos_jobs: bool = False, + apply_filter: bool = True, ) -> pd.DataFrame: """ Preprocess dataframe, filtering out unwanted rows and columns, filling missing values and converting types. This function will take in a dataframe to create a new dataframe satisfying given criteria. + Args: input_df (pd.DataFrame): The input dataframe containing job data. min_elapsed_seconds (int, optional): Minimum elapsed time in seconds to keep a job record. Defaults to 600. include_failed_cancelled_jobs (bool, optional): Whether to include jobs with status FAILED or CANCELLED. include_cpu_only_jobs (bool, optional): Whether to include jobs that do not use GPUs (CPU-only jobs). - - Returns: - pd.DataFrame: The preprocessed dataframe + include_custom_qos_jobs (bool, optional): Whether to include entries with custom qos values or not. + Default to False + apply_filter (bool, optional): Whether to apply filtering operations and columns removal to the data. + Defaults to True. 
Notes: - The function supports two formats for the 'GPUType' column in the dataframe: @@ -704,104 +931,39 @@ def preprocess_data( - Both formats are automatically detected and handled for all VRAM calculations and downstream processing. - The output DataFrame will have missing values filled, time columns converted, and new columns added for VRAM and job statistics. - """ - cols_to_remove = [col for col in ["UUID", "EndTime", "Nodes", "Preempted"] if col in input_df.columns] - data = input_df.drop(columns=cols_to_remove, axis=1, inplace=False) + Returns: + pd.DataFrame: The preprocessed dataframe - first_non_null = data["GPUType"].dropna().iloc[0] + """ + data = input_df.copy() + if apply_filter: + # Drop unnecessary columns, ignoring errors in case any of them is not in the dataframe + data = input_df.drop( + columns=[member.value for member in ExcludedColumnsEnum if member.value in input_df.columns], + axis=1, + inplace=False, + ) + # Perform column validation and filtering + data = _validate_columns_and_filter_records( + data, + min_elapsed_seconds, + include_failed_cancelled_jobs, + include_cpu_only_jobs, + include_custom_qos_jobs, + ) # Log the format of GPUType being used - if isinstance(first_non_null, dict): - print("[Preprocessing] Running with new database format: GPU types as dictionary.") - elif isinstance(first_non_null, list): - print("[Preprocessing] Running with old database format: GPU types as list.") - - mask = pd.Series([True] * len(data), index=data.index) - - mask &= data["Elapsed"] >= min_elapsed_seconds - mask &= data["Account"] != AdminsAccountEnum.ROOT.value - mask &= data["Partition"] != AdminPartitionEnum.BUILDING.value - mask &= data["QOS"] != QOSEnum.UPDATES.value - # Filter out failed or cancelled jobs, except when include_failed_cancel_jobs is True - mask &= ( - (data["Status"] != StatusEnum.FAILED.value) & (data["Status"] != StatusEnum.CANCELLED.value) - ) | include_failed_cancelled_jobs - # Filter out jobs whose partition type is 
not 'gpu', unless include_cpu_only_jobs is True. - partition_info = PartitionInfoFetcher().get_info() - gpu_partitions = [p["name"] for p in partition_info if p["type"] == PartitionTypeEnum.GPU.value] - mask &= data["Partition"].isin(gpu_partitions) | include_cpu_only_jobs - - data = data[mask].copy() - + if not data.empty: + first_non_null = data["GPUType"].dropna().iloc[0] + if isinstance(first_non_null, dict): + print("[Preprocessing] Running with new database format: GPU types as dictionary.") + elif isinstance(first_non_null, list): + print("[Preprocessing] Running with old database format: GPU types as list.") _fill_missing(data, include_cpu_only_jobs) + _cast_type_and_add_columns(data) - # Type casting for columns involving time - time_columns = ["StartTime", "SubmitTime"] - for col in time_columns: - data[col] = pd.to_datetime(data[col], errors="coerce") - - time_limit_in_seconds = data["TimeLimit"] * 60 - data["TimeLimit"] = pd.to_timedelta(time_limit_in_seconds, unit="s", errors="coerce") - data["Elapsed"] = pd.to_timedelta(data["Elapsed"], unit="s", errors="coerce") - - # Added parameters for calculating VRAM metrics - data.loc[:, "Queued"] = data["StartTime"] - data["SubmitTime"] - - # Apply all metrics using the single safe function - data.loc[:, "vram_constraint"] = data.apply( - lambda row: _safe_apply_function( - _get_vram_constraint, row["Constraints"], row["GPUs"], job_id=row["JobID"], idx=row.name - ), - axis=1, - ).astype(pd.Int64Dtype()) - - data.loc[:, "partition_constraint"] = data.apply( - lambda row: _safe_apply_function( - _get_partition_constraint, row["Partition"], row["GPUs"], job_id=row["JobID"], idx=row.name - ), - axis=1, - ).astype(pd.Int64Dtype()) - - data.loc[:, "requested_vram"] = data.apply( - lambda row: _safe_apply_function( - _get_requested_vram, row["vram_constraint"], row["partition_constraint"], job_id=row["JobID"], idx=row.name - ), - axis=1, - ).astype(pd.Int64Dtype()) - - data.loc[:, "allocated_vram"] = data.apply( - 
lambda row: _safe_apply_function( - _get_approx_allocated_vram, - row["GPUType"], - row["NodeList"], - row["GPUs"], - row["GPUMemUsage"], - job_id=row["JobID"], - idx=row.name, - ), - axis=1, - ) - - if error_indices: - data = data.drop(index=list(error_indices)).reset_index(drop=True) - - data.loc[:, "user_jobs"] = data.groupby("User")["User"].transform("size") - data.loc[:, "account_jobs"] = data.groupby("Account")["Account"].transform("size") - - # Convert columns to categorical - for col, enum_obj in ATTRIBUTE_CATEGORIES.items(): - enum_values = [e.value for e in enum_obj] - unique_values = data[col].unique().tolist() - all_categories = list(set(enum_values) | set(unique_values)) - data[col] = pd.Categorical(data[col], categories=all_categories, ordered=False) - - # Raise warning if GPUMemUsage or CPUMemUsage having infinity values - mem_usage_columns = ["CPUMemUsage", "GPUMemUsage"] - for col_name in mem_usage_columns: - filtered = data[data[col_name] == np.inf].copy() - if len(filtered) > 0: - message = f"Some entries in {col_name} having infinity values. This may be caused by an overflow." 
- warnings.warn(message=message, stacklevel=2, category=UserWarning) + # Check for infinity values in memory usage columns + _check_for_infinity_values(data) # Identify and handle duplicate JobIDs duplicate_rows = data[data["JobID"].duplicated(keep=False)] diff --git a/src/utilities/__init__.py b/src/utilities/__init__.py new file mode 100644 index 0000000..8cc37b0 --- /dev/null +++ b/src/utilities/__init__.py @@ -0,0 +1,4 @@ +from .load_and_preprocess_jobs import ( + load_and_preprocess_jobs as load_and_preprocess_jobs, + load_and_preprocess_jobs_custom_query as load_and_preprocess_jobs_custom_query, +) diff --git a/src/utilities/load_and_preprocess_jobs.py b/src/utilities/load_and_preprocess_jobs.py new file mode 100644 index 0000000..49764fc --- /dev/null +++ b/src/utilities/load_and_preprocess_jobs.py @@ -0,0 +1,157 @@ +import pandas as pd +from pathlib import Path +from src.preprocess.preprocess import preprocess_data +from src.database import DatabaseConnection +from src.config.constants import DEFAULT_MIN_ELAPSED_SECONDS +from src.config.enum_constants import ( + QOSEnum, + AdminPartitionEnum, + AdminsAccountEnum, + StatusEnum, + PartitionTypeEnum, + ExcludedColumnsEnum, +) +from src.config.remote_config import PartitionInfoFetcher +from datetime import datetime, timedelta + + +def load_and_preprocess_jobs( + db_path: str | Path, + table_name: str = "Jobs", + dates_back: int | None = None, + include_failed_cancelled_jobs: bool = False, + include_cpu_only_jobs: bool = False, + include_custom_qos_jobs: bool = False, + min_elapsed_seconds: int = DEFAULT_MIN_ELAPSED_SECONDS, + random_state: pd._typing.RandomState | None = None, + sample_size: int | None = None, +) -> pd.DataFrame: + """ + Load jobs DataFrame from a DuckDB database with standard filtering and preprocess it. + + This function constructs a SQL query with predefined filtering conditions based on the provided + parameters and then preprocesses the resulting data. 
+ + Args: + db_path (str or Path): Path to the DuckDB database. + table_name (str, optional): Table name to query. Defaults to 'Jobs'. + dates_back (int, optional): Number of days back to filter jobs based on StartTime. + Defaults to None. If None, will not filter by startTime. + include_failed_cancelled_jobs (bool, optional): If True, include jobs with FAILED or CANCELLED status. + Defaults to False. + include_cpu_only_jobs (bool, optional): If True, include jobs that do not use GPUs (CPU-only jobs). + Defaults to False. + include_custom_qos_jobs (bool, optional): If True, include jobs with custom qos values. Defaults to False. + min_elapsed_seconds (int, optional): Minimum elapsed time in seconds to filter jobs by elapsed time. + Defaults to DEFAULT_MIN_ELAPSED_SECONDS. + random_state (pd._typing.RandomState, optional): Random state for reproducibility. Defaults to None. + sample_size (int, optional): Number of rows to sample from the DataFrame. Defaults to None (no sampling). + + Returns: + pd.DataFrame: Preprocessed DataFrame containing the filtered job data. + + Raises: + RuntimeError: If the jobs DataFrame cannot be loaded from the database. 
+ """ + + # check if the query contains condition of date_back in the form "StartTime > date" + + if isinstance(db_path, Path): + db_path = db_path.resolve() + try: + db = DatabaseConnection(str(db_path)) + qos_values = "(" + ",".join(f"'{obj.value}'" for obj in QOSEnum) + ")" + excluded_columns = "(" + ", ".join(f"{obj.value}" for obj in ExcludedColumnsEnum) + ")" + + # get cpu partition list + partition_info = PartitionInfoFetcher().get_info() + gpu_partitions = [p["name"] for p in partition_info if p["type"] == PartitionTypeEnum.GPU.value] + gpu_partitions_str = "(" + ",".join(f"'{partition_name}'" for partition_name in gpu_partitions) + ")" + + conditions_arr = [ + f"Elapsed >= {min_elapsed_seconds}", + f"Account != '{AdminsAccountEnum.ROOT.value}'", + f"Partition != '{AdminPartitionEnum.BUILDING.value}'", + f"QOS != '{QOSEnum.UPDATES.value}'", + ] + if dates_back is not None: + cutoff = datetime.now() - timedelta(days=dates_back) + conditions_arr.append(f"StartTime >= '{cutoff}'") + if not include_custom_qos_jobs: + conditions_arr.append(f"QOS in {qos_values}") + if not include_cpu_only_jobs: + conditions_arr.append(f"Partition IN {gpu_partitions_str}") + if not include_failed_cancelled_jobs: + conditions_arr.append(f"Status != '{StatusEnum.FAILED.value}'") + conditions_arr.append(f"Status != '{StatusEnum.CANCELLED.value}'") + + query = f"SELECT * EXCLUDE {excluded_columns} FROM {table_name} WHERE {' AND '.join(conditions_arr)}" + jobs_df = db.fetch_query(query=query) + processed_data = preprocess_data(jobs_df, apply_filter=False) + if sample_size is not None: + processed_data = processed_data.sample(n=sample_size, random_state=random_state) + return processed_data + except Exception as e: + raise RuntimeError(f"Failed to load jobs DataFrame: {e}") from e + + +def load_and_preprocess_jobs_custom_query( + db_path: str | Path, + table_name: str = "Jobs", + custom_query: str | None = None, + random_state: pd._typing.RandomState | None = None, + sample_size: int | 
None = None, +) -> pd.DataFrame: + """ + Load jobs DataFrame from a DuckDB database using a custom SQL query and preprocess it. + + This function allows for complete control over the SQL query used to fetch data from the database. + The preprocessing is done with permissive settings to avoid filtering out any records that the + user specifically requested through their custom query. + + Args: + db_path (str or Path): Path to the DuckDB database. + table_name (str, optional): Table name to use in default query if custom_query is None. Defaults to 'Jobs'. + custom_query (str, optional): Custom SQL query to execute. If None, defaults to "SELECT * FROM {table_name}". + random_state (pd._typing.RandomState, optional): Random state for reproducibility. Defaults to None. + sample_size (int, optional): Number of rows to sample from the DataFrame. Defaults to None (no sampling). + + Returns: + pd.DataFrame: Preprocessed DataFrame containing the data returned by the custom query. + + Notes: + The preprocessing is performed with the following permissive settings: + - min_elapsed_seconds=0 (no minimum elapsed time filtering) + - include_failed_cancelled_jobs=True (include all job statuses) + - include_cpu_only_jobs=True (include CPU-only jobs) + - include_custom_qos_jobs=True (include custom QOS jobs) + + This ensures that the function doesn't inadvertently filter out records that the user + explicitly requested through their custom query. + + Raises: + RuntimeError: If the jobs DataFrame cannot be loaded from the database. + """ + if isinstance(db_path, Path): + db_path = db_path.resolve() + try: + db = DatabaseConnection(str(db_path)) + + if custom_query is None: + custom_query = f"SELECT * FROM {table_name}" + + jobs_df = db.fetch_query(custom_query) + + # Use permissive preprocessing settings to preserve all data from the custom query. 
+ processed_data = preprocess_data( + jobs_df, + min_elapsed_seconds=0, + include_failed_cancelled_jobs=True, + include_cpu_only_jobs=True, + include_custom_qos_jobs=True, + ) + if sample_size is not None: + processed_data = processed_data.sample(n=sample_size, random_state=random_state) + return processed_data + except Exception as e: + raise RuntimeError(f"Failed to load jobs DataFrame: {e}") from e diff --git a/tests/conftest.py b/tests/conftest.py index 10c77cc..9cb47b6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,22 +7,113 @@ from src.database import DatabaseConnection from .mock_data.convert_csv_to_db import convert_csv_to_db +from src.config.enum_constants import QOSEnum, AdminPartitionEnum, AdminsAccountEnum, PartitionTypeEnum, StatusEnum +from src.config.remote_config import PartitionInfoFetcher -@pytest.fixture(scope="module") -def mock_data_frame(request: pytest.FixtureRequest) -> Generator[pd.DataFrame]: - temp_db_dir = tempfile.mkdtemp() +def preprocess_mock_data( + db_path: str, + table_name: str = "Jobs", + min_elapsed_seconds: int = 0, + include_cpu_only_jobs: bool = False, + include_custom_qos_jobs: bool = False, + include_failed_cancelled_jobs: bool = False, +) -> pd.DataFrame: + """ + Helper function to filter job records from database based on various criteria. + + This function applies the same filtering logic as the preprocessing pipeline + to create a ground truth dataset for testing purposes. It filters out jobs + based on elapsed time, account type, partition type, QOS values, and status. + + Args: + db_path (str): Path to the DuckDB database file. + table_name (str, optional): Name of the table to query. Defaults to "Jobs". + min_elapsed_seconds (int, optional): Minimum elapsed time in seconds to filter jobs. + Jobs with elapsed time below this threshold are excluded. Defaults to 0. + include_cpu_only_jobs (bool, optional): If True, include jobs that run on CPU-only + partitions. 
If False, only include jobs from GPU partitions. Defaults to False. + include_custom_qos_jobs (bool, optional): If True, include jobs with custom QOS values + (not in the standard QOS enum). If False, only include jobs with standard QOS. + Defaults to False. + include_failed_cancelled_jobs (bool, optional): If True, include jobs with FAILED + or CANCELLED status. If False, exclude these jobs. Defaults to False. + + Returns: + pd.DataFrame: Filtered DataFrame containing job records that meet the specified criteria. + + Raises: + Exception: If there's an error during database operations or query execution. + + Note: + This function is used in tests to create expected results for comparison with + the actual pipeline output. It excludes jobs with: + - Root account + - Building partition + - Updates QOS + - Smaller elapsed time than min_elapsed_seconds + And applies additional filters based on the provided parameters. + """ + qos_values = "(" + ",".join(f"'{obj.value}'" for obj in QOSEnum) + ")" + + # get cpu partition list + partition_info = PartitionInfoFetcher().get_info() + gpu_partitions = [p["name"] for p in partition_info if p["type"] == PartitionTypeEnum.GPU.value] + gpu_partitions_str = "(" + ",".join(f"'{partition_name}'" for partition_name in gpu_partitions) + ")" mem_db = None + try: + mem_db = DatabaseConnection( + db_path + ) # with read_only = True as we don't expect to write into database directly from tests + + conditions_arr = [ + f"Elapsed >= {min_elapsed_seconds}", + f"Account != '{AdminsAccountEnum.ROOT.value}'", + f"Partition != '{AdminPartitionEnum.BUILDING.value}'", + f"QOS != '{QOSEnum.UPDATES.value}'", + ] + if not include_custom_qos_jobs: + conditions_arr.append(f"QOS in {qos_values}") + if not include_cpu_only_jobs: + conditions_arr.append(f"Partition IN {gpu_partitions_str}") + if not include_failed_cancelled_jobs: + conditions_arr.append(f"Status != '{StatusEnum.FAILED.value}'") + conditions_arr.append(f"Status != 
'{StatusEnum.CANCELLED.value}'") + + query = f"SELECT * FROM {table_name} WHERE {' AND '.join(conditions_arr)}" + return mem_db.fetch_query(query=query) + except Exception as e: + raise Exception("Exception at preprocess_mock_data") from e + finally: + if mem_db is not None: + mem_db.disconnect() + + +# Get path to the temporary mock database file +@pytest.fixture(scope="module") +def mock_data_path(request: pytest.FixtureRequest) -> Generator[str]: try: is_new_format = request.param + temp_db_dir = tempfile.mkdtemp() temp_db_path = f"{temp_db_dir}/mock_new_format.db" if is_new_format else f"{temp_db_dir}/mock.db" csv_path = "tests/mock_data/mock_new_format.csv" if is_new_format else "tests/mock_data/mock.csv" convert_csv_to_db(csv_path, temp_db_path, new_format=is_new_format) - mem_db = DatabaseConnection(temp_db_path, read_only=False) + yield temp_db_path + finally: + shutil.rmtree(temp_db_dir) + + +# load mock database as a Dataframe +@pytest.fixture(scope="module") +def mock_data_frame(mock_data_path: str) -> Generator[pd.DataFrame]: + mem_db = None + try: + mem_db = DatabaseConnection( + mock_data_path + ) # with read_only = True as we don't expect to write into database directly from tests yield mem_db.fetch_all_jobs() except Exception as e: raise Exception("Exception at mock_data_frame") from e finally: if mem_db is not None: mem_db.disconnect() - shutil.rmtree(temp_db_dir) diff --git a/tests/mock_data/mock.csv index 7ae0d44..c8c7cd4 100644 --- a/tests/mock_data/mock.csv +++ b/tests/mock_data/mock.csv @@ -1,13 +1,14 @@ -"UUID","JobID","ArrayID","JobName","IsArray","Interactive","Preempted","Account","User","Constraints","QOS","Status","ExitCode","SubmitTime","StartTime","EndTime","Elapsed","TimeLimit","Partition","Nodes","NodeList","CPUs","Memory","GPUs","GPUType","GPUMemUsage","GPUComputeUsage","CPUMemUsage","CPUComputeUsage"
-"2900505501739784686173995476","7","","rr",false,"matlab",false,"acc2","user6","['\'vram23\'']","normal","FAILED","ERROR","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","umd-cscdr-cpu","umd-cscdr-cpu[003-010]","[gypsum-gpu018]","6","36864","",,"0","0","6808277000","16.268034" -"2900505501739784686173995476","6","15","predictionstuff",true,"",false,"acc4","user1","['\'vram23\'']","normal","FAILED","ERROR","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu018","[gypsum-gpu018]","6","36864","1",[m40],"246022140","100","6808277000","16.268034" -"2900505501739784686173995476","8","","auto-generation stuff",false,"matlab",false,"acc3","user7","['\'vram23\'']","normal","FAILED","ERROR","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","umd-cscdr-cpu","umd-cscdr-cpu[003-010]","[gypsum-gpu018]","6","36864","",,"0","0","6808277000","16.268034" -"28500523017388652451738865260","20","","do_something",false,"",false,"acc1","user1","['\'amd1900x\'', '\'amd7402\'', '\'amd7502\'', '\'amd7543\'', '\'amd7702\'', '\'amd7763\'', '\'amd9654\'', '\'intel2620v3\'', '\'intel4110\'', '\'intel4116\'', '\'intel4214r\'', '\'intel4215r\'', '\'intel5118\'', '\'intel5218\'', '\'intel6126\'', '\'intel6130\'', '\'intel6140\'', '\'intel6148\'', '\'intel6226r\'', '\'intel6238r\'', '\'intel6240\'', '\'intel6248r\'', '\'intel6326\'', '\'intel6526y\'', '\'intel8352y\'', '\'intel8358\'', '\'intel8480\'']","normal","CANCELLED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","1120390","24000","umd-cscdr-cpu","umd-cscdr-cpu[003-010]","[umd-cscdr-cpu003, umd-cscdr-cpu004, umd-cscdr-cpu005, umd-cscdr-cpu006, umd-cscdr-cpu007, umd-cscdr-cpu008, umd-cscdr-cpu009, umd-cscdr-cpu010]","441","661500","",,"0","0","100659000000","14.494155" -"28519169017389527551738952757","26","","MLstuff",false,"",false,"acc2","user2","['\'avx512\'', '\'amd1900x\'', '\'amd7402\'', '\'amd7502\'', '\'amd7543\'', '\'amd7702\'', 
'\'amd7763\'', '\'amd9654\'', '\'intel2620v3\'', '\'intel4110\'', '\'intel4116\'', '\'intel4214r\'', '\'intel4215r\'', '\'intel5118\'', '\'intel5218\'', '\'intel6126\'', '\'intel6130\'', '\'intel6140\'', '\'intel6148\'', '\'intel6226r\'', '\'intel6238r\'', '\'intel6240\'', '\'intel6248r\'', '\'intel6326\'', '\'intel6526y\'', '\'intel8352y\'', '\'intel8358\'', '\'intel8480\'']","normal","CANCELLED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","1034244","30600","umd-cscdr-cpu","umd-cscdr-cpu[022-023]","[umd-cscdr-cpu022, umd-cscdr-cpu023]","128","131072","",,"0","0","729686000","0.7811957" -"29005047017397838091739976531","11","","collab2",false,"shell",false,"acc2","user2","","normal","COMPLETED","SUCCESS","2025-03-01 10:00","2025-03-01 12:00","2025-03-01 16:00","18002","480","gpu","gypsum-gpu005","[gypsum-gpu005]","6","36864","1",[m40],"250216450","100","9442972000","16.508013" -"29005030017397824291739976308","10","","collab",false,"",false,"acc1","user3","['\'vram23\'']","normal","COMPLETED","SUCCESS","2025-03-01 10:00","2025-03-01 12:00","2025-03-01 16:00","22016","480","gpu","gypsum-gpu002","[gypsum-gpu002]","6","36864","1",[m40],"250347520","100","9278689000","16.562754" -"29005055017397846861739976936","15","","LLMstuff",false,"",false,"acc2","user2","['\'vram23\'']","normal","COMPLETED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu018","[gypsum-gpu018]","6","36864","1",[m40],"246022140","100","6808277000","16.268034" -"2900505501739784686173995476","3","10","something",true,"shell",false,"acc1","user2","['\'vram23\'']","normal","COMPLETED","ERROR","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","30000","gpu","gypsum-gpu018","[gypsum-gpu018]","10","36864","4",[m40],"246022140","100","6808277000","16.268034" -"2900505501739784686173995476","1","12","statistics_test",true,"shell",false,"acc3","user4","['\'vram23\'']","normal","COMPLETED","SUCCESS","2025-02-17 
09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","umd-cscdr-cpu","umd-cscdr-cpu[003-010]","[gypsum-gpu018]","6","36864","",,"0","0","6808277000","16.268034" -"2900505501739784686173995476","2","","something",false,"jupyter",false,"acc1","user5","","normal","COMPLETED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu018","[gypsum-gpu018]","6","36864","2",[2080_ti],"0","0","6808277000","16.268034" -"2900505501739784686173990000","9","","auto-generation stuff",false,"matlab",false,"root","root","","updates","COMPLETED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","building","umd-cscdr-cpu[003-010]","[gypsum-gpu018]","6","36864","",,"0","0","6808277000","16.268034" +UUID,JobID,ArrayID,JobName,IsArray,Interactive,Preempted,Account,User,Constraints,QOS,Status,ExitCode,SubmitTime,StartTime,EndTime,Elapsed,TimeLimit,Partition,Nodes,NodeList,CPUs,Memory,GPUs,GPUType,GPUMemUsage,GPUComputeUsage,CPUMemUsage,CPUComputeUsage +2900505501739784686173995476,7,,rr,FALSE,matlab,FALSE,acc2,user6,['\'vram23\''],normal,FAILED,ERROR,2025-07-21 18:25,2025-07-21 18:25,2025-08-03 17:43,1120680,480,umd-cscdr-cpu,umd-cscdr-cpu[003-010],[gypsum-gpu018],6,36864,,,0,0,6808277000,16.268034 +2900505501739784686173995476,6,15,predictionstuff,TRUE,,FALSE,acc4,user1,['\'vram23\''],normal,FAILED,ERROR,2025-07-21 18:25,2025-07-21 18:25,2025-08-03 17:43,1120680,480,gpu,gypsum-gpu018,[gypsum-gpu018],6,36864,1,[m40],246022140,100,6808277000,16.268034 +2900505501739784686173995476,8,,auto-generation stuff,FALSE,matlab,FALSE,acc3,user7,['\'vram23\''],normal,FAILED,ERROR,2025-07-20 18:07,2025-07-20 18:07,2025-08-03 17:20,1206780,480,umd-cscdr-cpu,umd-cscdr-cpu[003-010],[gypsum-gpu018],6,36864,,,0,0,6808277000,16.268034 +28500523017388652451738865260,20,,do_something,FALSE,,FALSE,acc1,user1,"['\'amd1900x\'', '\'amd7402\'', '\'amd7502\'', '\'amd7543\'', '\'amd7702\'', '\'amd7763\'', '\'amd9654\'', 
'\'intel2620v3\'', '\'intel4110\'', '\'intel4116\'', '\'intel4214r\'', '\'intel4215r\'', '\'intel5118\'', '\'intel5218\'', '\'intel6126\'', '\'intel6130\'', '\'intel6140\'', '\'intel6148\'', '\'intel6226r\'', '\'intel6238r\'', '\'intel6240\'', '\'intel6248r\'', '\'intel6326\'', '\'intel6526y\'', '\'intel8352y\'', '\'intel8358\'', '\'intel8480\'']",normal,CANCELLED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,24000,umd-cscdr-cpu,umd-cscdr-cpu[003-010],"[umd-cscdr-cpu003, umd-cscdr-cpu004, umd-cscdr-cpu005, umd-cscdr-cpu006, umd-cscdr-cpu007, umd-cscdr-cpu008, umd-cscdr-cpu009, umd-cscdr-cpu010]",441,661500,,,0,0,1.01E+11,14.494155 +28519169017389527551738952757,26,,MLstuff,FALSE,,FALSE,acc2,user2,"['\'avx512\'', '\'amd1900x\'', '\'amd7402\'', '\'amd7502\'', '\'amd7543\'', '\'amd7702\'', '\'amd7763\'', '\'amd9654\'', '\'intel2620v3\'', '\'intel4110\'', '\'intel4116\'', '\'intel4214r\'', '\'intel4215r\'', '\'intel5118\'', '\'intel5218\'', '\'intel6126\'', '\'intel6130\'', '\'intel6140\'', '\'intel6148\'', '\'intel6226r\'', '\'intel6238r\'', '\'intel6240\'', '\'intel6248r\'', '\'intel6326\'', '\'intel6526y\'', '\'intel8352y\'', '\'intel8358\'', '\'intel8480\'']",normal,CANCELLED,SUCCESS,2025-08-01 9:16,2025-08-03 14:48,2025-08-03 19:48,18000,30600,umd-cscdr-cpu,umd-cscdr-cpu[022-023],"[umd-cscdr-cpu022, umd-cscdr-cpu023]",128,131072,,,0,0,729686000,0.7811957 +29005047017397838091739976531,11,,collab2,FALSE,shell,FALSE,acc2,user2,,normal,COMPLETED,SUCCESS,2025-08-01 8:53,2025-08-03 14:45,2025-08-03 20:52,22020,480,gpu,gypsum-gpu005,[gypsum-gpu005],6,36864,1,[m40],250216450,100,9442972000,16.508013 +29005030017397824291739976308,10,,collab,FALSE,,FALSE,acc1,user3,['\'vram23\''],normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,480,gpu,gypsum-gpu002,[gypsum-gpu002],6,36864,1,[m40],250347520,100,9278689000,16.562754 
+29005055017397846861739976936,15,,LLMstuff,FALSE,,FALSE,acc2,user2,['\'vram23\''],normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,480,gpu,gypsum-gpu018,[gypsum-gpu018],6,36864,1,[m40],246022140,100,6808277000,16.268034 +2900505501739784686173995476,3,10,something,TRUE,shell,FALSE,acc1,user2,['\'vram23\''],normal,COMPLETED,ERROR,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,30000,gpu,gypsum-gpu018,[gypsum-gpu018],10,36864,4,[m40],246022140,100,6808277000,16.268034 +2900505501739784686173995476,1,12,statistics_test,TRUE,shell,FALSE,acc3,user4,['\'vram23\''],normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,480,umd-cscdr-cpu,umd-cscdr-cpu[003-010],[gypsum-gpu018],6,36864,,,0,0,6808277000,16.268034 +2900505501739784686173995476,2,,something,FALSE,jupyter,FALSE,acc1,user5,,normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,480,gpu,gypsum-gpu018,[gypsum-gpu018],6,36864,2,[2080_ti],0,0,6808277000,16.268034 +2900505501739784686173990000,9,,auto-generation stuff,FALSE,matlab,FALSE,root,root,,updates,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,480,building,umd-cscdr-cpu[003-010],[gypsum-gpu018],6,36864,,,0,0,6808277000,16.268034 +2900505501739784686173995476,90,12,statistics_test,TRUE,shell,FALSE,acc3,user4,['\'vram23\''],normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 15:05,600,480,umd-cscdr-cpu,umd-cscdr-cpu[003-010],"[umd-cscdr-cpu022, umd-cscdr-cpu023]",6,36864,,,0,0,6808277000,16.268034 \ No newline at end of file diff --git a/tests/mock_data/mock_new_format.csv b/tests/mock_data/mock_new_format.csv index b06d0ff..d322c5a 100644 --- a/tests/mock_data/mock_new_format.csv +++ b/tests/mock_data/mock_new_format.csv @@ -1,8 +1,8 @@ 
-"UUID","JobID","ArrayID","JobName","IsArray","Interactive","Preempted","Account","User","Constraints","QOS","Status","ExitCode","SubmitTime","StartTime","EndTime","Elapsed","TimeLimit","Partition","Nodes","NodeList","CPUs","Memory","GPUs","GPUType","GPUMemUsage","GPUComputeUsage","CPUMemUsage","CPUComputeUsage" -"1","101","","test_job",false,"matlab",false,"acc1","user1","['vram23']","normal","COMPLETED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu018","[gypsum-gpu018]","6","36864","2",{"a100":2},"246022140","100","6808277000","16.268034" -"2","102","","test_job2",false,"shell",false,"acc2","user2","['vram16']","normal","FAILED","ERROR","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu019","[gypsum-gpu019]","6","36864","1",{"v100":1},"250216450","100","9442972000","16.508013" -"3","103","","test_job3",false,"jupyter",false,"acc3","user3","['vram48']","normal","CANCELLED","SUCCESS","2025-02-17 09:31","2025-02-19 14:55","2025-02-19 18:32","12999","480","gpu","gypsum-gpu020","[gypsum-gpu020]","6","36864","3","{'a100':1,'v100':2}","250347520","100","9278689000","16.562754" -"4","104","","test_job4",false,"matlab",false,"acc4","user4","['vram80']","normal","COMPLETED","SUCCESS","2025-03-01 10:00","2025-03-01 12:00","2025-03-01 16:00","14400","600","gpu","hpc-node001","[hpc-node001]","8","65536","4",{"h100":4},"300000000","95","12000000000","18.000000" -"5","105","","test_job5",false,"shell",false,"acc5","user5","['vram16']","normal","COMPLETED","SUCCESS","2025-03-02 11:00","2025-03-02 13:00","2025-03-02 15:00","100000","300","cpu","cpu-node002","[cpu-node002]","4","32768","",{},"0","0","8000000000","10.000000" -"6","106","","test_job6",false,"jupyter",false,"acc6","user6","['vram48']","normal","COMPLETED","SUCCESS","2025-03-03 09:00","2025-03-03 10:00","2025-03-03 14:00","18000","900","superpod-a100","gpu[020-021]","[gpu020, 
gpu021]","8","49152","2",{"a100":2},"260000000","98","10000000000","17.000000" -"7","107","","test_job7",false,"shell",false,"acc7","user7","","normal","COMPLETED","SUCCESS","2025-03-04 08:00","2025-03-04 09:00","2025-03-04 12:00","10800","400","power9-gpu","power9-gpu001","[power9-gpu001]","4","32768","1",{"v100":1},"200000000","90","7000000000","15.000000" +UUID,JobID,ArrayID,JobName,IsArray,Interactive,Preempted,Account,User,Constraints,QOS,Status,ExitCode,SubmitTime,StartTime,EndTime,Elapsed,TimeLimit,Partition,Nodes,NodeList,CPUs,Memory,GPUs,GPUType,GPUMemUsage,GPUComputeUsage,CPUMemUsage,CPUComputeUsage +1,101,,test_job,FALSE,matlab,FALSE,acc1,user1,['vram23'],normal,COMPLETED,SUCCESS,2025-07-21 18:25,2025-07-21 18:25,2025-08-03 17:43,1120680,480,gpu,gypsum-gpu018,[gypsum-gpu018],6,36864,2,"{""a100"":2}",246022140,100,6808277000,16.268034 +2,102,,test_job2,FALSE,shell,FALSE,acc2,user2,['vram16'],normal,FAILED,ERROR,2025-07-21 18:25,2025-07-21 18:25,2025-08-03 17:43,1120680,480,gpu,gypsum-gpu019,[gypsum-gpu019],6,36864,1,"{""v100"":1}",250216450,100,9442972000,16.508013 +3,103,,test_job3,FALSE,jupyter,FALSE,acc3,user3,['vram48'],normal,CANCELLED,SUCCESS,2025-07-20 18:07,2025-07-20 18:07,2025-08-03 17:20,1206780,480,gpu,gypsum-gpu020,[gypsum-gpu020],6,36864,3,"{'a100':1,'v100':2}",250347520,100,9278689000,16.562754 +4,104,,test_job4,FALSE,matlab,FALSE,acc4,user4,['vram80'],normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,600,gpu,hpc-node001,[hpc-node001],8,65536,4,"{""h100"":4}",300000000,95,12000000000,18 +5,105,,test_job5,FALSE,shell,FALSE,acc5,user5,['vram16'],normal,COMPLETED,SUCCESS,2025-08-01 9:16,2025-08-03 14:48,2025-08-03 19:48,18000,300,cpu,cpu-node002,[cpu-node002],4,32768,,{},0,0,8000000000,10 +6,106,,test_job6,FALSE,jupyter,FALSE,acc6,user6,['vram48'],normal,COMPLETED,SUCCESS,2025-08-01 8:53,2025-08-03 14:45,2025-08-03 20:52,22020,900,superpod-a100,gpu[020-021],"[gpu020, 
gpu021]",8,49152,2,"{""a100"":2}",260000000,98,10000000000,17 +7,107,,test_job7,FALSE,shell,FALSE,acc7,user7,,normal,COMPLETED,SUCCESS,2025-08-01 9:31,2025-08-03 14:55,2025-08-03 18:32,13020,400,power9-gpu,power9-gpu001,[power9-gpu001],4,32768,1,"{""v100"":1}",200000000,90,7000000000,15 \ No newline at end of file diff --git a/tests/test_load_and_preprocess_jobs.py b/tests/test_load_and_preprocess_jobs.py new file mode 100644 index 0000000..8724dce --- /dev/null +++ b/tests/test_load_and_preprocess_jobs.py @@ -0,0 +1,216 @@ +import pytest +import pandas +from src.utilities import load_and_preprocess_jobs, load_and_preprocess_jobs_custom_query +from .conftest import preprocess_mock_data +from src.config.enum_constants import OptionalColumnsEnum, RequiredColumnsEnum +from datetime import datetime, timedelta + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_return_correct_types(mock_data_path: str) -> None: + """ + Basic test on return type of function + """ + res = load_and_preprocess_jobs(db_path=mock_data_path) + assert isinstance(res, pandas.DataFrame) + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_no_filter(mock_data_path: str) -> None: + """ + Test in case there is no filtering, function should return every valid records from database. + """ + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=0) + res = load_and_preprocess_jobs(db_path=mock_data_path) + expect_num_records = len(ground_truth) + assert expect_num_records == len(res) + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("dates_back", [90, 150]) +def test_filter_date_back(mock_data_path: str, dates_back: int) -> None: + """ + Test for filtering by dates_back. + + Test with multiple different dates_back for higher test coverage. 
+ """ + temp = preprocess_mock_data(mock_data_path, min_elapsed_seconds=0) + res = load_and_preprocess_jobs(db_path=mock_data_path, dates_back=dates_back) + cutoff = datetime.now() - timedelta(days=dates_back) + ground_truth_jobs = temp[(temp["StartTime"] >= cutoff)].copy() + expect_job_ids = ground_truth_jobs["JobID"].to_numpy() + assert len(ground_truth_jobs) == len(res) + for id in res["JobID"]: + assert id in expect_job_ids + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_filter_min_elapsed(mock_data_path: str) -> None: + """ + Test for filtering by days back and minimum elapsed time. + """ + temp = preprocess_mock_data(mock_data_path, min_elapsed_seconds=13000) + res = load_and_preprocess_jobs(db_path=mock_data_path, min_elapsed_seconds=13000, dates_back=90) + cutoff = datetime.now() - timedelta(days=90) + ground_truth_jobs = temp[(temp["StartTime"] >= cutoff)].copy() + expect_job_ids = ground_truth_jobs["JobID"].to_numpy() + assert len(ground_truth_jobs) == len(res) + for id in res["JobID"]: + assert id in expect_job_ids + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_filter_date_back_include_all(mock_data_path: str) -> None: + """ + Test for filtering by days_back, including CPU only jobs and FAILED/ CANCELLED jobs + """ + temp = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=0, + include_cpu_only_jobs=True, + include_custom_qos_jobs=True, + include_failed_cancelled_jobs=True, + ) + res = load_and_preprocess_jobs( + db_path=mock_data_path, + dates_back=90, + min_elapsed_seconds=0, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + include_custom_qos_jobs=True, + ) + cutoff = datetime.now() - timedelta(days=90) + ground_truth_jobs = temp[temp["StartTime"] >= cutoff] + expect_job_ids = ground_truth_jobs["JobID"].to_numpy() + assert len(ground_truth_jobs) == len(res) + for id in 
res["JobID"]: + assert id in expect_job_ids + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("missing_col", [col.value for col in RequiredColumnsEnum]) +def test_missing_required_columns_error_raised(mock_data_path: str, missing_col: str) -> None: + """ + Test enforcement of errors when the database is missing a required column. + + Expect to raise RuntimeError for any of these columns if they are missing in the dataframe. + """ + required_col = {e.value for e in RequiredColumnsEnum} + col_names = required_col.copy() + col_names.remove(missing_col) + col_str = ", ".join(col_names) + query = f"SELECT {col_str} FROM Jobs" + with pytest.raises( + RuntimeError, match=f"Failed to load jobs DataFrame: 'Column {missing_col} does not exist in dataframe.'" + ): + _res = load_and_preprocess_jobs_custom_query(db_path=mock_data_path, custom_query=query) + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("missing_col", [col.value for col in OptionalColumnsEnum]) +def test_optional_column_warnings(mock_data_path: str, recwarn: pytest.WarningsRecorder, missing_col: str) -> None: + """ + Test handling the dataframe loads from database when missing one of the columns + + These columns are not in ENFORCE_COLUMNS so only warnings are expected to be raised + """ + optional_columns = {e.value for e in OptionalColumnsEnum} + required_columns = {e.value for e in RequiredColumnsEnum} + + required_column_copy = required_columns.copy() + optional_column_copy = optional_columns.copy() + optional_column_copy.remove(missing_col) + final_column_set = required_column_copy.union(optional_column_copy) + col_str = ", ".join(final_column_set) + query = f"SELECT {col_str} FROM Jobs" + + expect_warning_msg = ( + f"Column '{missing_col}' is missing from the dataframe. 
" + "This may impact filtering operations and downstream processing." + ) + _res = load_and_preprocess_jobs_custom_query(db_path=mock_data_path, custom_query=query) + + # Check that a warning was raised with the expected message + assert len(recwarn) > 0 + assert str(recwarn[0].message) == expect_warning_msg + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_custom_query( + mock_data_frame: pandas.DataFrame, mock_data_path: str, recwarn: pytest.WarningsRecorder +) -> None: + """ + Test if function fetches expected records when using custom sql query. + + Warnings are ignored since test_optional_column_warnings and test_missing_required_columns_error_raised + covers warning for optional columns missing. + """ + query = ( + "SELECT JobID, GPUType, Constraints, StartTime, SubmitTime, " + "NodeList, GPUs, GPUMemUsage, CPUMemUsage, Elapsed, Partition " + "FROM Jobs WHERE Status != 'CANCELLED' AND Status !='FAILED' " + "AND ArrayID is not NULL AND Interactive is not NULL" + ) + res = load_and_preprocess_jobs_custom_query(db_path=mock_data_path, custom_query=query) + ground_truth_jobs = mock_data_frame[ + (mock_data_frame["Status"] != "CANCELLED") + & (mock_data_frame["Status"] != "FAILED") + & (mock_data_frame["ArrayID"].notna()) + & (mock_data_frame["Interactive"].notna()) + ].copy() + assert len(res) == len(ground_truth_jobs) + expect_ids = ground_truth_jobs["JobID"].to_list() + for id in res["JobID"]: + assert id in expect_ids + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("days_back", [90, 150]) +def test_custom_query_days_back( + mock_data_frame: pandas.DataFrame, mock_data_path: str, recwarn: pytest.WarningsRecorder, days_back: int +) -> None: + """ + Test custom query with dates_back condition. + + Expect the result will be filtered by dates_back condition in the query. 
+ + Warnings are ignored since test_optional_column_warnings and test_missing_required_columns_error_raised + covers test warning for optional columns missing. + """ + cutoff = datetime.now() - timedelta(days=days_back) + query = ( + "SELECT JobID, GPUType, Constraints, StartTime, SubmitTime, " + "NodeList, GPUs, GPUMemUsage, CPUMemUsage, Elapsed, Partition " + "FROM Jobs WHERE Status != 'CANCELLED' AND Status !='FAILED' AND ArrayID is not NULL " + f"AND Interactive is not NULL AND StartTime >= '{cutoff}'" + ) + res = load_and_preprocess_jobs_custom_query(db_path=mock_data_path, custom_query=query) + + ground_truth_jobs = mock_data_frame[ + (mock_data_frame["Status"] != "CANCELLED") + & (mock_data_frame["Status"] != "FAILED") + & (mock_data_frame["ArrayID"].notna()) + & (mock_data_frame["Interactive"].notna()) + & (mock_data_frame["StartTime"] >= cutoff) + ].copy() + expect_ids = ground_truth_jobs["JobID"].to_list() + + assert len(res) == len(ground_truth_jobs) + for id in res["JobID"]: + assert id in expect_ids + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_empty_dataframe_warning(mock_data_path: str, recwarn: pytest.WarningsRecorder) -> None: + """ + Test handling the dataframe loads from database when the result is empty. + + Expect a UserWarning to be raised with the appropriate message. + """ + # Query that returns no rows + query = "SELECT * FROM Jobs WHERE 1=0" + res = load_and_preprocess_jobs_custom_query(db_path=mock_data_path, custom_query=query) + assert res.empty + # Check that the warning is about empty dataframe + assert len(recwarn) == 1 + assert str(recwarn[0].message) == "Dataframe results from database and filtering is empty." 
diff --git a/tests/test_preprocess.py b/tests/test_preprocess.py index e71b379..6cb51d9 100644 --- a/tests/test_preprocess.py +++ b/tests/test_preprocess.py @@ -3,7 +3,6 @@ from pandas.api.typing import NAType -from src.config import PartitionInfoFetcher from src.config.enum_constants import ( AdminsAccountEnum, ExitCodeEnum, @@ -11,63 +10,26 @@ QOSEnum, StatusEnum, AdminPartitionEnum, - PartitionTypeEnum, + ExcludedColumnsEnum, + RequiredColumnsEnum, + OptionalColumnsEnum, ) from src.preprocess import preprocess_data from src.preprocess.preprocess import _get_partition_constraint, _get_requested_vram, _get_vram_constraint +from .conftest import preprocess_mock_data -def _helper_filter_irrelevant_records( - input_df: pd.DataFrame, min_elapsed_seconds: int, include_cpu_only_jobs: bool = False -) -> pd.DataFrame: - """ - Private function to help generate expected ground truth dataframe for test. - - Given a ground truth dataframe, this will create a new dataframe without records meeting the following criteria: - - QOS is updates - - Account is root - - Partition is building - - Elasped time is less than min_elapsed - - Args: - input_df (pd.DataFrame): Input dataframe to filter. Note that the Elapsed field should be in unit seconds. - min_elapsed_seconds (int): Minimum elapsed time in seconds. - include_cpu_only_jobs (bool): Whether to include jobs that do not use GPUs (CPU-only jobs). Default is False. - - Returns: - pd.DataFrame: Filtered dataframe. - """ - - # TODO(Tan): Update implementation to use the same logic as preprocess_data - mask = pd.Series([True] * len(input_df), index=input_df.index) - - mask &= input_df["Elapsed"] >= min_elapsed_seconds - mask &= input_df["Account"] != AdminsAccountEnum.ROOT.value - mask &= input_df["Partition"] != AdminPartitionEnum.BUILDING.value - mask &= input_df["QOS"] != QOSEnum.UPDATES.value - # Filter out jobs whose partition type is not 'gpu', unless include_cpu_only_jobs is True. 
- partition_info = PartitionInfoFetcher().get_info() - gpu_partitions = [p["name"] for p in partition_info if p["type"] == PartitionTypeEnum.GPU.value] - mask &= input_df["Partition"].isin(gpu_partitions) | include_cpu_only_jobs - - return input_df[mask].copy() - - -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_filtered_columns(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("column", [member.value for member in ExcludedColumnsEnum]) +def test_preprocess_data_filtered_columns(mock_data_frame: pd.DataFrame, column: str) -> None: """ Test that the preprocessed data does not contain irrelevant columns. """ data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - assert "UUID" not in data.columns - assert "EndTime" not in data.columns - assert "Nodes" not in data.columns - assert "Preempted" not in data.columns - assert "partition_constraint" in data.columns - assert "requested_vram" in data.columns + assert column not in data.columns -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_preprocess_data_filtered_gpu(mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data does not contain null GPUType and GPUs.
@@ -79,7 +41,7 @@ def test_preprocess_data_filtered_gpu(mock_data_frame: pd.DataFrame) -> None: assert not any(is_gpu_null) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_preprocess_data_filtered_status(mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data does not contain FAILED or CANCELLED jobs. @@ -91,7 +53,7 @@ def test_preprocess_data_filtered_status(mock_data_frame: pd.DataFrame) -> None: assert not any(status_cancelled) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_preprocess_data_filtered_min_elapsed_1(mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data does not contain jobs with elapsed time below the threshold (300 seconds). @@ -103,10 +65,10 @@ def test_preprocess_data_filtered_min_elapsed_1(mock_data_frame: pd.DataFrame) - assert not any(elapsed_below_threshold) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_filter_min_elapsed_2(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_filter_min_elapsed_2(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ - Test that the preprocessed data contains only jobs with elapsed time below the threshold (700 seconds). + Test that the preprocessed data contains only jobs with elapsed time above the threshold (700 seconds). 
""" data = preprocess_data( input_df=mock_data_frame, @@ -114,14 +76,18 @@ def test_preprocess_data_filter_min_elapsed_2(mock_data_frame: pd.DataFrame) -> include_cpu_only_jobs=True, include_failed_cancelled_jobs=True, ) - # TODO (Tan): Update the mock data to include jobs with elapsed time below 700 seconds - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 700, include_cpu_only_jobs=True) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=700, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + ) assert len(data) == len(ground_truth), ( f"JobIDs in data: {data['JobID'].tolist()}, JobIDs in ground_truth: {ground_truth['JobID'].tolist()}" ) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_preprocess_data_filtered_root_account(mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data does not contain jobs with root account, partition building, or qos updates. @@ -135,60 +101,52 @@ def test_preprocess_data_filtered_root_account(mock_data_frame: pd.DataFrame) -> assert not any(partition_building) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_include_cpu_job(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_include_cpu_job(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data includes CPU-only jobs when specified. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600, include_cpu_only_jobs=True) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600, include_cpu_only_jobs=True) - expected_cpu_type = len( - ground_truth[ - ground_truth["GPUType"].isna() - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - ) - expected_gpus_count_0 = len( - ground_truth[ - ground_truth["GPUs"].isna() - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - ) + ground_truth = preprocess_mock_data(mock_data_path, include_cpu_only_jobs=True) + expected_cpu_type = len(ground_truth[ground_truth["GPUType"].isna()]) + expected_gpus_count_0 = len(ground_truth[ground_truth["GPUs"].isna()]) assert sum(pd.isna(x) for x in data["GPUType"]) == expected_cpu_type - assert data["GPUs"].value_counts()[0] == expected_gpus_count_0 + assert sum(x == 0 for x in data["GPUs"]) == expected_gpus_count_0 + # Check that GPUType is NA for CPU-only jobs assert all(isinstance(row, list | dict) for row in data["GPUType"] if not pd.isna(row)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_include_failed_cancelled_job(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_include_failed_cancelled_job(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data includes FAILED and CANCELLED jobs when specified. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600, include_failed_cancelled_jobs=True) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - expect_failed_status = len( - ground_truth[ - (ground_truth["Status"] == StatusEnum.FAILED.value) - & (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - ] - ) - expect_cancelled_status = len( - ground_truth[ - (ground_truth["Status"] == StatusEnum.CANCELLED.value) - & (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - ] + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600, include_failed_cancelled_jobs=True) + expect_failed_status = len(ground_truth[(ground_truth["Status"] == StatusEnum.FAILED.value)]) + expect_cancelled_status = len(ground_truth[(ground_truth["Status"] == StatusEnum.CANCELLED.value)]) + assert sum(x == StatusEnum.FAILED.value for x in data["Status"]) == expect_failed_status + assert sum(x == StatusEnum.CANCELLED.value for x in data["Status"]) == expect_cancelled_status + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_include_custom_qos_values(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: + data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600, include_custom_qos_jobs=True) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600, include_custom_qos_jobs=True) + filtered_ground_truth = ground_truth[ + (ground_truth["Status"] != "CANCELLED") & (ground_truth["Status"] != "FAILED") + ].copy() + assert len(data) == len(filtered_ground_truth), ( + f"JobIDs in data: {data['JobID'].tolist()}, JobIDs in ground_truth: {filtered_ground_truth['JobID'].tolist()}" ) - assert data["Status"].value_counts()[StatusEnum.FAILED.value] == expect_failed_status - assert data["Status"].value_counts()[StatusEnum.CANCELLED.value] == expect_cancelled_status + expect_ids = 
filtered_ground_truth["JobID"].to_list() + for id in data["JobID"]: + assert id in expect_ids -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_include_all(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_all_boolean_args_being_true(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data includes all jobs when both CPU-only and FAILED/CANCELLED jobs are specified. """ @@ -197,24 +155,33 @@ def test_preprocess_data_include_all(mock_data_frame: pd.DataFrame) -> None: min_elapsed_seconds=600, include_failed_cancelled_jobs=True, include_cpu_only_jobs=True, + include_custom_qos_jobs=True, + ) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=600, + include_cpu_only_jobs=True, + include_custom_qos_jobs=True, + include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600, include_cpu_only_jobs=True) - expect_failed_status = len(ground_truth[(ground_truth["Status"] == StatusEnum.FAILED.value)]) expect_cancelled_status = len(ground_truth[(ground_truth["Status"] == StatusEnum.CANCELLED.value)]) expect_completed_status = len(ground_truth[(ground_truth["Status"] == StatusEnum.COMPLETED.value)]) expect_gpu_type_null = len(ground_truth[(ground_truth["GPUType"].isna())]) expect_gpus_null = len(ground_truth[(ground_truth["GPUs"].isna())]) - assert len(data) == len(ground_truth) + + assert len(data) == len(ground_truth), ( + f"JobIDs in data: {data['JobID'].tolist()}, JobIDs in ground_truth: {ground_truth['JobID'].tolist()}" + ) assert sum(pd.isna(x) for x in data["GPUType"]) == expect_gpu_type_null - assert data["GPUs"].value_counts()[0] == expect_gpus_null - assert data["Status"].value_counts()[StatusEnum.FAILED.value] == expect_failed_status - assert 
data["Status"].value_counts()[StatusEnum.CANCELLED.value] == expect_cancelled_status - assert data["Status"].value_counts()[StatusEnum.COMPLETED.value] == expect_completed_status + assert sum(x == 0 for x in data["GPUs"]) == expect_gpus_null + assert sum(x == StatusEnum.FAILED.value for x in data["Status"]) == expect_failed_status + assert sum(x == StatusEnum.CANCELLED.value for x in data["Status"]) == expect_cancelled_status + assert sum(x == StatusEnum.COMPLETED.value for x in data["Status"]) == expect_completed_status -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_fill_missing_interactive(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_fill_missing_interactive(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data fills missing interactive job types with 'non-interactive' correctly. 
""" @@ -224,16 +191,20 @@ def test_preprocess_data_fill_missing_interactive(mock_data_frame: pd.DataFrame) include_cpu_only_jobs=True, include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 100, include_cpu_only_jobs=True) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=100, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + ) expect_non_interactive = len(ground_truth[(ground_truth["Interactive"].isna())]) - interactive_stat = data["Interactive"].value_counts() - assert interactive_stat[InteractiveEnum.NON_INTERACTIVE.value] == expect_non_interactive + assert sum(x == InteractiveEnum.NON_INTERACTIVE.value for x in data["Interactive"]) == expect_non_interactive -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_fill_missing_array_id(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_fill_missing_array_id(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data fills missing ArrayID with -1 correctly. 
""" @@ -243,14 +214,18 @@ def test_preprocess_data_fill_missing_array_id(mock_data_frame: pd.DataFrame) -> include_cpu_only_jobs=True, include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 100, include_cpu_only_jobs=True) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=100, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + ) expect_array_id_null = len(ground_truth[(ground_truth["ArrayID"].isna())]) - array_id_stat = data["ArrayID"].value_counts() - assert array_id_stat[-1] == expect_array_id_null + assert sum(x == -1 for x in data["ArrayID"]) == expect_array_id_null -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_fill_missing_gpu_type(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_fill_missing_gpu_type(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data fills missing GPUType with pd.NA correctly. 
""" @@ -261,19 +236,23 @@ def test_preprocess_data_fill_missing_gpu_type(mock_data_frame: pd.DataFrame) -> include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 100, include_cpu_only_jobs=True) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=100, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + ) expect_gpu_type_null = len(ground_truth[(ground_truth["GPUType"].isna())]) expect_gpus_null = len(ground_truth[(ground_truth["GPUs"] == 0) | (ground_truth["GPUs"].isna())]) - gpus_stat = data["GPUs"].value_counts() - + actual_count_gpu_0 = sum(x == 0 for x in data["GPUs"]) assert sum(pd.isna(x) for x in data["GPUType"]) == expect_gpu_type_null - assert gpus_stat[0] == expect_gpus_null, ( - f"Expected {expect_gpus_null} null GPUs, but found {gpus_stat[0]} null GPUs." + assert actual_count_gpu_0 == expect_gpus_null, ( + f"Expected {expect_gpus_null} null GPUs, but found {actual_count_gpu_0} null GPUs." ) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_data_fill_missing_constraints(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_data_fill_missing_constraints(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data fills missing Constraints with empty numpy array correctly. 
""" @@ -283,113 +262,88 @@ def test_preprocess_data_fill_missing_constraints(mock_data_frame: pd.DataFrame) include_cpu_only_jobs=True, include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 100, include_cpu_only_jobs=True) + ground_truth = preprocess_mock_data( + mock_data_path, + min_elapsed_seconds=100, + include_cpu_only_jobs=True, + include_failed_cancelled_jobs=True, + ) expect_constraints_null = len(ground_truth[(ground_truth["Constraints"].isna())]) assert sum(len(x) == 0 for x in data["Constraints"]) == expect_constraints_null -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_category_interactive(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_category_interactive(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data has 'Interactive' as a categorical variable and check values contained within it. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - ground_truth_filtered = ground_truth[ - (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - expected = set(ground_truth_filtered["Interactive"].dropna().to_numpy()) | set([e.value for e in InteractiveEnum]) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600) + expected = set(ground_truth["Interactive"].dropna().to_numpy()) | set([e.value for e in InteractiveEnum]) assert data["Interactive"].dtype == "category" assert expected.issubset(set(data["Interactive"].cat.categories)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_category_qos(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_category_qos(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data has 'QOS' as a categorical variable and check values contained within it. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - ground_truth_filtered = ground_truth[ - (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - expected = set(ground_truth_filtered["QOS"].dropna().to_numpy()) | set([e.value for e in QOSEnum]) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600) + expected = set(ground_truth["QOS"].dropna().to_numpy()) | set([e.value for e in QOSEnum]) assert data["QOS"].dtype == "category" assert expected.issubset(set(data["QOS"].cat.categories)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_category_exit_code(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_category_exit_code(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data has 'ExitCode' as a categorical variable and check values contained within it. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - ground_truth_filtered = ground_truth[ - (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - expected = set(ground_truth_filtered["ExitCode"].dropna().to_numpy()) | set([e.value for e in ExitCodeEnum]) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600) + expected = set(ground_truth["ExitCode"].dropna().to_numpy()) | set([e.value for e in ExitCodeEnum]) assert data["ExitCode"].dtype == "category" assert expected.issubset(set(data["ExitCode"].cat.categories)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_category_partition(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_category_partition(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data has 'Partition' as a categorical variable and check values contained within it. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - ground_truth_filtered = ground_truth[ - (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - expected = set(ground_truth_filtered["Partition"].dropna().to_numpy()) | set([e.value for e in AdminPartitionEnum]) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600) + expected = set(ground_truth["Partition"].dropna().to_numpy()) | set([e.value for e in AdminPartitionEnum]) assert data["Partition"].dtype == "category" assert expected.issubset(set(data["Partition"].cat.categories)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_category_account(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_category_account(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data has 'Account' as a categorical variable and check values contained within it. 
""" data = preprocess_data(input_df=mock_data_frame, min_elapsed_seconds=600) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600) - ground_truth_filtered = ground_truth[ - (ground_truth["GPUType"].notna()) - & (ground_truth["GPUs"].notna()) - & (ground_truth["Status"] != StatusEnum.FAILED.value) - & (ground_truth["Status"] != StatusEnum.CANCELLED.value) - ] - expected = set(ground_truth_filtered["Account"].dropna().to_numpy()) | set([e.value for e in AdminsAccountEnum]) + ground_truth = preprocess_mock_data(mock_data_path, min_elapsed_seconds=600) + expected = set(ground_truth["Account"].dropna().to_numpy()) | set([e.value for e in AdminsAccountEnum]) assert data["Account"].dtype == "category" assert expected.issubset(set(data["Account"].cat.categories)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) -def test_preprocess_timedelta_conversion(mock_data_frame: pd.DataFrame) -> None: +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_timedelta_conversion(mock_data_path: str, mock_data_frame: pd.DataFrame) -> None: """ Test that the preprocessed data converts elapsed time to timedelta. 
""" @@ -399,16 +353,19 @@ def test_preprocess_timedelta_conversion(mock_data_frame: pd.DataFrame) -> None: include_cpu_only_jobs=True, include_failed_cancelled_jobs=True, ) - ground_truth = _helper_filter_irrelevant_records(mock_data_frame, 600, include_cpu_only_jobs=True) - max_len = len(ground_truth) + ground_truth = preprocess_mock_data( + mock_data_path, min_elapsed_seconds=600, include_cpu_only_jobs=True, include_failed_cancelled_jobs=True + ) time_limit = data["TimeLimit"] + assert time_limit.dtype == "timedelta64[ns]" # assert correct type - assert time_limit.dtype == "timedelta64[ns]" - assert time_limit[0].total_seconds() / 60 == ground_truth["TimeLimit"][0] - assert time_limit[max_len - 1].total_seconds() / 60 == ground_truth["TimeLimit"][max_len - 1] + time_limit_list = time_limit.tolist() + ground_truth_time_limit = ground_truth["TimeLimit"].tolist() + for i, timedelta in enumerate(time_limit_list): + assert timedelta.total_seconds() / 60 == ground_truth_time_limit[i] -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_preprocess_gpu_type(mock_data_frame: pd.DataFrame) -> None: """ Test that the GPUType column is correctly filled and transformed during preprocessing. @@ -456,7 +413,7 @@ def test_get_requested_vram_cases() -> None: assert pd.isna(_get_requested_vram(pd.NA, pd.NA)) -@pytest.mark.parametrize("mock_data_frame", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) def test_partition_constraint_and_requested_vram_on_mock_data(mock_data_frame: pd.DataFrame) -> None: """ Test that the partition_constraint and requested_vram columns are correctly computed in the preprocessed data. 
@@ -487,3 +444,90 @@ def test_partition_constraint_and_requested_vram_on_mock_data(mock_data_frame: p assert pd.isna(actual) else: assert actual == expected + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("missing_col", [col.value for col in RequiredColumnsEnum]) +def test_preprocess_missing_required_columns(mock_data_frame: pd.DataFrame, missing_col: str) -> None: + """ + Test handling the dataframe when missing one of the ENFORCE_COLUMNS in constants.py. + + Expect to raise KeyError for any of these columns if they are missing in the dataframe. + """ + cur_df = mock_data_frame.drop(missing_col, axis=1, inplace=False) + with pytest.raises(KeyError, match=f"Column {missing_col} does not exist in dataframe."): + _res = preprocess_data(cur_df) + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +@pytest.mark.parametrize("missing_col", [col.value for col in OptionalColumnsEnum]) +def test_preprocess_missing_optional_columns( + mock_data_frame: pd.DataFrame, missing_col: str, recwarn: pytest.WarningsRecorder +) -> None: + """ + Test handling the dataframe when missing one of the columns. + + These columns are not in ENFORCE_COLUMNS so only warnings are expected to be raised. + """ + cur_df = mock_data_frame.drop(missing_col, axis=1, inplace=False) + + expect_warning_msg = ( + f"Column '{missing_col}' is missing from the dataframe. " + "This may impact filtering operations and downstream processing." 
+ ) + _res = preprocess_data(cur_df) + + # Check that a warning was raised with the expected message + assert len(recwarn) == 1 + assert str(recwarn[0].message) == expect_warning_msg + + +@pytest.mark.parametrize("mock_data_path", [False, True], ids=["false_case", "true_case"], indirect=True) +def test_preprocess_empty_dataframe_warning(mock_data_frame: pd.DataFrame, recwarn: pytest.WarningsRecorder) -> None: + """ + Test handling when preprocess_data results in an empty dataframe. + + Expect a UserWarning to be raised with the appropriate message. + Also verify that columns added and type-casted in _cast_type_and_add_columns have correct data types. + """ + # Make a copy of mock_data_frame and remove all entries to make it empty + empty_df = mock_data_frame.copy() + empty_df = empty_df.iloc[0:0] + # Should trigger the warning since the dataframe is empty + result = preprocess_data(empty_df) + + # Check that the result is still empty + assert result.empty + + # Check that a warning was raised about empty dataframe + assert len(recwarn) == 1 + assert str(recwarn[0].message) == "Dataframe results from database and filtering is empty." 
+ + # Test that columns added in _cast_type_and_add_columns have correct types + # New columns added for empty dataframes + assert "Queued" in result.columns + assert result["Queued"].dtype == "timedelta64[ns]" + + assert "vram_constraint" in result.columns + assert result["vram_constraint"].dtype == pd.Int64Dtype() + + assert "allocated_vram" in result.columns + assert result["allocated_vram"].dtype == pd.Int64Dtype() + + # Test that time columns were converted to datetime (if they exist) + time_columns = ["StartTime", "SubmitTime"] + for col in time_columns: + if col in result.columns: + assert pd.api.types.is_datetime64_any_dtype(result[col]) + + # Test that duration columns were converted to timedelta (if they exist) + duration_columns = ["TimeLimit", "Elapsed"] + for col in duration_columns: + if col in result.columns: + assert pd.api.types.is_timedelta64_dtype(result[col]) + + # Test that categorical columns have correct dtype (if they exist) + categorical_columns = ["Interactive", "QOS", "ExitCode", "Partition", "Account", "Status"] + for col in categorical_columns: + if col in result.columns: + assert result[col].dtype == "category"