Feature/A100 analysis #8
Changes from 43 commits
```
@@ -1,4 +1,4 @@
from .efficiency_analysis import EfficiencyAnalysis as EfficiencyAnalysis
from .efficiency_analysis import (
    load_preprocessed_jobs_dataframe_from_duckdb as load_preprocessed_jobs_dataframe_from_duckdb,
)
```
```
@@ -19,6 +19,7 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
    table_name: str = "Jobs",
    sample_size: int | None = None,
    random_state: pd._typing.RandomState | None = None,
    query: str | None = None,
) -> pd.DataFrame:
    """
    Load jobs DataFrame from a DuckDB database and preprocess it.
```

```
@@ -28,6 +29,7 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
        table_name (str, optional): Table name to query. Defaults to 'Jobs'.
        sample_size (int, optional): Number of rows to sample from the DataFrame. Defaults to None (no sampling).
        random_state (pd._typing.RandomState, optional): Random state for reproducibility. Defaults to None.
        query (str, optional): Custom SQL query to fetch data. If provided, overrides the table_name.

    Returns:
        pd.DataFrame: DataFrame containing the table data.
```

```
@@ -40,7 +42,7 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
    try:
        db = DatabaseConnection(str(db_path))

        jobs_df = db.fetch_all_jobs(table_name=table_name)
        jobs_df = db.fetch_all_jobs(table_name=table_name) if query is None else db.fetch_query(query=query)
```
Collaborator: This looks good, but ideally we wait for Tan's PR to be merged and use those functions. If we don't have time to do that, then we can stick with this.
```
        processed_data = preprocess_data(
            jobs_df, min_elapsed_seconds=0, include_failed_cancelled_jobs=False, include_cpu_only_jobs=False
        )
```
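For context, a minimal usage sketch of the new `query` argument. The import path, database file name, and positional database-path argument are assumptions; only `table_name`, `query`, and the override behaviour come from the diff above.

```python
# Hypothetical call sites; paths and the exact signature are placeholders.
from efficiency_analysis import load_preprocessed_jobs_dataframe_from_duckdb

# Default behaviour: fetch every row of the "Jobs" table.
all_jobs = load_preprocessed_jobs_dataframe_from_duckdb("jobs.duckdb")

# With `query` supplied, the custom SQL is used instead of `table_name`.
subset = load_preprocessed_jobs_dataframe_from_duckdb(
    "jobs.duckdb",
    query="SELECT * FROM Jobs LIMIT 1000",
)
```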
```
@@ -165,6 +167,23 @@ def apply_numeric_filter(
            raise ValueError(f"{filter_name} must be a numeric type.")
        return mask

    def get_unique_gpu_types(self) -> np.ndarray:
        """
        Get unique GPU types from the jobs DataFrame.

        Returns:
            np.ndarray: Unique GPU types as a NumPy array.
        """
        return (
            self.jobs_df["GPUType"]
            .dropna()
            .explode()
            .astype(str)
            .str.strip()
            .str.lower()
            .unique()
        )

    def filter_jobs_for_analysis(
        self,
        vram_constraint_filter: int | float | list | set | tuple | dict | pd.api.typing.NAType | None = None,
```
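As a quick illustration, the normalization chain above behaves like this on made-up values (the sample GPU types below are invented, not taken from the dataset):

```python
import pandas as pd

# Toy example of the dropna/explode/strip/lower/unique chain used above.
jobs_df = pd.DataFrame({"GPUType": [["A100 ", "v100"], None, ["a100"], ["H100"]]})

unique_gpu_types = (
    jobs_df["GPUType"]
    .dropna()       # drop rows with no GPU type recorded
    .explode()      # one row per GPU type in a multi-GPU job
    .astype(str)
    .str.strip()    # trim stray whitespace
    .str.lower()    # normalize case so "A100" and "a100" collapse
    .unique()
)
print(unique_gpu_types)  # e.g. ['a100' 'v100' 'h100']
```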
```
@@ -680,7 +699,7 @@ def find_inefficient_pis_by_vram_hours(
        # Sort by the metric descending (higher is worse)
        inefficient_pi_accounts = inefficient_pi_accounts.sort_values("pi_acc_vram_hours", ascending=False)
        return inefficient_pi_accounts

    def sort_and_filter_records_with_metrics(
        self,
        metrics_df_name_enum: MetricsDataFrameNameEnum,
```
```
@@ -750,3 +769,135 @@ def sort_and_filter_records_with_metrics(
        filtered_records = filtered_records.sort_values(sorting_key, ascending=ascending)

        return filtered_records

    def compare_job_metrics_by_gpu_type(self) -> pd.DataFrame:
        """
        Aggregate metrics for each GPU type in the jobs DataFrame.

        Returns:
            pd.DataFrame: Aggregated metrics by GPU type.
        """
        # Get unique GPU types
        unique_gpu_types = self.get_unique_gpu_types()

        metrics = [
            "Mean Used GPU Memory (GiB)",
            "Median Used GPU Memory (GiB)",
            "Mean Requested VRAM Efficiency",
            "Median Requested VRAM Efficiency",
            "Mean Allocated VRAM Efficiency",
            "Median Allocated VRAM Efficiency",
            "Total GPU Hours",
            "Mean Weighted VRAM Efficiency",
            "Median Weighted VRAM Efficiency",
        ]

        job_efficiency_metrics = self.calculate_job_efficiency_metrics(self.jobs_df)

        results: dict[str, list] = {gpu_type.upper(): [] for gpu_type in unique_gpu_types}
        for gpu_type in unique_gpu_types:
            gpu_jobs = job_efficiency_metrics[
                job_efficiency_metrics["GPUType"].apply(
                    lambda x, gpu_type=gpu_type: isinstance(x, dict) and gpu_type in x
                )
            ]

            if gpu_jobs.empty:
                results[gpu_type.upper()] = [None] * len(metrics)
                continue
            results[gpu_type.upper()] = [
                gpu_jobs["GPUMemUsage"].mean() / (2**30),  # Mean Used GPU Memory in GiB
                gpu_jobs["GPUMemUsage"].median() / (2**30),  # Median Used GPU Memory in GiB
                gpu_jobs["vram_constraint_efficiency"].mean(),  # Mean Requested VRAM Efficiency
                gpu_jobs["vram_constraint_efficiency"].median(),  # Median Requested VRAM Efficiency
                gpu_jobs["alloc_vram_efficiency"].mean(),  # Mean Allocated VRAM Efficiency
                gpu_jobs["alloc_vram_efficiency"].median(),  # Median Allocated VRAM Efficiency
                gpu_jobs["job_hours"].sum(),  # Total GPU Hours
                # Mean Weighted VRAM Efficiency
                (gpu_jobs["alloc_vram_efficiency"] * gpu_jobs["job_hours"]).sum() / gpu_jobs["job_hours"].sum(),
```
Collaborator: We should probably use VRAM hours here since the other weighted job metrics use that, unless there's a reason why job_hours would work better here.
```
                # Median Weighted VRAM Efficiency
                (gpu_jobs["alloc_vram_efficiency"] * gpu_jobs["job_hours"]).median() / gpu_jobs["job_hours"].median()
```
Collaborator: Same here, should we use vram_hours instead?
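A minimal sketch of the hours-weighted variants suggested above. It assumes a per-job `vram_hours` column analogous to the PI-level `pi_acc_vram_hours` used elsewhere in this file; the column name, the toy data, and the weighted-median definition (the value at which cumulative weight reaches half the total) are assumptions, not part of this PR.

```python
import pandas as pd

def weighted_mean(values: pd.Series, weights: pd.Series) -> float:
    """Weighted arithmetic mean; assumes the weights sum to a nonzero value."""
    return float((values * weights).sum() / weights.sum())

def weighted_median(values: pd.Series, weights: pd.Series) -> float:
    """Smallest value whose cumulative weight reaches half of the total weight."""
    order = values.sort_values().index
    cum_weights = weights.loc[order].cumsum()
    cutoff = weights.sum() / 2.0
    return float(values.loc[order][cum_weights >= cutoff].iloc[0])

# Toy stand-in for gpu_jobs; `vram_hours` is an assumed column name.
gpu_jobs = pd.DataFrame({
    "alloc_vram_efficiency": [0.2, 0.5, 0.9],
    "vram_hours": [10.0, 100.0, 40.0],
})

mean_weighted = weighted_mean(gpu_jobs["alloc_vram_efficiency"], gpu_jobs["vram_hours"])
median_weighted = weighted_median(gpu_jobs["alloc_vram_efficiency"], gpu_jobs["vram_hours"])
print(mean_weighted, median_weighted)
```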
```
            ]

        # Create summary DataFrame
        summary_df = pd.DataFrame(results, index=metrics)
        return summary_df
```
```
    def compare_gpu_utilization_patterns(self) -> pd.DataFrame:
        """
        Compare GPU utilization patterns across different GPU types.

        Returns:
            pd.DataFrame: DataFrame with GPU utilization patterns by GPU type.
        """
        job_metrics_by_gpu_type = self.compare_job_metrics_by_gpu_type()

        # Create a DataFrame to hold the GPU utilization patterns
        gpu_utilization_patterns = pd.DataFrame({
```
Collaborator: Is this just transposing the other DataFrame? It looks like we're creating rows out of the column values. Correct me if I'm wrong, but could transposing the DataFrame do the same thing and simplify this? Try it and see.
| "GPU Type": job_metrics_by_gpu_type.columns, | ||
| "Mean Used GPU Memory (GiB)": job_metrics_by_gpu_type.loc["Mean Used GPU Memory (GiB)"], | ||
| "Median Used GPU Memory (GiB)": job_metrics_by_gpu_type.loc["Median Used GPU Memory (GiB)"], | ||
| "Mean Requested VRAM Efficiency": job_metrics_by_gpu_type.loc["Mean Requested VRAM Efficiency"], | ||
| "Median Requested VRAM Efficiency": job_metrics_by_gpu_type.loc["Median Requested VRAM Efficiency"], | ||
| "Mean Allocated VRAM Efficiency": job_metrics_by_gpu_type.loc["Mean Allocated VRAM Efficiency"], | ||
| "Median Allocated VRAM Efficiency": job_metrics_by_gpu_type.loc["Median Allocated VRAM Efficiency"], | ||
| "Total GPU Hours": job_metrics_by_gpu_type.loc["Total GPU Hours"], | ||
| "Mean Weighted VRAM Efficiency": job_metrics_by_gpu_type.loc["Mean Weighted VRAM Efficiency"], | ||
| "Median Weighted VRAM Efficiency": job_metrics_by_gpu_type.loc["Median Weighted VRAM Efficiency"] | ||
| }) | ||
|
|
||
| # Sort by Total GPU Hours in descending order | ||
| gpu_utilization_patterns = gpu_utilization_patterns.sort_values(by="Total GPU Hours", ascending=False) | ||
|
|
||
| return gpu_utilization_patterns | ||
|
|
||
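Following up on the transpose question above, a small self-contained sketch of the suggested simplification. It assumes `compare_job_metrics_by_gpu_type()` returns metrics as the index and GPU types as the columns, as in the code above; the sample values are invented and this is a suggestion, not the PR's implementation.

```python
import pandas as pd

# Toy frame shaped like compare_job_metrics_by_gpu_type()'s output:
# metrics as the index, GPU types as the columns (values invented).
job_metrics_by_gpu_type = pd.DataFrame(
    {"A100": [40.2, 1200.0], "V100": [12.5, 300.0]},
    index=["Mean Used GPU Memory (GiB)", "Total GPU Hours"],
)

# Transposing gives one row per GPU type; reset_index() then exposes the
# GPU type as a regular column, matching the manually built frame above.
gpu_utilization_patterns = (
    job_metrics_by_gpu_type.T
    .rename_axis("GPU Type")
    .reset_index()
    .sort_values(by="Total GPU Hours", ascending=False)
)
print(gpu_utilization_patterns)
```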
```
    def categorize_jobs_by_vram_constraint_efficiency(self) -> pd.DataFrame:
        """
        Bucketize jobs based on their VRAM constraint efficiency.

        Returns:
            pd.DataFrame: DataFrame with jobs categorized into efficiency buckets.
        """
        if self.jobs_with_efficiency_metrics is None:
            self.calculate_job_efficiency_metrics(self.jobs_df)

        df = self.jobs_with_efficiency_metrics.copy()

        # Create efficiency bucket
        def categorize_efficiency(val: float | pd.api.typing.NAType) -> str:
            if pd.isna(val):
                return "NA"
            if val <= 0.3:
                return "0–30%"
            elif val <= 0.6:
                return "30–60%"
            elif val <= 1.0:
                return "60–100%"
            else:
                return ">100%"

        df["vram_constraint_efficiency_bucket"] = df["vram_constraint_efficiency"].apply(categorize_efficiency)

        # Count jobs in each bucket
        bucket_counts = df["vram_constraint_efficiency_bucket"].value_counts(dropna=True).sort_index()

        # Add proportion of jobs per bucket
        total_jobs = len(df)
        bucket_distribution = bucket_counts.to_frame(name="job_count")
        bucket_distribution["percentage"] = (bucket_distribution["job_count"] / total_jobs * 100).round(2)

        # Update the jobs DataFrame with bucket information
        self.jobs_with_efficiency_metrics = df

        return bucket_distribution
```
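As an aside, the same buckets could be expressed with `pd.cut`; this is an alternative sketch, not part of this PR, and unlike the code above it leaves missing values as NaN rather than an explicit "NA" bucket. The sample efficiencies are invented.

```python
import pandas as pd

# Alternative bucketing sketch using pd.cut with the same right-inclusive
# boundaries as categorize_efficiency above.
efficiencies = pd.Series([0.1, 0.25, 0.5, 0.95, 1.4, None], name="vram_constraint_efficiency")

buckets = pd.cut(
    efficiencies,
    bins=[0, 0.3, 0.6, 1.0, float("inf")],
    labels=["0–30%", "30–60%", "60–100%", ">100%"],
    include_lowest=True,
)
print(buckets.value_counts().sort_index())
```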
In this file, could we remove the older Efficiency Analysis cells (at least the plots that are irrelevant to A100s)? If we aren't doing anything with users, we don't need to call the user metric functions and show their output here. I had to scroll quite a bit to find the actual A100 plots. It's better if we can simplify this notebook, since we will also have another notebook later that contains the efficiency analysis + time plots + ROC + A100 analysis for certain groups, so this notebook should focus only on A100s for easy reference to those functions.