
Commit 6634717

Adding S3 support for OSFT e2e test
1 parent cf25842 commit 6634717
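
This change wires S3/MinIO access from the Go e2e test into the OSFT notebook purely through environment variables (AWS_DEFAULT_ENDPOINT, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_STORAGE_BUCKET, AWS_STORAGE_BUCKET_DATA_DIR). The sketch below is not part of the commit; it is a minimal illustration of that env-var contract, mirroring the boto3 client setup the notebook uses. The endpoint, bucket, and prefix values are placeholders.

# Illustrative sketch only (not from the commit): the notebook reads these
# AWS_* variables, which the Go test exports before running papermill.
import os

import boto3
from botocore.config import Config as BotoConfig

# Placeholder values; in the e2e test they come from the test environment.
os.environ.setdefault("AWS_DEFAULT_ENDPOINT", "minio.example.svc:9000")
os.environ.setdefault("AWS_STORAGE_BUCKET", "osft-e2e")
os.environ.setdefault("AWS_STORAGE_BUCKET_DATA_DIR", "osft-data")

endpoint = os.getenv("AWS_DEFAULT_ENDPOINT", "")
endpoint_url = endpoint if endpoint.startswith("http") else f"https://{endpoint}"

s3 = boto3.client(
    "s3",
    endpoint_url=endpoint_url,
    aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
    aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
    config=BotoConfig(signature_version="s3v4", s3={"addressing_style": "path"}),
    verify=False,  # mirrors the notebook's setting for in-cluster MinIO endpoints
)

# List what the notebook would pull under the configured prefix.
prefix = os.getenv("AWS_STORAGE_BUCKET_DATA_DIR", "").strip("/")
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket=os.getenv("AWS_STORAGE_BUCKET", ""), Prefix=prefix):
    for obj in page.get("Contents", []):
        print(obj["Key"])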

File tree

tests/trainer/resources/osft.ipynb
tests/trainer/sdk_tests/osft_traininghub_tests.go

2 files changed: +265 -39 lines changed


tests/trainer/resources/osft.ipynb

Lines changed: 246 additions & 37 deletions
@@ -43,61 +43,255 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"import json\n",
-"import random\n",
+"import os\n",
+"import gzip\n",
+"import shutil\n",
+"import socket\n",
+"import time\n",
+"\n",
+"import boto3\n",
+"from botocore.config import Config as BotoConfig\n",
+"from botocore.exceptions import ClientError\n",
+"\n",
+"# --- Global networking safety net: cap all socket operations ---\n",
+"socket.setdefaulttimeout(10) # seconds\n",
+"\n",
+"# Notebook's PVC mount path (per Notebook CR). Training pods will mount the same PVC at /opt/app-root/src\n",
+"PVC_NOTEBOOK_PATH = \"/opt/app-root/src\"\n",
+"DATASET_ROOT_NOTEBOOK = PVC_NOTEBOOK_PATH\n",
+"TABLE_GPT_DIR = os.path.join(DATASET_ROOT_NOTEBOOK, \"table-gpt-data\", \"train\")\n",
+"MODEL_DIR = os.path.join(DATASET_ROOT_NOTEBOOK, \"Qwen\", \"Qwen2.5-1.5B-Instruct\")\n",
+"os.makedirs(TABLE_GPT_DIR, exist_ok=True)\n",
+"os.makedirs(MODEL_DIR, exist_ok=True)\n",
+"\n",
+"# Env config for S3/MinIO\n",
+"s3_endpoint = os.getenv(\"AWS_DEFAULT_ENDPOINT\", \"\")\n",
+"s3_access_key = os.getenv(\"AWS_ACCESS_KEY_ID\", \"\")\n",
+"s3_secret_key = os.getenv(\"AWS_SECRET_ACCESS_KEY\", \"\")\n",
+"s3_bucket = os.getenv(\"AWS_STORAGE_BUCKET\", \"\")\n",
+"s3_prefix = os.getenv(\"AWS_STORAGE_BUCKET_DATA_DIR\", \"\") # e.g. \"osft-data\"\n",
+"\n",
+"def stream_download(s3, bucket, key, dst):\n",
+" \"\"\"\n",
+" Download an object from S3/MinIO using get_object and streaming reads.\n",
+" Returns True on success, False on any error.\n",
+" \"\"\"\n",
+" print(f\"[notebook] STREAM download s3://{bucket}/{key} -> {dst}\")\n",
+" t0 = time.time()\n",
+"\n",
+" try:\n",
+" resp = s3.get_object(Bucket=bucket, Key=key)\n",
+" except ClientError as e:\n",
+" err = e.response.get(\"Error\", {})\n",
+" print(f\"[notebook] CLIENT ERROR (get_object) for {key}: {err}\")\n",
+" return False\n",
+" except Exception as e:\n",
+" print(f\"[notebook] OTHER ERROR (get_object) for {key}: {e}\")\n",
+" return False\n",
+"\n",
+" body = resp[\"Body\"]\n",
+" try:\n",
+" with open(dst, \"wb\") as f:\n",
+" while True:\n",
+" try:\n",
+" chunk = body.read(1024 * 1024) # 1MB per chunk\n",
+" except socket.timeout as e:\n",
+" print(f\"[notebook] socket.timeout while reading {key}: {e}\")\n",
+" return False\n",
+" if not chunk:\n",
+" break\n",
+" f.write(chunk)\n",
+" except Exception as e:\n",
+" print(f\"[notebook] ERROR writing to {dst} for {key}: {e}\")\n",
+" return False\n",
+"\n",
+" t1 = time.time()\n",
+" print(f\"[notebook] DONE stream {key} in {t1 - t0:.2f}s\")\n",
+" return True\n",
+"\n",
+"\n",
+"if s3_endpoint and s3_bucket:\n",
+" try:\n",
+" # Normalize endpoint URL\n",
+" endpoint_url = (\n",
+" s3_endpoint\n",
+" if s3_endpoint.startswith(\"http\")\n",
+" else f\"https://{s3_endpoint}\"\n",
+" )\n",
+" prefix = (s3_prefix or \"\").strip(\"/\")\n",
+"\n",
+" print(\n",
+" f\"S3 configured (boto3, notebook): \"\n",
+" f\"endpoint={endpoint_url}, bucket={s3_bucket}, prefix={prefix or '<root>'}\"\n",
+" )\n",
+"\n",
+" # Boto config: single attempt, reasonable connect/read timeouts\n",
+" boto_cfg = BotoConfig(\n",
+" signature_version=\"s3v4\",\n",
+" s3={\"addressing_style\": \"path\"},\n",
+" retries={\"max_attempts\": 1, \"mode\": \"standard\"},\n",
+" connect_timeout=5,\n",
+" read_timeout=10,\n",
+" )\n",
 "\n",
-"from datasets import load_dataset\n",
+" # Create S3/MinIO client\n",
+" s3 = boto3.client(\n",
+" \"s3\",\n",
+" endpoint_url=endpoint_url,\n",
+" aws_access_key_id=s3_access_key,\n",
+" aws_secret_access_key=s3_secret_key,\n",
+" config=boto_cfg,\n",
+" verify=False,\n",
+" )\n",
+"\n",
+" # List and download all objects under the prefix\n",
+" paginator = s3.get_paginator(\"list_objects_v2\")\n",
+" pulled_any = False\n",
+" file_count = 0\n",
+"\n",
+" print(f\"[notebook] Starting S3 download from prefix: {prefix}\")\n",
+" for page in paginator.paginate(Bucket=s3_bucket, Prefix=prefix or \"\"):\n",
+" contents = page.get(\"Contents\", [])\n",
+" if not contents:\n",
+" print(f\"[notebook] No contents found in this page\")\n",
+" continue\n",
+" \n",
+" print(f\"[notebook] Found {len(contents)} objects in this page\")\n",
+"\n",
+" for obj in contents:\n",
+" key = obj[\"Key\"]\n",
+" file_count += 1\n",
+"\n",
+" # Skip \"directory markers\"\n",
+" if key.endswith(\"/\"):\n",
+" print(f\"[notebook] Skipping directory marker: {key}\")\n",
+" continue\n",
+"\n",
+" # Determine relative path under prefix for local storage\n",
+" rel = key[len(prefix):].lstrip(\"/\") if prefix else key\n",
+" print(f\"[notebook] Processing key={key}, rel={rel}\")\n",
+" \n",
+" # Route to appropriate directory based on content type\n",
+" if \"table-gpt\" in rel.lower() or rel.endswith(\".jsonl\"):\n",
+" dst = os.path.join(TABLE_GPT_DIR, os.path.basename(rel))\n",
+" print(f\"[notebook] Routing to dataset dir: {dst}\")\n",
+" elif \"qwen\" in rel.lower() or any(rel.endswith(ext) for ext in [\".bin\", \".json\", \".model\", \".safetensors\", \".txt\"]):\n",
+" # Preserve directory structure for model files\n",
+" dst = os.path.join(MODEL_DIR, rel.split(\"Qwen2.5-1.5B-Instruct/\")[-1] if \"Qwen2.5-1.5B-Instruct\" in rel else os.path.basename(rel))\n",
+" print(f\"[notebook] Routing to model dir: {dst}\")\n",
+" else:\n",
+" # Default: use the relative path as-is\n",
+" dst = os.path.join(DATASET_ROOT_NOTEBOOK, rel)\n",
+" print(f\"[notebook] Routing to default dir: {dst}\")\n",
+" \n",
+" os.makedirs(os.path.dirname(dst), exist_ok=True)\n",
+"\n",
+" # Download only if missing\n",
+" if not os.path.exists(dst):\n",
+" ok = stream_download(s3, s3_bucket, key, dst)\n",
+" if not ok:\n",
+" print(f\"[notebook] Download failed for {key}\")\n",
+" continue\n",
+" pulled_any = True\n",
+" else:\n",
+" print(f\"[notebook] Skipping existing file {dst}\")\n",
+" pulled_any = True\n",
+"\n",
+" # If the file is .gz, decompress and remove the .gz\n",
+" if dst.endswith(\".gz\") and os.path.exists(dst):\n",
+" out_path = os.path.splitext(dst)[0]\n",
+" if not os.path.exists(out_path):\n",
+" print(f\"[notebook] Decompressing {dst} -> {out_path}\")\n",
+" try:\n",
+" with gzip.open(dst, \"rb\") as f_in, open(out_path, \"wb\") as f_out:\n",
+" shutil.copyfileobj(f_in, f_out)\n",
+" except Exception as e:\n",
+" print(f\"[notebook] Failed to decompress {dst}: {e}\")\n",
+" else:\n",
+" try:\n",
+" os.remove(dst)\n",
+" except Exception:\n",
+" pass\n",
+"\n",
+" print(f\"[notebook] S3 download complete. Processed {file_count} files, pulled_any={pulled_any}\")\n",
 "\n",
-"# Load the Table-GPT dataset\n",
-"print(\"Loading Table-GPT dataset...\")\n",
-"dataset = load_dataset(\"LipengCS/Table-GPT\", \"All\")\n",
+" except Exception as e:\n",
+" print(f\"[notebook] S3 fetch failed: {e}\")\n",
+" import traceback\n",
+" traceback.print_exc()\n",
+"else:\n",
+" print(\"[notebook] S3 not configured: missing endpoint or bucket env vars\")\n",
+" # Fallback to HuggingFace if S3 is not configured\n",
+" print(\"[notebook] Falling back to HuggingFace dataset download...\")\n",
+" import json\n",
+" import random\n",
+" from datasets import load_dataset\n",
 "\n",
-"# Get the training split and create a random subset of 100 samples\n",
-"train_data = dataset[\"train\"]\n",
-"print(f\"Original training set size: {len(train_data)}\")\n",
+" # Load the Table-GPT dataset\n",
+" print(\"Loading Table-GPT dataset...\")\n",
+" dataset = load_dataset(\"LipengCS/Table-GPT\", \"All\")\n",
 "\n",
-"# Create a random subset of 100 samples\n",
-"random.seed(42) # For reproducibility\n",
-"subset_indices = random.sample(range(len(train_data)), min(100, len(train_data)))\n",
-"subset_data = train_data.select(subset_indices)\n",
+" # Get the training split and create a random subset of 100 samples\n",
+" train_data = dataset[\"train\"]\n",
+" print(f\"Original training set size: {len(train_data)}\")\n",
 "\n",
-"print(f\"Subset size: {len(subset_data)}\")\n",
+" # Create a random subset of 100 samples\n",
+" random.seed(42) # For reproducibility\n",
+" subset_indices = random.sample(range(len(train_data)), min(100, len(train_data)))\n",
+" subset_data = train_data.select(subset_indices)\n",
 "\n",
-"# Save the subset to a JSONL file\n",
-"# Save the subset to a JSONL file - USE ABSOLUTE PATH\n",
-"output_dir = \"table-gpt-data/train\"\n",
-"output_file = f\"{output_dir}/train_All_100.jsonl\"\n",
+" print(f\"Subset size: {len(subset_data)}\")\n",
 "\n",
-"print(f\"Creating directory: {output_dir}\")\n",
-"os.makedirs(output_dir, exist_ok=True)\n",
+" # Save the subset to a JSONL file\n",
+" output_file = os.path.join(TABLE_GPT_DIR, \"train_All_100.jsonl\")\n",
+" with open(output_file, \"w\") as f:\n",
+" for example in subset_data:\n",
+" f.write(json.dumps(example) + \"\\n\")\n",
 "\n",
-"with open(output_file, \"w\") as f:\n",
-" for example in subset_data:\n",
-" f.write(json.dumps(example) + \"\\n\")\n",
+" print(f\"Subset saved to {output_file}\")\n",
 "\n",
-"print(f\"Subset saved to {output_file}\")"
+"# Verify dataset file exists\n",
+"dataset_file = os.path.join(TABLE_GPT_DIR, \"train_All_100.jsonl\")\n",
+"if os.path.exists(dataset_file):\n",
+" print(f\"[notebook] Dataset ready: {dataset_file}\")\n",
+"else:\n",
+" raise RuntimeError(f\"Dataset file not found: {dataset_file}\")\n",
+"\n",
+"# Verify model directory has files\n",
+"if os.path.exists(MODEL_DIR) and os.listdir(MODEL_DIR):\n",
+" print(f\"[notebook] Model files ready in: {MODEL_DIR}\")\n",
+" print(f\"[notebook] Model files: {os.listdir(MODEL_DIR)[:5]}...\") # Show first 5 files\n",
+"else:\n",
+" print(f\"[notebook] Warning: Model directory is empty or missing: {MODEL_DIR}\")\n",
+" print(\"[notebook] Training will attempt to download from HuggingFace during execution\")"
 ]
 },
 {
 "cell_type": "code",
 "execution_count": null,
 "metadata": {},
-"outputs": [
-{
-"name": "stdout",
-"output_type": "stream",
-"text": [
-"⚙️ Training Hyperparameters\n",
-"==================================================\n"
-]
-}
-],
+"outputs": [],
 "source": [
+"# Determine model path based on whether S3 download succeeded\n",
+"import os\n",
+"LOCAL_MODEL_PATH = \"/opt/app-root/src/Qwen/Qwen2.5-1.5B-Instruct\"\n",
+"HUGGINGFACE_MODEL_ID = \"Qwen/Qwen2.5-1.5B-Instruct\"\n",
+"\n",
+"# Check if model was downloaded from S3\n",
+"model_downloaded = os.path.exists(LOCAL_MODEL_PATH) and len(os.listdir(LOCAL_MODEL_PATH)) > 0\n",
+"\n",
+"if model_downloaded:\n",
+" model_path_to_use = LOCAL_MODEL_PATH\n",
+" print(f\"✓ Using local model from S3: {model_path_to_use}\")\n",
+"else:\n",
+" model_path_to_use = HUGGINGFACE_MODEL_ID \n",
+" print(f\"✓ Using HuggingFace model ID: {model_path_to_use}\")\n",
+"\n",
 "params = {\n",
 " ###########################################################################\n",
 " # 🤖 Model + Data Paths #\n",
 " ###########################################################################\n",
-" \"model_path\": \"Qwen/Qwen2.5-1.5B-Instruct\",\n",
+" \"model_path\": model_path_to_use,\n",
 " \"data_path\": \"/opt/app-root/src/table-gpt-data/train/train_All_100.jsonl\",\n",
 " \"ckpt_output_dir\": \"/opt/app-root/src/checkpoints-logs-dir\",\n",
 " \"data_output_path\": \"/opt/app-root/src/osft-json/_data\",\n",
@@ -227,9 +421,24 @@
 "metadata": {},
 "outputs": [],
 "source": [
-"# Wait for the running status, then completion.\n",
+"# Wait for the running status, then wait for completion or failure\n",
+"# Using reasonable timeout for OSFT training\n",
 "client.wait_for_job_status(name=job_name, status={\"Running\"}, timeout=300)\n",
-"client.wait_for_job_status(name=job_name, status={\"Complete\"}, timeout=600)"
+"client.wait_for_job_status(name=job_name, status={\"Complete\", \"Failed\"}, timeout=1800) # 30 minutes for training\n",
+"\n",
+"# Check if the job succeeded\n",
+"job = client.get_job(name=job_name)\n",
+"\n",
+"# Check for success: status should be \"Complete\" and not \"Failed\"\n",
+"if job.status == \"Failed\":\n",
+" print(f\"ERROR: Training job failed\")\n",
+" raise RuntimeError(f\"Training job failed with status: {job.status}\")\n",
+"elif job.status == \"Complete\":\n",
+" print(\"✓ Training job completed successfully\")\n",
+"else:\n",
+" # Unexpected status\n",
+" print(f\"ERROR: Unexpected job status: {job.status}\")\n",
+" raise RuntimeError(f\"Training job ended with unexpected status: {job.status}\")"
 ]
 },
 {

tests/trainer/sdk_tests/osft_traininghub_tests.go

Lines changed: 19 additions & 2 deletions
@@ -61,6 +61,19 @@ func RunOsftTrainingHubMultiGpuDistributedTraining(t *testing.T) {
 test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read notebook: %s", localPath))
 cm := support.CreateConfigMap(test, namespace.Name, map[string][]byte{osftNotebookName: nb})
 
+// Build command with parameters and pinned deps, and print definitive status line to logs
+endpoint, endpointOK := support.GetStorageBucketDefaultEndpoint()
+accessKey, _ := support.GetStorageBucketAccessKeyId()
+secretKey, _ := support.GetStorageBucketSecretKey()
+bucket, bucketOK := support.GetStorageBucketName()
+prefix, _ := support.GetStorageBucketMnistDir()
+if !endpointOK {
+endpoint = ""
+}
+if !bucketOK {
+bucket = ""
+}
+
 // Create RWX PVC for shared dataset and pass the claim name to the notebook
 storageClass, err := support.GetRWXStorageClass(test)
 test.Expect(err).NotTo(HaveOccurred(), "Failed to find an RWX supporting StorageClass")
@@ -77,10 +90,15 @@ func RunOsftTrainingHubMultiGpuDistributedTraining(t *testing.T) {
 "export OPENSHIFT_API_URL='%s'; export NOTEBOOK_USER_TOKEN='%s'; "+
 "export NOTEBOOK_NAMESPACE='%s'; "+
 "export SHARED_PVC_NAME='%s'; "+
+"export AWS_DEFAULT_ENDPOINT='%s'; export AWS_ACCESS_KEY_ID='%s'; "+
+"export AWS_SECRET_ACCESS_KEY='%s'; export AWS_STORAGE_BUCKET='%s'; "+
+"export AWS_STORAGE_BUCKET_DATA_DIR='%s'; "+
 "python -m pip install --quiet --no-cache-dir --break-system-packages papermill boto3==1.34.162 git+https://github.com/opendatahub-io/kubeflow-sdk.git@main && "+
 "if python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+
 "then echo 'NOTEBOOK_STATUS: SUCCESS'; else echo 'NOTEBOOK_STATUS: FAILURE'; fi; sleep infinity",
-support.GetOpenShiftApiUrl(test), userToken, namespace.Name, rwxPvc.Name, osftNotebookName,
+support.GetOpenShiftApiUrl(test), userToken, namespace.Name, rwxPvc.Name,
+endpoint, accessKey, secretKey, bucket, prefix,
+osftNotebookName,
 )
 command := []string{"/bin/sh", "-c", shellCmd}
 
@@ -98,7 +116,6 @@ func RunOsftTrainingHubMultiGpuDistributedTraining(t *testing.T) {
 podName, containerName := trainerutils.WaitForNotebookPodRunning(test, namespace.Name)
 
 // Poll logs to check if the notebook execution completed successfully
-// Use extra long timeout for multi-GPU distributed training
 err = trainerutils.PollNotebookLogsForStatus(test, namespace.Name, podName, containerName, support.TestTimeoutDouble)
 test.Expect(err).ShouldNot(HaveOccurred(), "Notebook execution reported FAILURE")
 }
