Skip to content

Commit d5eeb0d

Browse files
committed
Add anonymization option to preprocessing
1 parent 0ead8e8 commit d5eeb0d

File tree

6 files changed

+48
-11
lines changed

6 files changed

+48
-11
lines changed

notebooks/analysis/Requested and Used VRAM.ipynb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@
132132
"preprocessed_jobs_df = ea.load_preprocessed_jobs_dataframe_from_duckdb(\n",
133133
" db_path=Path(project_root) / \"data/slurm_data.db\",\n",
134134
" table_name=\"Jobs\",\n",
135+
" anonymize=True,\n",
135136
")\n",
136137
"display(preprocessed_jobs_df.head(10))\n",
137138
"print(preprocessed_jobs_df.shape)"

src/analysis/efficiency_analysis.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
2323
table_name: str = "Jobs",
2424
sample_size: int | None = None,
2525
random_state: pd._typing.RandomState | None = None,
26+
anonymize: bool = False,
2627
) -> pd.DataFrame:
2728
"""
2829
Load jobs DataFrame from a DuckDB database and preprocess it.
@@ -32,6 +33,7 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
3233
table_name (str, optional): Table name to query. Defaults to 'Jobs'.
3334
sample_size (int, optional): Number of rows to sample from the DataFrame. Defaults to None (no sampling).
3435
random_state (pd._typing.RandomState, optional): Random state for reproducibility. Defaults to None.
36+
anonymize (bool, optional): Whether to anonymize the DataFrame. Defaults to False.
3537
3638
Returns:
3739
pd.DataFrame: DataFrame containing the table data.
@@ -46,7 +48,11 @@ def load_preprocessed_jobs_dataframe_from_duckdb(
4648

4749
jobs_df = db.fetch_all_jobs(table_name=table_name)
4850
processed_data = preprocess_data(
49-
jobs_df, min_elapsed_seconds=0, include_failed_cancelled_jobs=False, include_cpu_only_jobs=False
51+
jobs_df,
52+
min_elapsed_seconds=0,
53+
include_failed_cancelled_jobs=False,
54+
include_cpu_only_jobs=False,
55+
anonymize=anonymize,
5056
)
5157
if sample_size is not None:
5258
processed_data = processed_data.sample(n=sample_size, random_state=random_state)

src/config/snapshots/partition_info.json

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,17 @@
11
[
2+
{
3+
"name": "arm",
4+
"type": "cpu",
5+
"node_count": 9,
6+
"maxtime": "14 days",
7+
"deftime": "1 hour",
8+
"max_ram": 470,
9+
"max_cpus": 144
10+
},
211
{
312
"name": "arm-gpu",
413
"type": "gpu",
5-
"node_count": 3,
14+
"node_count": 4,
615
"maxtime": "14 days",
716
"deftime": "1 hour",
817
"max_ram": 560,
@@ -83,7 +92,7 @@
8392
{
8493
"name": "cpu",
8594
"type": "cpu",
86-
"node_count": 147,
95+
"node_count": 153,
8796
"maxtime": "2 days",
8897
"deftime": "1 hour",
8998
"max_ram": 1510,
@@ -92,7 +101,7 @@
92101
{
93102
"name": "cpu-preempt",
94103
"type": "cpu",
95-
"node_count": 138,
104+
"node_count": 144,
96105
"maxtime": "2 days",
97106
"deftime": "1 hour",
98107
"max_ram": 1510,
@@ -155,7 +164,7 @@
155164
{
156165
"name": "gpupod-l40s",
157166
"type": "gpu",
158-
"node_count": 10,
167+
"node_count": 13,
159168
"maxtime": "14 days",
160169
"deftime": "1 hour",
161170
"max_ram": 500,
@@ -164,11 +173,11 @@
164173
{
165174
"name": "ials-gpu",
166175
"type": "gpu",
167-
"node_count": 31,
176+
"node_count": 28,
168177
"maxtime": "14 days",
169178
"deftime": "1 hour",
170-
"max_ram": 500,
171-
"max_cpus": 32
179+
"max_ram": 180,
180+
"max_cpus": 24
172181
},
173182
{
174183
"name": "jdelhommelle",
@@ -272,7 +281,7 @@
272281
{
273282
"name": "uri-cpu",
274283
"type": "cpu",
275-
"node_count": 43,
284+
"node_count": 49,
276285
"maxtime": "30 days",
277286
"deftime": "1 hour",
278287
"max_ram": 1000,

src/preprocess/preprocess.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,25 @@ def _write_preprocessing_error_logs(preprocessing_error_logs: list[dict]) -> Non
180180
f.writelines(summary_lines)
181181

182182

183+
def anonymize_str_column(column: pd.Series, prefix: str) -> pd.Series:
    """Replace each distinct value in *column* with a pseudonymous label.

    Values are dense-ranked, so equal originals always receive the same
    label and labels are numbered consecutively starting at 1. Each rank
    is zero-padded to two digits and prefixed, e.g. ``"user_01"``.

    Args:
        column (pd.Series): The column whose values should be anonymized.
        prefix (str): String prepended to every generated identifier.

    Returns:
        pd.Series: Series of anonymized labels aligned with the input index.
    """
    # Dense rank gives consecutive integers (1, 2, 3, ...) per distinct value.
    dense_ranks = column.rank(method="dense").astype(int)
    padded = dense_ranks.astype(str).str.zfill(2)
    return prefix + padded
194+
195+
183196
def preprocess_data(
184197
input_df: pd.DataFrame,
185198
min_elapsed_seconds: int = DEFAULT_MIN_ELAPSED_SECONDS,
186199
include_failed_cancelled_jobs: bool = False,
187200
include_cpu_only_jobs: bool = False,
201+
anonymize: bool = False,
188202
) -> pd.DataFrame:
189203
"""
190204
Preprocess dataframe, filtering out unwanted rows and columns, filling missing values and converting types.
@@ -196,6 +210,7 @@ def preprocess_data(
196210
min_elapsed_seconds (int, optional): Minimum elapsed time in seconds to keep a job record. Defaults to 600.
197211
include_failed_cancelled_jobs (bool, optional): Whether to include jobs with status FAILED or CANCELLED.
198212
include_cpu_only_jobs (bool, optional): Whether to include jobs that do not use GPUs (CPU-only jobs).
213+
anonymize (bool, optional): Whether to anonymize user and account information.
199214
200215
Returns:
201216
pd.DataFrame: The preprocessed dataframe
@@ -288,9 +303,15 @@ def preprocess_data(
288303
if error_indices:
289304
data = data.drop(index=list(error_indices)).reset_index(drop=True)
290305

306+
# TODO (Tan): remove these two columns as they are calculated during analysis
291307
data.loc[:, "user_jobs"] = data.groupby("User")["User"].transform("size")
292308
data.loc[:, "account_jobs"] = data.groupby("Account")["Account"].transform("size")
293309

310+
# Anonymize user and account information
311+
if anonymize:
312+
data.loc[:, "User"] = anonymize_str_column(data["User"], "user_")
313+
data.loc[:, "Account"] = anonymize_str_column(data["Account"], "account_")
314+
294315
# Convert columns to categorical
295316
for col, enum_obj in ATTRIBUTE_CATEGORIES.items():
296317
enum_values = [e.value for e in enum_obj]

src/visualization/efficiency_metrics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ def visualize_metric_distribution(self, output_dir_path: Path | None = None, **k
322322
)
323323
column = validated_kwargs.column
324324
figsize = validated_kwargs.figsize
325-
output_dir_path = self.validate_output_dir(output_dir_path)
325+
output_dir_path = self.validate_output_dir(output_dir_path)
326326

327327
# Distribution of Avg Requested VRAM Efficiency Score (actual values; all are <= 0)
328328
# We keep scores as-is (negative or zero) and construct bins that respect the skew while

src/visualization/visualization.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def validate_dataframe(self) -> pd.DataFrame:
4646
if self.df.columns.empty:
4747
raise ValueError("DataFrame has no columns.")
4848
return self.df
49-
49+
5050
@staticmethod
5151
def anonymize_str_column(column: pd.Series, prefix: str) -> pd.Series:
5252
"""Anonymize a DataFrame column by replacing its values with a unique identifier.

Comments (0)