diff --git a/examples/ray-kft-v1/1_ray_sdg.ipynb b/examples/ray-kft-v1/1_ray_sdg.ipynb new file mode 100644 index 000000000..0d491684f --- /dev/null +++ b/examples/ray-kft-v1/1_ray_sdg.ipynb @@ -0,0 +1,462 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "576f8661", + "metadata": {}, + "source": [ + "## Phase 1 : RayCluster Setup and Ray Based Distributed Data Processing\n", + "\n", + "- **CodeFlare SDK**: Ray cluster deployment and management on Kubernetes\n", + "- **Ray Job Submission**: Distributed synthetic data generation using Ray workers" + ] + }, + { + "cell_type": "markdown", + "id": "c2f96f12", + "metadata": {}, + "source": [ + "### Setup Ray Cluster using Codeflare-SDK\n", + "\n", + "Configure and deploy the Ray cluster for distributed data processing\n" + ] + }, + { + "cell_type": "code", + "execution_count": 65, + "id": "cde5dd0b", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'Logged into https://:6443'" + ] + }, + "execution_count": 65, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Setup Ray cluster using CodeFlare SDK\n", + "from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication\n", + "import time\n", + "\n", + "token=\"\"\n", + "api_server=\"\"\n", + "\n", + "# Authenticate with the Openshift cluster\n", + "auth = TokenAuthentication(\n", + " token=token,\n", + " server=api_server,\n", + " skip_tls=True\n", + ")\n", + "auth.login()" + ] + }, + { + "cell_type": "code", + "execution_count": 66, + "id": "3303a9a8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Yaml resources loaded for test1-cluster\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Ray Cluster Configuration:\n", + " Name: test1-cluster\n", + " Workers: 2\n", + " Worker Resources: 2CPU, 20G RAM, {'nvidia.com/gpu': 2} GPU\n", + " Image: quay.io/rhoai/ray:2.35.0-py311-cu121-torch24-fa26\n" + ] + } + ], + "source": [ + "# Configure Ray cluster for distributed synthetic data generation\n", + "from kubernetes.client.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource\n", + "\n", + "ray_cluster = Cluster(ClusterConfiguration(\n", + " name=\"test1-cluster\",\n", + " num_workers=2,\n", + " # Head node configuration\n", + " head_cpu_requests=2,\n", + " head_cpu_limits=4,\n", + " head_memory_requests=16,\n", + " head_memory_limits=24,\n", + " # Worker node configuration\n", + " worker_cpu_requests=2,\n", + " worker_cpu_limits=4,\n", + " worker_memory_requests=20,\n", + " worker_memory_limits=24,\n", + " # UnComment in case of using accelerators for RayCluster\n", + " head_extended_resource_requests={'nvidia.com/gpu': 2},\n", + " worker_extended_resource_requests={'nvidia.com/gpu': 2},\n", + " # Ray runtime image\n", + " image=\"quay.io/rhoai/ray:2.35.0-py311-cu121-torch24-fa26\",\n", + " # Volume mount - Shared PVC storage with RWX peermissions\n", + " volume_mounts=[\n", + " V1VolumeMount(\n", + " name=\"shared\",\n", + " mount_path=\"/shared\"\n", + " )\n", + " ],\n", + " volumes=[\n", + " V1Volume(\n", + " name=\"shared\",\n", + " persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(\n", + " claim_name=\"shared\"\n", + " )\n", + " )\n", + " ],\n", + "))\n", + "\n", + "print(\" Ray Cluster Configuration:\")\n", + "print(f\" Name: {ray_cluster.config.name}\")\n", + "print(f\" Workers: {ray_cluster.config.num_workers}\")\n", + "print(f\" Worker Resources: {ray_cluster.config.worker_cpu_requests}CPU, 
{ray_cluster.config.worker_memory_requests} RAM, {ray_cluster.config.worker_extended_resource_requests} GPU\")\n", + "print(f\" Image: {ray_cluster.config.image}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 63, + "id": "4984abc8", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ray Cluster: 'test1-cluster' has successfully been applied. For optimal resource management, you should delete this Ray Cluster when no longer in use.\n" + ] + } + ], + "source": [ + "# Deploy the Ray cluster\n", + "ray_cluster.apply()" + ] + }, + { + "cell_type": "code", + "execution_count": 67, + "id": "1f88a2be", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Waiting for requested resources to be set up...\n", + "Requested cluster is up and running!\n", + "Dashboard is ready!\n" + ] + } + ], + "source": [ + "# Wait for Ray cluster to be ready\n", + "ray_cluster.wait_ready()" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "id": "e8de7254", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
                        🚀 CodeFlare Cluster Details 🚀                       \n",
+       "                                                                              \n",
+       " ╭──────────────────────────────────────────────────────────────────────────╮ \n",
+       " │   Name                                                                   │ \n",
+       " │   test1-cluster                                              Active ✅   │ \n",
+       " │                                                                          │ \n",
+       " │   URI: ray://test1-cluster-head-svc..svc:10001                           │ \n",
+       " │                                                                          │ \n",
+       " │   -.\" target=\"_blank\">Dashboard🔗                                         │ \n",
+       " │                                                                          │ \n",
+       " │                       Cluster Resources                                  │ \n",
+       " │   ╭── Workers ──╮  ╭───────── Worker specs(each) ─────────╮              │ \n",
+       " │   │  # Workers  │  │  Memory      CPU         GPU         │              │ \n",
+       " │   │             │  │                                      │              │ \n",
+       " │   │  2          │  │  20G~24G     2~4         2           │              │ \n",
+       " │   │             │  │                                      │              │ \n",
+       " │   ╰─────────────╯  ╰──────────────────────────────────────╯              │ \n",
+       " ╰──────────────────────────────────────────────────────────────────────────╯ \n",
+       "
\n" + ], + "text/plain": [ + "\u001b[3m \u001b[0m\u001b[1;3m ๐Ÿš€ CodeFlare Cluster Details ๐Ÿš€\u001b[0m\u001b[3m \u001b[0m\n", + "\u001b[1m \u001b[0m\u001b[1m \u001b[0m\u001b[1m \u001b[0m\n", + " โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ \n", + " โ”‚ \u001b[1;37;42mName\u001b[0m โ”‚ \n", + " โ”‚ \u001b[1;4mtest1-cluster\u001b[0m Active โœ… โ”‚ \n", + " โ”‚ โ”‚ \n", + " โ”‚ \u001b[1mURI:\u001b[0m ray://test1-cluster-head-svc..svc:10001 โ”‚ \n", + " โ”‚ โ”‚ \n", + " โ”‚ \u001b]8;id=726768;https://ray-dashboard--.\u001b\\\u001b[4;34mDashboard๐Ÿ”—\u001b[0m\u001b]8;;\u001b\\ โ”‚ \n", + " โ”‚ โ”‚ \n", + " โ”‚ \u001b[3m Cluster Resources \u001b[0m โ”‚ \n", + " โ”‚ โ•ญโ”€โ”€ Workers โ”€โ”€โ•ฎ โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€ Worker specs(each) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ โ”‚ \n", + " โ”‚ โ”‚ \u001b[1m \u001b[0m\u001b[1m# Workers\u001b[0m\u001b[1m \u001b[0m โ”‚ โ”‚ \u001b[1m \u001b[0m\u001b[1mMemory \u001b[0m\u001b[1m \u001b[0m\u001b[1m \u001b[0m\u001b[1mCPU \u001b[0m\u001b[1m \u001b[0m\u001b[1m \u001b[0m\u001b[1mGPU \u001b[0m\u001b[1m \u001b[0m โ”‚ โ”‚ \n", + " โ”‚ โ”‚ \u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \u001b[36m \u001b[0m\u001b[36m \u001b[0m\u001b[36m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \n", + " โ”‚ โ”‚ \u001b[35m \u001b[0m\u001b[35m2 \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \u001b[36m \u001b[0m\u001b[36m20G~24G \u001b[0m\u001b[36m \u001b[0m\u001b[35m \u001b[0m\u001b[35m2~4 \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m2 \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \n", + " โ”‚ โ”‚ \u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \u001b[36m \u001b[0m\u001b[36m \u001b[0m\u001b[36m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m\u001b[35m \u001b[0m โ”‚ โ”‚ \n", + " โ”‚ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ โ”‚ \n", + " โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ \n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "RayCluster(name='test1-cluster', status=, head_cpu_requests=2, head_cpu_limits=4, head_mem_requests='16G', head_mem_limits='24G', num_workers=2, worker_mem_requests='20G', worker_mem_limits='24G', worker_cpu_requests=2, worker_cpu_limits=4, namespace='', dashboard='https://ray-dashboard--.', worker_extended_resources={'nvidia.com/gpu': 2}, head_extended_resources={'nvidia.com/gpu': 2})" + ] + }, + "execution_count": 44, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ray_cluster.details()" + ] + }, + { + "cell_type": "code", + "execution_count": 68, + "id": "9cc5585a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ray job client initialized\n" + ] + } + ], + "source": [ + "# Initialize the Job Submission Client\n", + "client = ray_cluster.job_client\n", + "print(\"Ray job client initialized\")\n" + ] + }, + { + "cell_type": "markdown", + "id": 
"0ca759e7", + "metadata": {}, + "source": [ + "## Submit Ray Job for Synthetic Data Generation\n", + "\n", + "Submit the synthetic data generation function to the Ray cluster:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 81, + "id": "c8eb5659", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Ray Data SDG job submitted with ID: raysubmit_SVqNSiZVvbSCtsJV\n" + ] + } + ], + "source": [ + "# Submit the Ray Data SDG job for distributed synthetic data generation\n", + "submission_id = client.submit_job(\n", + " entrypoint=(\n", + " \"python scripts/ray_sdg_job.py \"\n", + " \"--enable-multi-node \"\n", + " \"--seeds 1000 \"\n", + " \"--variations 2 \"\n", + " \"--batch-size 2 \"\n", + " \"--quality-threshold 0.75 \"\n", + " \"--output-path /shared/synthetic_data_v2 \"\n", + " \"--max-concurrent-workers 6 \"\n", + " \"--gpus-per-worker 1 \"\n", + " \"--resume \"\n", + " \"--save-every 100\"\n", + " ), \n", + " runtime_env={\n", + " \"env_vars\": {\n", + " 'HF_HOME': '/shared/cache',\n", + " 'HF_DATASETS_CACHE': '/shared/cache/datasets',\n", + " 'TOKENIZERS_PARALLELISM': 'false',\n", + " },\n", + " 'pip': [\n", + " 'ray[data]>=2.8.0',\n", + " 'transformers>=4.36.0',\n", + " 'torch>=2.0.0', \n", + " 'datasets>=2.14.0',\n", + " 'accelerate>=0.24.0',\n", + " 'numpy>=1.21.0',\n", + " 'tqdm>=4.64.0',\n", + " 'pyarrow>=12.0.0,<15.0.0',\n", + " ],\n", + " 'working_dir': './',\n", + " \"excludes\": [\"*.ipynb\", \"*.md\"]\n", + " },\n", + ")\n", + "\n", + "print(f\"Ray Data SDG job submitted with ID: {submission_id}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4bcfa1fb", + "metadata": {}, + "outputs": [], + "source": [ + "client.get_job_logs(submission_id)" + ] + }, + { + "cell_type": "code", + "execution_count": 82, + "id": "429051e9", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 82, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Stop/Delete any running jobs\n", + "# client.stop_job(submission_id)\n", + "client.delete_job(submission_id)" + ] + }, + { + "cell_type": "markdown", + "id": "189584fd", + "metadata": {}, + "source": [ + "### Cleanup Ray Cluster\n", + "\n", + "Clean up the Ray cluster resources (following ray_finetune_llm_deepspeed.ipynb pattern):\n" + ] + }, + { + "cell_type": "code", + "execution_count": 83, + "id": "bcf75853", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Cleaning up Ray cluster...\n", + "Ray Cluster: 'test1-cluster' has successfully been deleted\n" + ] + } + ], + "source": [ + "# Cleanup Ray cluster (following ray_finetune_llm_deepspeed.ipynb pattern)\n", + "print(\" Cleaning up Ray cluster...\")\n", + "\n", + "# Tear down the Ray cluster\n", + "ray_cluster.down()\n" + ] + }, + { + "cell_type": "code", + "execution_count": 84, + "id": "e021423d-c7c6-4aa0-9c3b-f8747bb27e85", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " Dataset found: 295 samples\n", + " Avg quality: 0.76 \n", + " Source: ray_data_sdg_qwen\n", + " Sample Question -> A school bought 7 boxes of pencils. Each box contains 24 pencils. 
How many pencils did the school buy?\n", + " Sample Answer -> To find out how many pencils the school bought, we multiply the number of boxes by the number of pencils per box.\n", + "Number of boxes: 7\n", + "Number of pencils per box: 24\n", + "Total pencils = 7 * 24 = 168\n", + "Therefore, the school bought 168 pencils.\n", + "\n", + " Ready for training!\n" + ] + } + ], + "source": [ + "import os, json\n", + "# Check for dataset\n", + "paths = [\"/opt/app-root/src/shared/synthetic_data/synthetic_dataset.json\", \n", + " \"/opt/app-root/src/shared/synthetic_data/final_synthetic_dataset.json\"]\n", + "dataset_path = next((p for p in paths if os.path.exists(p)), None)\n", + "\n", + "if dataset_path:\n", + " with open(dataset_path, \"r\") as f:\n", + " data = json.load(f)\n", + " \n", + " if isinstance(data, list):\n", + " total_samples = len(data)\n", + " avg_quality = sum(item.get('overall_quality', 0) for item in data) / total_samples if total_samples > 0 else 0\n", + " sample = data[0] if data else None\n", + " \n", + " print(f\" Dataset found: {total_samples} samples\")\n", + " print(f\" Avg quality: {avg_quality:.2f} \\n Source: {sample.get('source', 'N/A') if sample else 'N/A'}\")\n", + " \n", + " # Show sample\n", + " if sample:\n", + " print(f\" Sample Question -> {sample['question']}\")\n", + " print(f\" Sample Answer -> {sample['answer']}\")\n", + " \n", + " print(\"\\n Ready for training!\") \n", + "else:\n", + " print(\" Dataset not found. Run Ray Data job first.\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ray-kft-v1/2_kft_training.ipynb b/examples/ray-kft-v1/2_kft_training.ipynb new file mode 100644 index 000000000..ce4f58e97 --- /dev/null +++ b/examples/ray-kft-v1/2_kft_training.ipynb @@ -0,0 +1,323 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "d1c9049a-7daa-43aa-9e50-5f9f951a8324", + "metadata": {}, + "source": [ + "## Phase 2: Distributed Training using Kubeflow Training Operator and SDK\n", + "\n", + "- **kubeflow-training SDK**: PyTorchJob creation and management\n", + "- **TRL + PEFT**: Modern fine-tuning with LoRA adapters\n", + "- **Distributed Training**: Multi-node GPU coordination " + ] + }, + { + "cell_type": "markdown", + "id": "035727e0", + "metadata": {}, + "source": [ + "### Training Configuration using kubeflow-training SDK" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "92017175-8d63-4dbe-ac8d-f2724b57f9a8", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "%pip install kubernetes yamlmagic" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "c82140d0", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext yamlmagic" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7be28c8f", + "metadata": {}, + "outputs": [], + "source": [ + "%%yaml training_parameters\n", + "\n", + "# Model configuration\n", + "model_name_or_path: ibm-granite/granite-3.1-2b-instruct\n", + "model_revision: main\n", + "torch_dtype: bfloat16\n", + "attn_implementation: flash_attention_2\n", + "use_liger: false\n", + "\n", + "# PEFT / LoRA configuration\n", + "use_peft: true\n", + 
"lora_r: 16\n", + "lora_alpha: 16 # Changed from 8 to 16 for better scaling\n", + "lora_dropout: 0.05\n", + "lora_target_modules: [\"q_proj\", \"v_proj\", \"k_proj\", \"o_proj\", \"gate_proj\", \"up_proj\", \"down_proj\"]\n", + "lora_modules_to_save: []\n", + "\n", + "# QLoRA (BitsAndBytes)\n", + "load_in_4bit: false\n", + "load_in_8bit: false\n", + "\n", + "# Dataset configuration (synthetic data from Ray preprocessing)\n", + "dataset_path: synthetic_gsm8k\n", + "dataset_config: main\n", + "dataset_train_split: train\n", + "dataset_test_split: test\n", + "dataset_text_field: text\n", + "dataset_kwargs:\n", + " add_special_tokens: false\n", + " append_concat_token: false\n", + "\n", + "# SFT configuration # Fixed typo\n", + "max_seq_length: 1024\n", + "dataset_batch_size: 1000\n", + "packing: false\n", + "\n", + "# Training hyperparameters\n", + "num_train_epochs: 3\n", + "per_device_train_batch_size: 8\n", + "per_device_eval_batch_size: 8\n", + "auto_find_batch_size: false\n", + "eval_strategy: epoch\n", + "\n", + "# Precision and optimization\n", + "bf16: true\n", + "tf32: false\n", + "learning_rate: 1.0e-4 # Reduced from 2.0e-4 for more stable LoRA training\n", + "warmup_steps: 100 # Increased from 10 for better stability\n", + "lr_scheduler_type: inverse_sqrt\n", + "optim: adamw_torch_fused\n", + "max_grad_norm: 1.0\n", + "seed: 42\n", + "\n", + "# Gradient settings\n", + "gradient_accumulation_steps: 1\n", + "gradient_checkpointing: false\n", + "gradient_checkpointing_kwargs:\n", + " use_reentrant: false\n", + "\n", + "# FSDP for distributed training\n", + "fsdp: \"full_shard auto_wrap\"\n", + "fsdp_config:\n", + " activation_checkpointing: true\n", + " cpu_ram_efficient_loading: false\n", + " sync_module_states: true\n", + " use_orig_params: true\n", + " limit_all_gathers: false\n", + "\n", + "# Checkpointing and logging\n", + "save_strategy: epoch\n", + "save_total_limit: 1\n", + "resume_from_checkpoint: false\n", + "log_level: warning\n", + "logging_strategy: steps\n", + "logging_steps: 10 # Reduced frequency from 1 to 10\n", + "report_to:\n", + "- tensorboard\n", + "\n", + "output_dir: /shared/models/granite-3.1-2b-instruct-synthetic2" + ] + }, + { + "cell_type": "markdown", + "id": "20521af6", + "metadata": {}, + "source": [ + "### Configure kubeflow-training Client\n", + "\n", + "Set up the kubeflow-training SDK client following the sft.ipynb pattern:\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "deb20fde", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "kubeflow-training client configured\n" + ] + } + ], + "source": [ + "# Configure kubeflow-training client (following sft.ipynb pattern)\n", + "from kubernetes import client\n", + "from kubeflow.training import TrainingClient\n", + "from kubeflow.training.models import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource\n", + "\n", + "token=\"\"\n", + "api_server=\"\"\n", + "\n", + "configuration = client.Configuration()\n", + "configuration.host = api_server\n", + "configuration.api_key = {\"authorization\": f\"Bearer {token}\"}\n", + "# Un-comment if your cluster API server uses a self-signed certificate or an un-trusted CA\n", + "configuration.verify_ssl = False\n", + "\n", + "api_client = client.ApiClient(configuration)\n", + "training_client = TrainingClient(client_configuration=api_client.configuration)\n", + "\n", + "print(\"kubeflow-training client configured\")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": 
"91d0b76b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "PyTorchJob submitted successfully\n" + ] + } + ], + "source": [ + "from scripts.kft_granite_training import training_func\n", + "\n", + "job = training_client.create_job(\n", + " job_kind=\"PyTorchJob\",\n", + " name=\"test1-training\",\n", + " # Use script file instead of function import\n", + " train_func=training_func,\n", + " # Pass YAML parameters as config\n", + " parameters=training_parameters,\n", + " # Distributed training configuration\n", + " num_workers=2,\n", + " num_procs_per_worker=2,\n", + " resources_per_worker={\n", + " \"nvidia.com/gpu\": 2, # Uncomment for GPU training\n", + " \"memory\": \"24Gi\",\n", + " \"cpu\": 4,\n", + " },\n", + " base_image=\"quay.io/modh/training:py311-cuda124-torch251\",\n", + " # Environment variables for training\n", + " env_vars={\n", + " # HuggingFace configuration - use shared storage\n", + " \"HF_HOME\": \"/shared/huggingface_cache\",\n", + " \"HF_DATASETS_CACHE\": \"/shared/huggingface_cache/datasets\",\n", + " \"TOKENIZERS_PARALLELISM\": \"false\",\n", + " # Training configuration\n", + " \"PYTHONUNBUFFERED\": \"1\",\n", + " \"NCCL_DEBUG\": \"INFO\",\n", + " },\n", + " # Package dependencies\n", + " packages_to_install=[\n", + " \"transformers>=4.36.0\",\n", + " \"trl>=0.7.0\",\n", + " \"datasets>=2.14.0\",\n", + " \"peft>=0.6.0\",\n", + " \"accelerate>=0.24.0\",\n", + " \"torch>=2.0.0\",\n", + " ],\n", + " volumes=[\n", + " V1Volume(\n", + " name=\"shared\",\n", + " persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(claim_name=\"shared\")\n", + " ),\n", + " ],\n", + " volume_mounts=[\n", + " V1VolumeMount(name=\"shared\", mount_path=\"/shared\"),\n", + " ],\n", + ")\n", + "\n", + "print(f\"PyTorchJob submitted successfully\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "08beef7d", + "metadata": {}, + "source": [ + "### Create Training Job using kubeflow-training SDK\n", + "\n", + "Create and submit the distributed training job following the sft.ipynb pattern:\n" + ] + }, + { + "cell_type": "markdown", + "id": "cac9307d", + "metadata": {}, + "source": [ + "### Monitor Training Job\n", + "\n", + "Follow the training progress and logs:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "a7f61439", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "# Monitor training job logs (following sft.ipynb pattern)\n", + "training_client.get_job_logs(\n", + " name=\"test1-training\",\n", + " job_kind=\"PyTorchJob\",\n", + " follow=True,\n", + ")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "8571ae47", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "PytorchJob deleted!\n" + ] + } + ], + "source": [ + "# Delete the Training Job\n", + "training_client.delete_job(\"test1-training\")\n", + "print(\"PytorchJob deleted!\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ray-kft-v1/3_tensorboard_monitoring.ipynb b/examples/ray-kft-v1/3_tensorboard_monitoring.ipynb new file mode 100644 index 000000000..063c9b6a5 
--- /dev/null +++ b/examples/ray-kft-v1/3_tensorboard_monitoring.ipynb @@ -0,0 +1,94 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Phase-3 : TensorBoard Training Monitoring\n", + "\n", + "Monitor your Granite model fine-tuning progress with TensorBoard visualization.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "os.environ[\"TENSORBOARD_PROXY_URL\"]= os.environ[\"NB_PREFIX\"]+\"/proxy/6006/\"" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Load TensorBoard extension\n", + "%load_ext tensorboard" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐ŸŽฏ Training Metrics Visualization\n", + "\n", + "View real-time training progress including:\n", + "- Training & validation loss\n", + "- Learning rate schedules \n", + "- Gradient norms\n", + "- System metrics (GPU/CPU usage)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Start TensorBoard for current training run\n", + "%tensorboard --logdir /opt/app-root/src/shared/models --port 6006" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## ๐Ÿ† Key Training Insights and overall assessment : What's Working Excellently\n", + "\n", + "`Your Granite fine-tuning is successful!`\n", + "\n", + "- โœ… Strong Learning: 14% token accuracy improvement\n", + "- โœ… Loss Reduction: Excellent (2.75 โ†’ 2.56)\n", + "- โœ… Token Accuracy: Outstanding (56% โ†’ 70%)\n", + "- โœ… Gradient Health: Very Good (decreasing norms)\n", + "- โœ… Stable Training: Good gradient behavior\n", + "- โœ… Proper Convergence: No signs of instability\n", + "- โœ… Ready for Inference" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/examples/ray-kft-v1/4_test_inference.ipynb b/examples/ray-kft-v1/4_test_inference.ipynb new file mode 100644 index 000000000..8bf6d0fdf --- /dev/null +++ b/examples/ray-kft-v1/4_test_inference.ipynb @@ -0,0 +1,589 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "534fd815-3b10-4304-bf5b-b7faa4c04dfc", + "metadata": {}, + "source": [ + "### Phase-4: Model Loading and Inference Testing\n", + "\n", + "Load the fine-tuned model from shared storage and test it to verify training worked:\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "01cd99a3", + "metadata": {}, + "outputs": [], + "source": [ + "%pip install transformers>=4.36.0\n", + "%pip install peft>=0.6.0" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "050bd15e", + "metadata": {}, + "outputs": [], + "source": [ + "# Model paths in shared storage\n", + "trained_model_path = \"/opt/app-root/src/shared/models/granite-3.1-2b-instruct-synthetic2\"\n", + "base_model_cache_path = \"/opt/app-root/src/shared/huggingface_cache/hub/models--ibm-granite--granite-3.1-2b-instruct\"\n", + "base_model_name = \"ibm-granite/granite-3.1-2b-instruct\"" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "id": "b217e739", + "metadata": {}, + "outputs": [ + 
{ + "name": "stdout", + "output_type": "stream", + "text": [ + "Loading models for comparison...\n", + "Using device: cuda\n", + "Loading tokenizer...\n", + " Using cached model at: /opt/app-root/src/shared/huggingface_cache/hub/models--ibm-granite--granite-3.1-2b-instruct/snapshots/bbc2aed595bd38bd770263dc3ab831db9794441d\n", + "Tokenizer loaded from local cache\n", + "\n", + "1. Loading original untrained model...\n", + " Loading from cached path: /opt/app-root/src/shared/huggingface_cache/hub/models--ibm-granite--granite-3.1-2b-instruct/snapshots/bbc2aed595bd38bd770263dc3ab831db9794441d\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Original model loaded from local cache\n" + ] + } + ], + "source": [ + "# Load both original and fine-tuned models from shared storage\n", + "import torch\n", + "from transformers import AutoTokenizer, AutoModelForCausalLM\n", + "from peft import PeftModel\n", + "import os\n", + "import glob\n", + "\n", + "print(\"Loading models for comparison...\")\n", + "\n", + "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", + "print(f\"Using device: {device}\")\n", + "\n", + "# Function to find the actual model path in HuggingFace cache\n", + "def find_model_snapshot_path(cache_path):\n", + " \"\"\"Find the actual model files in HuggingFace cache structure\"\"\"\n", + " if not os.path.exists(cache_path):\n", + " return None\n", + " \n", + " # Look for snapshots directory\n", + " snapshots_dir = os.path.join(cache_path, \"snapshots\")\n", + " if not os.path.exists(snapshots_dir):\n", + " return None\n", + " \n", + " # Get the latest snapshot (usually there's only one)\n", + " snapshot_dirs = [d for d in os.listdir(snapshots_dir) if os.path.isdir(os.path.join(snapshots_dir, d))]\n", + " if not snapshot_dirs:\n", + " return None\n", + " \n", + " # Use the first (or only) snapshot\n", + " snapshot_path = os.path.join(snapshots_dir, snapshot_dirs[0])\n", + " \n", + " # Verify it contains model files\n", + " if os.path.exists(os.path.join(snapshot_path, \"config.json\")):\n", + " return snapshot_path\n", + " \n", + " return None\n", + "\n", + "# Load tokenizer from local cache\n", + "print(f\"Loading tokenizer...\")\n", + "local_model_path = find_model_snapshot_path(base_model_cache_path)\n", + "\n", + "if local_model_path:\n", + " print(f\" Using cached model at: {local_model_path}\")\n", + " try:\n", + " tokenizer = AutoTokenizer.from_pretrained(\n", + " local_model_path,\n", + " trust_remote_code=True,\n", + " local_files_only=True\n", + " )\n", + " print(\"Tokenizer loaded from local cache\")\n", + " except Exception as e:\n", + " print(f\" Cache loading failed: {e}\")\n", + " print(\" Falling back to HuggingFace download...\")\n", + " tokenizer = AutoTokenizer.from_pretrained(\n", + " base_model_name,\n", + " trust_remote_code=True\n", + " )\n", + " print(\"Tokenizer loaded from HuggingFace\")\n", + "else:\n", + " print(\" Local cache not found, downloading from HuggingFace...\")\n", + " tokenizer = AutoTokenizer.from_pretrained(\n", + " base_model_name,\n", + " trust_remote_code=True\n", + " )\n", + " print(\"Tokenizer loaded from HuggingFace\")\n", + "\n", + "if tokenizer.pad_token is None:\n", + " tokenizer.pad_token = tokenizer.eos_token\n", + "\n", + "# 1. Load original untrained model from local cache\n", + "print(\"\\n1. 
Loading original untrained model...\")\n", + "\n", + "if local_model_path:\n", + " print(f\" Loading from cached path: {local_model_path}\")\n", + " try:\n", + " original_model = AutoModelForCausalLM.from_pretrained(\n", + " local_model_path,\n", + " dtype=torch.bfloat16 if device == \"cuda\" else torch.float32,\n", + " device_map=\"auto\" if device == \"cuda\" else None,\n", + " trust_remote_code=True,\n", + " local_files_only=True\n", + " )\n", + " print(\"Original model loaded from local cache\")\n", + " except Exception as e:\n", + " print(f\"Cache loading failed: {e}\")\n", + " print(\"Falling back to HuggingFace download...\")\n", + " try:\n", + " original_model = AutoModelForCausalLM.from_pretrained(\n", + " base_model_name,\n", + " dtype=torch.bfloat16 if device == \"cuda\" else torch.float32,\n", + " device_map=\"auto\" if device == \"cuda\" else None,\n", + " trust_remote_code=True\n", + " )\n", + " print(\"Original model loaded from HuggingFace\")\n", + " except Exception as e2:\n", + " print(f\"Failed to load original model: {e2}\")\n", + " original_model = None\n", + "else:\n", + " print(\"Local cache not found, downloading from HuggingFace...\")\n", + " try:\n", + " original_model = AutoModelForCausalLM.from_pretrained(\n", + " base_model_name,\n", + " dtype=torch.bfloat16 if device == \"cuda\" else torch.float32,\n", + " device_map=\"auto\" if device == \"cuda\" else None,\n", + " trust_remote_code=True\n", + " )\n", + " print(\"Original model loaded from HuggingFace\")\n", + " except Exception as e:\n", + " print(f\"Failed to load original model: {e}\")\n", + " original_model = None\n" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "id": "90ffbb12", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "2. Loading fine-tuned model...\n", + " Checking path: /opt/app-root/src/shared/models/granite-3.1-2b-instruct-synthetic2\n", + " Fine-tuned model path exists\n", + " Loading base model from cache for LoRA...\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Fine-tuned model loaded from: /opt/app-root/src/shared/models/granite-3.1-2b-instruct-synthetic2\n", + "LoRA weights merged successfully\n" + ] + } + ], + "source": [ + "\n", + "# 2. Load fine-tuned model with LoRA adapter\n", + "print(\"\\n2. 
Loading fine-tuned model...\")\n", + "print(f\" Checking path: {trained_model_path}\")\n", + "\n", + "if os.path.exists(trained_model_path):\n", + " print(\" Fine-tuned model path exists\")\n", + " try:\n", + " # Load base model for LoRA (use same logic as original model)\n", + " if local_model_path:\n", + " print(\" Loading base model from cache for LoRA...\")\n", + " base_for_lora = AutoModelForCausalLM.from_pretrained(\n", + " local_model_path,\n", + " torch_dtype=torch.bfloat16 if device == \"cuda\" else torch.float32,\n", + " device_map=\"auto\" if device == \"cuda\" else None,\n", + " trust_remote_code=True,\n", + " local_files_only=True\n", + " )\n", + " else:\n", + " print(\" Loading base model from HuggingFace for LoRA...\")\n", + " base_for_lora = AutoModelForCausalLM.from_pretrained(\n", + " base_model_name,\n", + " torch_dtype=torch.bfloat16 if device == \"cuda\" else torch.float32,\n", + " device_map=\"auto\" if device == \"cuda\" else None,\n", + " trust_remote_code=True\n", + " )\n", + " \n", + " # Load LoRA adapter and merge\n", + " trained_model = PeftModel.from_pretrained(base_for_lora, trained_model_path)\n", + " trained_model = trained_model.merge_and_unload() # Merge LoRA weights for inference\n", + " \n", + " print(f\"Fine-tuned model loaded from: {trained_model_path}\")\n", + " print(\"LoRA weights merged successfully\")\n", + " \n", + " # Clean up base model used for LoRA loading\n", + " del base_for_lora\n", + " if torch.cuda.is_available():\n", + " torch.cuda.empty_cache()\n", + " \n", + " except Exception as e:\n", + " print(f\"Error loading fine-tuned model: {e}\")\n", + " trained_model = None\n", + " \n", + "else:\n", + " print(f\"Fine-tuned model not found at: {trained_model_path}\")\n", + " print(\"Training may not have completed or path is incorrect\")\n", + " trained_model = None\n" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "id": "d4c2d675", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "Models loaded successfully!\n", + " Original model: Ready\n", + " Fine-tuned model: Ready\n", + " Device: cuda\n", + " Cache path used: /opt/app-root/src/shared/huggingface_cache/hub/models--ibm-granite--granite-3.1-2b-instruct/snapshots/bbc2aed595bd38bd770263dc3ab831db9794441d\n" + ] + } + ], + "source": [ + "print(f\"\\nModels loaded successfully!\")\n", + "print(f\" Original model: {'Ready' if original_model else 'Not available'}\")\n", + "print(f\" Fine-tuned model: {'Ready' if trained_model else 'Not available'}\")\n", + "print(f\" Device: {device}\")\n", + "print(f\" Cache path used: {local_model_path if local_model_path else 'HuggingFace download'}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "e846ff92", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Loaded 20 test samples\n", + " Source: /opt/app-root/src/shared/synthetic_data_v2/synthetic_dataset.json\n" + ] + } + ], + "source": [ + "\n", + "import json\n", + "import re\n", + "from typing import List, Dict, Any\n", + "\n", + "def generate_response(question: str, model, tokenizer, max_length: int = 512) -> str:\n", + " \"\"\"Generate response using the model\"\"\"\n", + " # Format as chat message\n", + " messages = [{\"role\": \"user\", \"content\": question}]\n", + " \n", + " # Apply chat template\n", + " prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)\n", + " \n", + " # Tokenize\n", + " 
inputs = tokenizer(prompt, return_tensors=\"pt\", truncation=True, max_length=max_length)\n", + " inputs = {k: v.to(model.device) for k, v in inputs.items()}\n", + " \n", + " # Generate response\n", + " with torch.no_grad():\n", + " outputs = model.generate(\n", + " **inputs,\n", + " max_new_tokens=256,\n", + " do_sample=True,\n", + " temperature=0.7,\n", + " top_p=0.9,\n", + " pad_token_id=tokenizer.eos_token_id,\n", + " eos_token_id=tokenizer.eos_token_id,\n", + " )\n", + " \n", + " # Decode response (remove input prompt)\n", + " response = tokenizer.decode(outputs[0][inputs['input_ids'].shape[1]:], skip_special_tokens=True)\n", + " return response.strip()\n", + "\n", + "def load_test_data(file_path: str, num_samples: int = 5) -> List[Dict[str, Any]]:\n", + " \"\"\"Load test samples from synthetic dataset\"\"\"\n", + " try:\n", + " with open(file_path, 'r') as f:\n", + " data = json.load(f)\n", + " \n", + " # Take a subset for testing\n", + " if len(data) > num_samples:\n", + " # Take samples from different parts of the dataset\n", + " step = len(data) // num_samples\n", + " test_samples = [data[i] for i in range(0, len(data), step)][:num_samples]\n", + " else:\n", + " test_samples = data[:num_samples]\n", + " \n", + " return test_samples\n", + " except Exception as e:\n", + " print(f\"Error loading test data: {e}\")\n", + " return []\n", + "\n", + "def extract_final_number(text: str) -> str:\n", + " \"\"\"Extract the final numerical answer from text\"\"\"\n", + " numbers = re.findall(r'\\d+', text)\n", + " return numbers[-1] if numbers else None\n", + "\n", + "def evaluate_accuracy(expected: str, response: str) -> bool:\n", + " \"\"\"Simple accuracy check based on final number\"\"\"\n", + " expected_num = extract_final_number(expected)\n", + " response_num = extract_final_number(response)\n", + " \n", + " if expected_num and response_num:\n", + " return expected_num == response_num\n", + " return False\n", + "\n", + "test_data_path = \"/opt/app-root/src/shared/synthetic_data_v2/synthetic_dataset.json\"\n", + "test_samples = load_test_data(test_data_path, num_samples=20)\n", + "\n", + "if test_samples:\n", + " print(f\"Loaded {len(test_samples)} test samples\")\n", + " print(f\" Source: {test_data_path}\")\n", + "else:\n", + " print(\"No test data available\")\n", + " # Create fallback test samples\n", + " test_samples = [\n", + " {\n", + " \"question\": \"A bakery sold 45 cupcakes in the morning and 38 cupcakes in the afternoon. How many cupcakes did they sell in total?\",\n", + " \"answer\": \"The bakery sold 45 + 38 = 83 cupcakes in total.\",\n", + " \"source\": \"fallback\"\n", + " },\n", + " {\n", + " \"question\": \"Sarah has 24 stickers. She gives 8 stickers to her friend and buys 15 more stickers. How many stickers does Sarah have now?\",\n", + " \"answer\": \"Sarah had 24 stickers, gave away 8, so she had 24 - 8 = 16 stickers. 
Then she bought 15 more, so she has 16 + 15 = 31 stickers now.\",\n", + " \"source\": \"fallback\"\n", + " }\n", + " ]\n", + " print(f\"Using {len(test_samples)} fallback test samples\")\n", + "print(test_samples)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "d17dd825", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "\n", + "PERFORMANCE COMPARISON SUMMARY\n", + "==================================================\n", + "Original Model Performance:\n", + " Correct Answers: 7/20\n", + " Accuracy: 35.0%\n", + "\n", + "Fine-tuned Model Performance:\n", + " Correct Answers: 10/20\n", + " Accuracy: 50.0%\n", + "\n", + "Training Impact:\n", + " Accuracy Change: +15.0%\n", + " Good! Training showed positive results\n", + "\n", + "DETAILED ANALYSIS\n", + "========================================\n", + "Key improvements to look for in fine-tuned model:\n", + " Step-by-step mathematical reasoning\n", + " Correct arithmetic calculations\n", + " Clear explanation of the process\n", + " Consistent answer format\n", + " Better handling of word problems\n", + "\n", + "CLEANING UP RESOURCES\n", + "==============================\n", + "Original model cleared from memory\n", + "Fine-tuned model cleared from memory\n", + "Tokenizer cleared from memory\n", + "GPU memory cache cleared\n" + ] + } + ], + "source": [ + "\n", + "print(\"\\n\" + \"=\"*80)\n", + "print(\"SIDE-BY-SIDE MODEL COMPARISON\")\n", + "print(\"=\"*80)\n", + "\n", + "if original_model and tokenizer:\n", + " original_correct = 0\n", + " trained_correct = 0\n", + " \n", + " for i, sample in enumerate(test_samples, 1):\n", + " question = sample[\"question\"]\n", + " expected_answer = sample[\"answer\"]\n", + " \n", + " print(f\"\\nTest Problem {i}/{len(test_samples)}:\")\n", + " print(f\"Question: {question}\")\n", + " print(f\"Expected: {expected_answer}\")\n", + " print(\"-\" * 60)\n", + " \n", + " # Test original model\n", + " print(\"ORIGINAL MODEL (Untrained):\")\n", + " try:\n", + " original_response = generate_response(question, original_model, tokenizer)\n", + " print(f\" Response: {original_response}\")\n", + " \n", + " # Check accuracy\n", + " is_correct = evaluate_accuracy(expected_answer, original_response)\n", + " if is_correct:\n", + " original_correct += 1\n", + " print(\" Correct\")\n", + " else:\n", + " print(\" Incorrect\")\n", + " \n", + " except Exception as e:\n", + " print(f\" Error: {e}\")\n", + " \n", + " print()\n", + " \n", + " # Test fine-tuned model\n", + " if trained_model:\n", + " print(\"FINE-TUNED MODEL (After Training):\")\n", + " try:\n", + " trained_response = generate_response(question, trained_model, tokenizer)\n", + " print(f\" Response: {trained_response}\")\n", + " \n", + " # Check accuracy\n", + " is_correct = evaluate_accuracy(expected_answer, trained_response)\n", + " if is_correct:\n", + " trained_correct += 1\n", + " print(\" Correct\")\n", + " else:\n", + " print(\" Incorrect\")\n", + " \n", + " except Exception as e:\n", + " print(f\" Error: {e}\")\n", + " else:\n", + " print(\"FINE-TUNED MODEL: Not available\")\n", + " \n", + " print(\"=\" * 80)\n", + " \n", + " print(f\"\\nPERFORMANCE COMPARISON SUMMARY\")\n", + " print(\"=\" * 50)\n", + " \n", + " total_samples = len(test_samples)\n", + " original_accuracy = (original_correct / total_samples) * 100\n", + " \n", + " print(f\"Original Model Performance:\")\n", + " print(f\" Correct Answers: {original_correct}/{total_samples}\")\n", + " print(f\" Accuracy: 
{original_accuracy:.1f}%\")\n", + " \n", + " if trained_model:\n", + " trained_accuracy = (trained_correct / total_samples) * 100\n", + " improvement = trained_accuracy - original_accuracy\n", + " \n", + " print(f\"\\nFine-tuned Model Performance:\")\n", + " print(f\" Correct Answers: {trained_correct}/{total_samples}\")\n", + " print(f\" Accuracy: {trained_accuracy:.1f}%\")\n", + " \n", + " print(f\"\\nTraining Impact:\")\n", + " print(f\" Accuracy Change: {improvement:+.1f}%\")\n", + " \n", + " if improvement > 20:\n", + " print(\" Excellent! Training significantly improved performance\")\n", + " elif improvement > 0:\n", + " print(\" Good! Training showed positive results\")\n", + " elif improvement == 0:\n", + " print(\" No change. Consider adjusting training parameters\")\n", + " else:\n", + " print(\" Performance decreased. Check training setup\")\n", + " else:\n", + " print(f\"\\nFine-tuned Model: Not available for comparison\")\n", + " \n", + " # ============================================================================\n", + " # DETAILED ANALYSIS\n", + " # ============================================================================\n", + " \n", + " print(f\"\\nDETAILED ANALYSIS\")\n", + " print(\"=\" * 40)\n", + " print(\"Key improvements to look for in fine-tuned model:\")\n", + " print(\" Step-by-step mathematical reasoning\")\n", + " print(\" Correct arithmetic calculations\")\n", + " print(\" Clear explanation of the process\")\n", + " print(\" Consistent answer format\")\n", + " print(\" Better handling of word problems\")\n", + " \n", + "else:\n", + " print(\"Cannot run comparison - models not loaded\")\n", + "\n", + "# ============================================================================\n", + "# CLEANUP\n", + "# ============================================================================\n", + "\n", + "print(f\"\\nCLEANING UP RESOURCES\")\n", + "print(\"=\" * 30)\n", + "\n", + "# Clear models from memory\n", + "if 'original_model' in locals() and original_model:\n", + " del original_model\n", + " print(\"Original model cleared from memory\")\n", + "\n", + "if 'trained_model' in locals() and trained_model:\n", + " del trained_model\n", + " print(\"Fine-tuned model cleared from memory\")\n", + "\n", + "if 'tokenizer' in locals():\n", + " del tokenizer\n", + " print(\"Tokenizer cleared from memory\")\n", + "\n", + "# Clear CUDA cache if using GPU\n", + "if torch.cuda.is_available():\n", + " torch.cuda.empty_cache()\n", + " print(\"GPU memory cache cleared\")\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ray-kft-v1/README.md b/examples/ray-kft-v1/README.md new file mode 100644 index 000000000..594903018 --- /dev/null +++ b/examples/ray-kft-v1/README.md @@ -0,0 +1,192 @@ +# Distributed ML Pipeline: Ray Data + Kubeflow Training + +End-to-end distributed ML pipeline combining **Ray Data processing** with **Kubeflow Training** for scalable synthetic data generation and model fine-tuning. 
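+
+At the heart of Phase 1, the Ray job streams seed problems through Ray Data's `map_batches()` API with a stateful callable, so each worker loads the Qwen generator once and reuses it for every batch. The snippet below is only a minimal sketch of that pattern under illustrative assumptions (hypothetical class, prompt, and column names); the actual logic, including quality scoring, deduplication, and checkpointing, lives in `scripts/ray_sdg_job.py`.
+
+```python
+# Minimal sketch of the Phase 1 Ray Data pattern -- illustrative only, not the
+# exact implementation in scripts/ray_sdg_job.py.
+import ray
+from transformers import pipeline
+
+
+class ProblemGenerator:
+    def __init__(self):
+        # Loaded once per Ray actor and reused for every batch it processes.
+        self.generator = pipeline(
+            "text-generation",
+            model="Qwen/Qwen2.5-1.5B-Instruct",
+            device_map="auto",
+        )
+
+    def __call__(self, batch):
+        prompts = [
+            f"Write a new grade-school math word problem similar to: {q}"
+            for q in batch["question"]
+        ]
+        outputs = self.generator(prompts, max_new_tokens=256, do_sample=True)
+        batch["synthetic_question"] = [o[0]["generated_text"] for o in outputs]
+        return batch
+
+
+seeds = ray.data.from_items([{"question": "Natalia sold clips to 48 of her friends ..."}])
+synthetic = seeds.map_batches(
+    ProblemGenerator,
+    batch_size=2,   # mirrors --batch-size in the job entrypoint
+    concurrency=2,  # one generator actor per GPU worker
+    num_gpus=1,     # mirrors --gpus-per-worker
+)
+synthetic.write_json("/shared/synthetic_data_v2")  # shared RWX PVC path used by the job
+```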
+ +## Pipeline Overview + +**Phase 1: Ray Data Processing** + +**Phase 2: Distributed Training** + +**Phase 3: Monitoring & Evaluation** + +**Phase 4: Inference Testing** + +``` +GSM8K Dataset --> Ray Cluster --> Synthetic Data Generation --> PyTorchJob --> Model Finetuning --> Model monitoring and evaluation --> Inference Test +(7.5K samples) (Qwen1.5B) (300+ problems) (Granite-3.1-2B) (LoRA adapters) (TensorBoard) (Comparison) +``` + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Distributed ML Pipeline: Ray Data + Kubeflow Training โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ GSM8K Dataset โ”‚ +โ”‚ 7.5K Problems โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ PHASE 1: Ray Data Processing โ”‚ +โ”‚ (CodeFlare SDK) โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ CodeFlare SDK โ”‚โ”€โ”€โ”€โ–ถโ”‚ Ray Cluster โ”‚โ”€โ”€โ”€โ–ถโ”‚ Ray Data Pipeline โ”‚ โ”‚ +โ”‚ โ”‚ Cluster โ”‚ โ”‚ Head + Workers โ”‚ โ”‚ Qwen2.5-1.5B-Instruct โ”‚ โ”‚ +โ”‚ โ”‚ Management โ”‚ โ”‚ โ”‚ โ”‚ โ€ข map_batches inference โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ€ข quality filtering โ”‚ โ”‚ +โ”‚ โ”‚ โ€ข streaming processing โ”‚ โ”‚ +โ”‚ โ”‚ โ€ข fault tolerance โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”‚ โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€-โ”€โ”€โ”˜ + โ–ผ + โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” + โ”‚ Shared PVC Storage โ”‚ + โ”‚ /shared โ”‚ + โ”‚ RWX Access โ”‚ + โ”‚ โ”‚ + โ”‚ โ€ข Models & Datasets Cache โ”‚ + โ”‚ โ€ข Synthetic Data (150+ samples) โ”‚ + โ”‚ โ€ข Quality Metrics & Metadata โ”‚ + โ”‚ โ€ข Training Checkpoints โ”‚ + โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ–ผ 
+โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ PHASE 2: Distributed Training โ”‚ +โ”‚ (kubeflow-training SDK) โ”‚ +โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ค +โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ kubeflow- โ”‚โ”€โ”€โ”€โ–ถโ”‚ PyTorchJob โ”‚โ”€โ”€โ”€โ–ถโ”‚ IBM Granite 3.1-2B โ”‚ โ”‚ +โ”‚ โ”‚ training SDK โ”‚ โ”‚ Multi-node โ”‚ โ”‚ + LoRA Adapters โ”‚ โ”‚ +โ”‚ โ”‚ Job Management โ”‚ โ”‚ Training โ”‚ โ”‚ (Fine-tuning Process) โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€-โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”‚ โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ–ผ + โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” + โ”‚ Fine-tuned Model โ”‚ + โ”‚ Math Reasoning โ”‚ + โ”‚ Capabilities โ”‚ + โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + +Key Components: +โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” +โ€ข Data Flow: GSM8K โ†’ Ray Processing โ†’ Shared Storage โ†’ Training โ†’ Model +โ€ข Storage: Persistent Volume Claim with ReadWriteMany (RWX) access +โ€ข Phase 1: Distributed synthetic data generation using Ray workers +โ€ข Phase 2: Multi-node GPU training with PyTorchJob and LoRA fine-tuning +โ€ข Models: Qwen2.5-1.5B for generation, IBM Granite 3.1-2B for training +``` +### Prerequisites +- Red Hat OpenShift AI 2.17+ installed +- NVIDIA and Node Feature Discovery Operator installed +- In Openshift AI's DataScienceCluster resource - CodeFlare and Kubeflow Training operator (KFTO-V1) enabled/managed +- A persistent shared storage with RWX(ReadWriteMany) access. 
+ - Workbench Notebook pod: `/opt/app-root/src/shared` + - RayCluster pods : `/shared` + - PyTorchJob pods : `/shared` +- Compute resources: 4+ CPU cores, 8GB+ RAM per worker node +- GPU support (optional): NVIDIA GPUs for accelerated training + +## Pipeline Details + +**Phase 1: Ray Data Processing** +- Uses CodeFlare SDK to deploy Ray cluster +- Loads GSM8K dataset as seed problems +- Generates synthetic math problems using Qwen2.5-1.5B-Instruct +- Quality filtering and deduplication +- Saves to `/shared/synthetic_data/` + +**Phase 2: Distributed Training** +- Uses kubeflow-training SDK to create PyTorchJob +- Fine-tunes IBM Granite-3.1-2B-Instruct with LoRA adapters +- Multi-node distributed training with FSDP +- Saves model to `/shared/models/` + +**Phase 3: TensorBoard Monitoring** +- Real-time training metrics visualization +- Loss curves and learning rate tracking + +**Phase 4: Model Testing** +- Loads both original and fine-tuned models +- Side-by-side comparison on test problems +- Performance metrics and accuracy analysis + + +## Files + +| File | Description | +|------|-------------| +| `1_ray_sdg.ipynb` | **Phase 1**: Ray cluster setup and synthetic data generation | +| `2_kft_training.ipynb` | **Phase 2**: Distributed training with PyTorchJob | +| `3_tensorboard_monitoring.ipynb` | **Phase 3**: TensorBoard monitoring and training metrics | +| `4_test_inference.ipynb` | **Phase 4**: Model comparison and inference testing | +| `scripts/ray_sdg_job.py` | Ray Data pipeline for distributed synthetic data generation | +| `scripts/kft_granite_training.py` | Training function for IBM Granite model fine-tuning | + +## Quick Start + +1. **Run notebooks in sequence:** + - `1_ray_sdg.ipynb` โ†’ Generate synthetic math problems using Ray Data + - `2_kft_training.ipynb` โ†’ Fine-tune Granite model with PyTorchJob + - `3_tensorboard_monitoring.ipynb` โ†’ Monitor training progress with TensorBoard + - `4_test_inference.ipynb` โ†’ Compare original vs fine-tuned model performance + +2. **Standalone Ray Data generation:** + ```bash + # Test mode (2 problems, ~15s) + python scripts/ray_sdg_job.py --test-mode + + # Production mode (150+ problems, ~30min) + python scripts/ray_sdg_job.py --seeds 50 --variations 3 + ``` + +## Why Ray Data? 
+ +Ray Data is specifically designed for distributed ML inference workloads: +- **GPU Optimization**: Concurrent CPU preprocessing + GPU inference, maximizing resource utilization +- **Scalable Processing**: Handles thousands of samples across heterogeneous clusters with fault tolerance +- **Streaming Execution**: Memory-efficient processing of large datasets without loading everything into memory +- **ML-Native**: Built-in `map_batches()` API for distributed model inference with stateful operations +- **Production Ready**: Automatic retries, checkpointing, and monitoring capabilities + +### Comparison with Alternatives + +| Feature | Ray Data | Apache Spark | Dask | +|---------|----------|--------------|------| +| **GPU Support** | Native, optimized | Limited | Basic | +| **ML Integration** | Built-in | External libraries | External | +| **Streaming** | Native streaming | Micro-batching | Task-based | +| **Fault Tolerance** | Automatic retry | RDD lineage | Task retry | +| **Memory Efficiency** | Streaming execution | Memory-intensive | Memory-aware | + +*References: [Ray Data Overview](https://docs.ray.io/en/latest/data/overview.html), [Performance Guide](https://docs.ray.io/en/latest/data/performance-tips.html)* + +## Screenshots + +### Ray Job Execution +![Ray Job Running](docs/rayjob_running_1.png) +![Ray Job Running Logs](docs/rayjob_running_2.png) +*Ray cluster processing synthetic data generation* + +![Ray Job GPU Utilization](docs/rayjob_gpu_util.png) +*GPU utilization during Ray Data processing* + +![Ray Job Success](docs/rayjob_succeeded_1.png) +![Ray Job Success Logs](docs/rayjob_succeeded_2.png) +*Successful completion of Ray job* + +### Training Monitoring +![TensorBoard Metrics](docs/tensorboard_1.png) +![TensorBoard Usage](docs/tensorboard_2.png) +*Training loss and learning rate curves* + + +### References +- [CodeFlare SDK](https://github.com/project-codeflare/codeflare-sdk): Kubernetes-native distributed computing +- [Ray Documentation](https://docs.ray.io/): Distributed computing framework +- [kubeflow-training SDK](https://github.com/kubeflow/training-operator): Kubernetes-native ML training +- [IBM Granite Models](https://huggingface.co/ibm-granite/granite-3.1-2b-instruct): Enterprise LLM family \ No newline at end of file diff --git a/examples/ray-kft-v1/dataset/sample_synthetic_dataset.json b/examples/ray-kft-v1/dataset/sample_synthetic_dataset.json new file mode 100644 index 000000000..5f92f00f8 --- /dev/null +++ b/examples/ray-kft-v1/dataset/sample_synthetic_dataset.json @@ -0,0 +1,94 @@ +{ + "train": [ + { + "question": "During a school fundraiser, Sarah sold 36 cupcakes on Monday. On Tuesday, she sold twice as many cupcakes as she did on Monday. How many cupcakes did Sarah sell in total over these two days?", + "answer": "Step-by-step solution: Step 1: Calculate the number of cupcakes sold on Tuesday. Since she sold twice as many cupcakes on Tuesday compared to Monday, we multiply the number of cupcakes sold on Monday by 2. \\( \\text{Cupcakes sold on Tuesday} = 36 \\times 2 \\). Step 2: Add the number of cupcakes sold on both days to find the total. Sum up the cupcakes sold on Monday (\\(36\\)) and Tuesday (result from Step 1) to get the total.", + "context": "Natalia sold clips to 48 of her friends in April, and then she sold half as many clips in May. 
How many clips did Natalia sell altogether in April and May?", + "source": "ray_sdg_qwen", + "seed_id": 0, + "variation_id": 0, + "difficulty": "Easy", + "concepts": [ + "Multiplication", + "Addition", + "Problem-solving skills" + ], + "model_confidence": 0.9, + "quality_scores": { + "mathematical_content": 1.0, + "answer_quality": 0.7, + "question_clarity": 1.0, + "model_confidence": 0.9, + "overall_quality": 0.89 + }, + "overall_quality": 0.89 + }, + { + "question": "A baker uses 3 cups of flour to make one loaf of bread. If he has 18 cups of flour, how many loaves can he bake?", + "answer": "To find out how many loaves the baker can bake, you need to divide the total amount of flour (18 cups) by the amount needed for one loaf (3 cups). So, 18 \u00f7 3 = 6 loaves.", + "context": "Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?", + "source": "ray_sdg_qwen", + "seed_id": 1, + "variation_id": 0, + "difficulty": "easy", + "concepts": [ + "division" + ], + "model_confidence": 0.9, + "quality_scores": { + "mathematical_content": 1.0, + "answer_quality": 0.3, + "question_clarity": 1.0, + "model_confidence": 0.9, + "overall_quality": 0.77 + }, + "overall_quality": 0.77 + } + ], + "test": [ + { + "question": "A gardener plants flowers in rows such that each row contains 7 flowers. If there were 49 flowers planted in total, how many full rows can the gardener create?", + "answer": "To find out how many full rows the gardener can create, divide the total number of flowers (49) by the number of flowers per row (7). The quotient gives you the number of full rows.", + "context": "Weng earns $12 an hour for babysitting. Yesterday, she just did 50 minutes of babysitting. How much did she earn?", + "source": "ray_sdg_qwen", + "seed_id": 1, + "variation_id": 1, + "difficulty": "medium", + "concepts": [ + "division", + "multiplication" + ], + "model_confidence": 0.9, + "quality_scores": { + "mathematical_content": 0.7, + "answer_quality": 0.3, + "question_clarity": 1.0, + "model_confidence": 0.9, + "overall_quality": 0.68 + }, + "overall_quality": 0.68 + } + ], + "metadata": { + "total_generated": 3, + "high_quality_count": 3, + "quality_threshold": 0.4, + "min_mathematical_content": 0.4, + "min_answer_quality": 0.3, + "avg_overall_quality": 0.78, + "difficulty_distribution": { + "Easy": 1, + "easy": 1, + "medium": 1 + }, + "model_used": "Qwen/Qwen2.5-1.5B-Instruct", + "generation_method": "ray_distributed_qwen", + "features": [ + "structured_json_output", + "multi_dimensional_quality_assessment", + "difficulty_variation", + "deduplication", + "robust_parsing" + ] + } +} \ No newline at end of file diff --git a/examples/ray-kft-v1/docs/raycluster_dashboard.png b/examples/ray-kft-v1/docs/raycluster_dashboard.png new file mode 100644 index 000000000..e9c60a211 Binary files /dev/null and b/examples/ray-kft-v1/docs/raycluster_dashboard.png differ diff --git a/examples/ray-kft-v1/docs/rayjob_gpu_util.png b/examples/ray-kft-v1/docs/rayjob_gpu_util.png new file mode 100644 index 000000000..fc0f1c622 Binary files /dev/null and b/examples/ray-kft-v1/docs/rayjob_gpu_util.png differ diff --git a/examples/ray-kft-v1/docs/rayjob_running_1.png b/examples/ray-kft-v1/docs/rayjob_running_1.png new file mode 100644 index 000000000..b6df870b8 Binary files /dev/null and b/examples/ray-kft-v1/docs/rayjob_running_1.png differ diff --git a/examples/ray-kft-v1/docs/rayjob_running_2.png b/examples/ray-kft-v1/docs/rayjob_running_2.png new file mode 100644 index 
000000000..640cb69f4 Binary files /dev/null and b/examples/ray-kft-v1/docs/rayjob_running_2.png differ diff --git a/examples/ray-kft-v1/docs/rayjob_succeeded_1.png b/examples/ray-kft-v1/docs/rayjob_succeeded_1.png new file mode 100644 index 000000000..4a1da36fd Binary files /dev/null and b/examples/ray-kft-v1/docs/rayjob_succeeded_1.png differ diff --git a/examples/ray-kft-v1/docs/rayjob_succeeded_2.png b/examples/ray-kft-v1/docs/rayjob_succeeded_2.png new file mode 100644 index 000000000..529e28052 Binary files /dev/null and b/examples/ray-kft-v1/docs/rayjob_succeeded_2.png differ diff --git a/examples/ray-kft-v1/docs/tensorboard_1.png b/examples/ray-kft-v1/docs/tensorboard_1.png new file mode 100644 index 000000000..820dfe3db Binary files /dev/null and b/examples/ray-kft-v1/docs/tensorboard_1.png differ diff --git a/examples/ray-kft-v1/docs/tensorboard_2.png b/examples/ray-kft-v1/docs/tensorboard_2.png new file mode 100644 index 000000000..5f8b2df49 Binary files /dev/null and b/examples/ray-kft-v1/docs/tensorboard_2.png differ diff --git a/examples/ray-kft-v1/scripts/kft_granite_training.py b/examples/ray-kft-v1/scripts/kft_granite_training.py new file mode 100644 index 000000000..cbf11841d --- /dev/null +++ b/examples/ray-kft-v1/scripts/kft_granite_training.py @@ -0,0 +1,253 @@ +def training_func(parameters=None): + """Fine-tune IBM Granite model on synthetic data using TRL SFTTrainer""" + import random + import json + import os + from datasets import Dataset + from transformers import AutoTokenizer, set_seed + from trl import ModelConfig, ScriptArguments, SFTConfig, SFTTrainer, TrlParser, get_peft_config, get_quantization_config, get_kbit_device_map + + print("Starting Granite fine-tuning on synthetic data...") + + # Ensure cache directories exist + print(f"HuggingFace cache directory: {os.environ['HF_HOME']}") + os.makedirs(os.environ['HF_HOME'], exist_ok=True) + os.makedirs(os.environ['HF_DATASETS_CACHE'], exist_ok=True) + os.makedirs('/shared/models', exist_ok=True) + + if parameters is None: + # Parse arguments using TRL parser + parser = TrlParser((ScriptArguments, SFTConfig, ModelConfig)) + script_args, training_args, model_args = parser.parse_args_and_config() + else: + # Create ModelConfig from parameters + model_args = ModelConfig( + model_name_or_path=parameters['model_name_or_path'], + model_revision=parameters.get('model_revision', 'main'), + torch_dtype=parameters.get('torch_dtype', 'bfloat16'), + attn_implementation=parameters.get('attn_implementation', 'flash_attention_2'), + use_peft=parameters.get('use_peft', True), + lora_r=parameters.get('lora_r', 16), + lora_alpha=parameters.get('lora_alpha', 8), + lora_dropout=parameters.get('lora_dropout', 0.05), + lora_target_modules=parameters.get('lora_target_modules', []), + lora_modules_to_save=parameters.get('lora_modules_to_save', []), + load_in_4bit=parameters.get('load_in_4bit', False), + load_in_8bit=parameters.get('load_in_8bit', False), + trust_remote_code=True, + ) + + # Store parameters that don't belong to standard TRL configs + dataset_batch_size = parameters.get('dataset_batch_size', 1000) + + # Create SFTConfig from parameters + training_args = SFTConfig( + output_dir=parameters.get('output_dir', '/shared/models/granite-3.1-2b-instruct-synthetic'), + max_seq_length=parameters.get('max_seq_length', 1024), + packing=parameters.get('packing', False), + num_train_epochs=parameters.get('num_train_epochs', 3), + per_device_train_batch_size=parameters.get('per_device_train_batch_size', 8), + 
per_device_eval_batch_size=parameters.get('per_device_eval_batch_size', 8), + auto_find_batch_size=parameters.get('auto_find_batch_size', False), + eval_strategy=parameters.get('eval_strategy', 'epoch'), + bf16=parameters.get('bf16', True), + tf32=parameters.get('tf32', False), + learning_rate=parameters.get('learning_rate', 2.0e-4), + warmup_steps=parameters.get('warmup_steps', 10), + lr_scheduler_type=parameters.get('lr_scheduler_type', 'inverse_sqrt'), + optim=parameters.get('optim', 'adamw_torch_fused'), + max_grad_norm=parameters.get('max_grad_norm', 1.0), + seed=parameters.get('seed', 42), + gradient_accumulation_steps=parameters.get('gradient_accumulation_steps', 1), + gradient_checkpointing=parameters.get('gradient_checkpointing', False), + gradient_checkpointing_kwargs=parameters.get('gradient_checkpointing_kwargs', {'use_reentrant': False}), + fsdp=parameters.get('fsdp', 'full_shard auto_wrap'), + fsdp_config=parameters.get('fsdp_config', { + 'activation_checkpointing': True, + 'cpu_ram_efficient_loading': False, + 'sync_module_states': True, + 'use_orig_params': True, + 'limit_all_gathers': False + }), + save_strategy=parameters.get('save_strategy', 'epoch'), + save_total_limit=parameters.get('save_total_limit', 1), + resume_from_checkpoint=parameters.get('resume_from_checkpoint', False), + log_level=parameters.get('log_level', 'warning'), + logging_strategy=parameters.get('logging_strategy', 'steps'), + logging_steps=parameters.get('logging_steps', 1), + report_to=parameters.get('report_to', ['tensorboard']), + ) + + # Store dataset-related parameters separately (not part of standard ScriptArguments) + dataset_name = parameters.get('dataset_name', 'synthetic_gsm8k') + dataset_config = parameters.get('dataset_config', 'main') + dataset_train_split = parameters.get('dataset_train_split', 'train') + dataset_test_split = parameters.get('dataset_test_split', 'test') + dataset_text_field = parameters.get('dataset_text_field', 'text') + dataset_kwargs = parameters.get('dataset_kwargs', { + 'add_special_tokens': False, + 'append_concat_token': False + }) + + # Create ScriptArguments with required dataset_name parameter + script_args = ScriptArguments(dataset_name=dataset_name) + + # Set seed for reproducibility + set_seed(training_args.seed) + + # Ensure output directory exists + os.makedirs(training_args.output_dir, exist_ok=True) + print(f"Output directory: {training_args.output_dir}") + + # Model and tokenizer configuration with CPU/GPU compatibility + import torch + device = "cuda" if torch.cuda.is_available() else "cpu" + print(f"Using device: {device}") + + # Adjust configuration based on available hardware + quantization_config = get_quantization_config(model_args) if device == "cuda" else None + torch_dtype = getattr(torch, model_args.torch_dtype) if device == "cuda" and hasattr(model_args, 'torch_dtype') else torch.float32 + + model_kwargs = dict( + revision=model_args.model_revision, + trust_remote_code=model_args.trust_remote_code, + attn_implementation=model_args.attn_implementation if device == "cuda" else "eager", + torch_dtype=torch_dtype, + use_cache=False if training_args.gradient_checkpointing or training_args.fsdp_config.get("activation_checkpointing", False) else True, + device_map=get_kbit_device_map() if quantization_config is not None else None, + quantization_config=quantization_config, + ) + training_args.model_init_kwargs = model_kwargs + + # Check if model exists in shared PVC first, otherwise download from HuggingFace Hub + shared_model_path = 
f"/shared/models/{model_args.model_name_or_path.replace('/', '_')}" + + if os.path.exists(shared_model_path): + model_path = shared_model_path + print(f"Loading model from shared PVC: {model_path}") + else: + model_path = model_args.model_name_or_path + print(f"Model not found in shared PVC, will download from HuggingFace Hub: {model_path}") + print(f"Downloaded model will be cached in: {os.environ['HF_HOME']}") + + # Load tokenizer + tokenizer = AutoTokenizer.from_pretrained( + model_path, + trust_remote_code=model_args.trust_remote_code, + use_fast=True + ) + if tokenizer.pad_token is None: + tokenizer.pad_token = tokenizer.eos_token + + # Load synthetic dataset generated by Ray workers + print("Loading synthetic dataset...") + dataset_paths = [ + "/shared/synthetic_data/synthetic_dataset.json", + "/shared/synthetic_data/final_synthetic_dataset.json" + ] + + synthetic_data = None + for path in dataset_paths: + try: + with open(path, "r") as f: + synthetic_data = json.load(f) + print(f"Loaded synthetic dataset from: {path}") + break + except FileNotFoundError: + continue + + if synthetic_data is None: + print("Synthetic dataset not found in any expected location:") + for path in dataset_paths: + print(f" - {path}") + print("Please run Ray preprocessing first.") + raise FileNotFoundError("No synthetic dataset found") + + # Handle both list and dict formats + if isinstance(synthetic_data, list): + # Convert flat list to train/test split + total_samples = len(synthetic_data) + split_idx = int(total_samples * 0.8) # 80/20 split + train_data = synthetic_data[:split_idx] + test_data = synthetic_data[split_idx:] + print(f"Converted list format: {len(train_data)} train, {len(test_data)} test samples") + else: + # Dict format with train/test keys + train_data = synthetic_data.get("train", []) + test_data = synthetic_data.get("test", []) + print(f"Dict format: {len(train_data)} train, {len(test_data)} test samples") + + # Convert to HuggingFace datasets format with chat template + train_samples = [] + for item in train_data: + # Format as conversation for instruction tuning + messages = [ + {"role": "user", "content": item["question"]}, + {"role": "assistant", "content": item["answer"]}, + ] + train_samples.append({"messages": messages}) + + test_samples = [] + for item in test_data[:100]: # Limit eval set size + messages = [ + {"role": "user", "content": item["question"]}, + {"role": "assistant", "content": item["answer"]}, + ] + test_samples.append({"messages": messages}) + + # Create datasets + train_dataset = Dataset.from_list(train_samples) + test_dataset = Dataset.from_list(test_samples) if training_args.eval_strategy != "no" else None + + # Apply chat template + def template_dataset(sample): + return {"text": tokenizer.apply_chat_template(sample["messages"], tokenize=False, add_generation_prompt=False)} + + train_dataset = train_dataset.map(template_dataset, remove_columns=["messages"]) + if test_dataset is not None: + test_dataset = test_dataset.map(template_dataset, remove_columns=["messages"]) + + print(f"Dataset prepared: {len(train_dataset)} train samples") + + # Log few random samples from training set + with training_args.main_process_first(desc="Log few samples from the training set"): + for index in random.sample(range(len(train_dataset)), 2): + print(f"Sample {index}:") + print(train_dataset[index]["text"]) + print("-" * 50) + + # Initialize SFT trainer + trainer = SFTTrainer( + model=model_path, # Use the resolved model path (shared PVC or HuggingFace Hub) + args=training_args, + 
train_dataset=train_dataset, + eval_dataset=test_dataset, + peft_config=get_peft_config(model_args), + processing_class=tokenizer, + ) + + # Print trainable parameters info + if trainer.accelerator.is_main_process and hasattr(trainer.model, "print_trainable_parameters"): + trainer.model.print_trainable_parameters() + + # Start training + print("Starting training...") + checkpoint = None + if training_args.resume_from_checkpoint is not None: + checkpoint = training_args.resume_from_checkpoint + + trainer.train(resume_from_checkpoint=checkpoint) + trainer.save_model(training_args.output_dir) + + with training_args.main_process_first(desc="Training completed"): + print(f"Training completed successfully!") + print(f"Model checkpoint saved to: {training_args.output_dir}") + print(f"Training info:") + print(f" - Model: {model_args.model_name_or_path}") + print(f" - Epochs: {training_args.num_train_epochs}") + print(f" - LoRA rank: {model_args.lora_r if model_args.use_peft else 'N/A'}") + print(f" - Synthetic samples: {len(train_dataset)}") + + +if __name__ == "__main__": + training_func() diff --git a/examples/ray-kft-v1/scripts/ray_sdg_job.py b/examples/ray-kft-v1/scripts/ray_sdg_job.py new file mode 100644 index 000000000..80fe62a1f --- /dev/null +++ b/examples/ray-kft-v1/scripts/ray_sdg_job.py @@ -0,0 +1,1186 @@ +#!/usr/bin/env python3 +""" +Ray based Synthetic Data Generation Script +""" + +import os +import json +import ray +import ray.data +import torch +import warnings +import time +import argparse +import numpy as np +import threading +import signal +from typing import List, Dict, Optional, Tuple +from datasets import load_dataset +from transformers import AutoTokenizer, AutoModelForCausalLM +import random +import re +from dataclasses import dataclass +from pathlib import Path + +warnings.filterwarnings("ignore", category=FutureWarning, module="transformers") + +# Configuration constants +MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct" +# Alternative models that can be used: "microsoft/DialoGPT-medium", "microsoft/DialoGPT-small" + +@dataclass +class QualityMetrics: + """Quality assessment metrics for generated content.""" + mathematical_correctness: float = 0.0 + reasoning_quality: float = 0.0 + problem_complexity: float = 0.0 + answer_completeness: float = 0.0 + overall_quality: float = 0.0 + +class NumpyEncoder(json.JSONEncoder): + """Custom JSON encoder for numpy arrays.""" + def default(self, obj): + if isinstance(obj, np.ndarray): + return obj.tolist() + if isinstance(obj, (np.integer, np.floating)): + return obj.item() + return super().default(obj) + +class CheckpointManager: + """Manages checkpointing and resume functionality.""" + + def __init__(self, output_dir: str, save_every: int = 5): + self.output_dir = Path(output_dir) + self.save_every = save_every + self.checkpoint_file = self.output_dir / "checkpoint.json" + self.dataset_file = self.output_dir / "synthetic_dataset.json" + self.metadata_file = self.output_dir / "dataset_metadata.json" + self.processed_seeds = set() + self.current_data = [] + self.checkpoint_count = 0 + self.lock = threading.Lock() + + self.output_dir.mkdir(parents=True, exist_ok=True) + + def load_checkpoint(self) -> Dict: + """Load existing checkpoint if available.""" + if not self.checkpoint_file.exists(): + return {} + + try: + with open(self.checkpoint_file, 'r') as f: + checkpoint = json.load(f) + + self.processed_seeds = set(checkpoint.get('processed_seeds', [])) + self.checkpoint_count = checkpoint.get('checkpoint_count', 0) + + if 
self.dataset_file.exists(): + with open(self.dataset_file, 'r') as f: + self.current_data = json.load(f) + + print(f"Loaded checkpoint: {len(self.processed_seeds)} seeds processed, {len(self.current_data)} samples saved") + return checkpoint + except Exception as e: + print(f"Failed to load checkpoint: {e}") + return {} + + def save_checkpoint(self, processed_seeds: set, total_expected: int, force: bool = False): + """Save current progress to checkpoint.""" + with self.lock: + self.processed_seeds.update(processed_seeds) + + if not (force or len(self.processed_seeds) % self.save_every == 0): + return + + checkpoint = { + 'processed_seeds': list(self.processed_seeds), + 'checkpoint_count': self.checkpoint_count + 1, + 'total_expected': total_expected, + 'timestamp': time.time(), + 'progress_percentage': (len(self.processed_seeds) / total_expected) * 100 if total_expected > 0 else 0 + } + + try: + with open(self.checkpoint_file, 'w') as f: + json.dump(checkpoint, f, indent=2) + + self.checkpoint_count += 1 + print(f"Checkpoint saved: {len(self.processed_seeds)}/{total_expected} seeds processed ({checkpoint['progress_percentage']:.1f}%)") + except Exception as e: + print(f"Failed to save checkpoint: {e}") + + def save_batch_data(self, batch_data: List[Dict]): + """Save batch data incrementally.""" + with self.lock: + self.current_data.extend(batch_data) + + try: + with open(self.dataset_file, 'w') as f: + json.dump(self.current_data, f, indent=2, cls=NumpyEncoder) + + metadata = self._create_metadata() + with open(self.metadata_file, 'w') as f: + json.dump(metadata, f, indent=2, cls=NumpyEncoder) + + except Exception as e: + print(f"Failed to save batch data: {e}") + + def _create_metadata(self) -> Dict: + """Create metadata for current dataset.""" + total_samples = len(self.current_data) + high_quality_count = sum(1 for item in self.current_data if item.get('overall_quality', 0) >= 0.3) + + metadata = { + 'total_samples': total_samples, + 'high_quality_count': high_quality_count, + 'last_update': time.time(), + 'checkpoint_count': self.checkpoint_count + } + + if total_samples > 0: + metadata['quality_pass_rate'] = (high_quality_count / total_samples) * 100 + metadata['avg_quality_score'] = np.mean([item.get('overall_quality', 0) for item in self.current_data]) + + return metadata + + def get_remaining_seeds(self, all_seed_ids: List[int]) -> List[int]: + """Get list of seeds that haven't been processed yet.""" + return [seed_id for seed_id in all_seed_ids if seed_id not in self.processed_seeds] + + def is_seed_processed(self, seed_id: int) -> bool: + """Check if a seed has already been processed.""" + return seed_id in self.processed_seeds + +class ModelInferenceCallable: + """Ray Data callable class for distributed model inference with checkpointing.""" + + def __init__(self, model_name: str = MODEL_NAME, variations_per_seed: int = 1, + output_dir: str = "/tmp/synthetic_data", processed_seeds: set = None): + self.model_name = model_name + self.variations_per_seed = variations_per_seed + self.output_dir = output_dir + self.processed_seeds = processed_seeds or set() + self.model = None + self.tokenizer = None + self.device = None + + def __call__(self, batch: Dict[str, List]) -> Dict[str, List]: + """Process a batch of seed samples to generate synthetic data.""" + if self.model is None: + self._initialize_model() + + results = { + "questions": [], "answers": [], "sources": [], "difficulties": [], + "concepts": [], "quality_scores": [], "model_confidences": [], + "seed_ids": [], 
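+ # dict-of-lists (column) layout expected back from a map_batches UDF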
"variation_ids": [] + } + + processed_seed_ids = set() + batch_data = [] + + for i, seed_sample in enumerate(batch["seed_samples"]): + seed_id = batch.get("seed_ids", [i])[i] + + if seed_id in self.processed_seeds: + print(f"[Batch] Skipping already processed seed {seed_id}") + continue + + print(f"[Batch] Processing seed {seed_id}: {seed_sample.get('question', 'No question')[:50]}...") + + for var_id in range(self.variations_per_seed): + try: + generated = self._generate_variation(seed_sample, var_id) + if not generated or not self._validate_quality(generated): + continue + + quality_metrics = self._assess_quality(generated) + + # Add to results + self._add_to_results(results, generated, quality_metrics, seed_id, var_id) + self._add_to_batch_data(batch_data, generated, quality_metrics, seed_id, var_id) + + except Exception as e: + print(f"[Batch] Error processing seed {seed_id}, variation {var_id}: {e}") + continue + + processed_seed_ids.add(seed_id) + + if batch_data: + self._save_batch_data(batch_data) + + self._cleanup_gpu_memory() + return results + + def _initialize_model(self): + """Initialize model and tokenizer on worker.""" + print(f"[Worker] Loading model: {self.model_name}") + + cache_dir = self._get_cache_directory() + max_retries = 3 + + for attempt in range(max_retries): + try: + # Load tokenizer + self.tokenizer = AutoTokenizer.from_pretrained( + self.model_name, + trust_remote_code=True, + cache_dir=cache_dir, + resume_download=True, + force_download=False, + local_files_only=False + ) + + if self.tokenizer.pad_token is None: + self.tokenizer.pad_token = self.tokenizer.eos_token + + # Setup device + self.device, device_map = self._setup_device() + + # Load model + model_dtype = torch.bfloat16 if self.device.startswith("cuda") else torch.float32 + + self.model = AutoModelForCausalLM.from_pretrained( + self.model_name, + trust_remote_code=True, + torch_dtype=model_dtype, + cache_dir=cache_dir, + device_map=device_map, + low_cpu_mem_usage=True, + use_safetensors=True, + resume_download=True, + force_download=False, + local_files_only=False + ) + + if device_map is None and self.device != "cpu": + self.model = self.model.to(self.device) + + self.model.eval() + + if self.device.startswith("cuda"): + torch.cuda.empty_cache() + + print(f"[Worker] Model loaded successfully on {self.device}") + return + + except Exception as e: + print(f"[Worker] Attempt {attempt + 1} failed: {e}") + if torch.cuda.is_available(): + torch.cuda.empty_cache() + + if attempt < max_retries - 1: + wait_time = (attempt + 1) * 10 + print(f"[Worker] Waiting {wait_time}s before retry...") + time.sleep(wait_time) + else: + raise Exception(f"Failed to load model after {max_retries} attempts. 
Last error: {e}") + + def _setup_device(self) -> Tuple[str, Optional[str]]: + """Setup device configuration.""" + os.environ['CUDA_LAUNCH_BLOCKING'] = '1' + + if not torch.cuda.is_available(): + return "cpu", None + + available_gpus = torch.cuda.device_count() + ray_gpu_ids = ray.get_gpu_ids() + + print(f"[Worker] CUDA available: {torch.cuda.is_available()}, GPUs: {available_gpus}, Ray GPUs: {ray_gpu_ids}") + + if not ray_gpu_ids: + return ("cuda:0" if available_gpus > 0 else "cpu"), None + + valid_gpus = [gpu_id for gpu_id in ray_gpu_ids if gpu_id < available_gpus] + if not valid_gpus: + return "cuda:0", None + + primary_gpu = valid_gpus[0] + device_map = "auto" if len(valid_gpus) > 1 else None + + return f"cuda:{primary_gpu}", device_map + + def _get_cache_directory(self) -> str: + """Get cache directory for model storage.""" + for path in ["/shared/cache", os.path.expanduser("~/.cache"), "/tmp/.cache"]: + if os.path.exists(os.path.dirname(path)): + os.makedirs(path, exist_ok=True) + return path + return "/tmp/.cache" + + def _generate_variation(self, seed_sample: Dict, var_id: int) -> Optional[Dict]: + """Generate a single variation from a seed sample.""" + difficulties = ["easy", "medium", "hard"] + difficulty = difficulties[var_id % len(difficulties)] + + prompt = self._create_variation_prompt(seed_sample, difficulty) + + try: + inputs = self.tokenizer( + prompt, + return_tensors="pt", + truncation=True, + max_length=512 + ).to(self.device) + + with torch.no_grad(): + outputs = self.model.generate( + **inputs, + max_new_tokens=512, + temperature=0.7, + do_sample=True, + top_p=0.9, + top_k=50, + repetition_penalty=1.1, + eos_token_id=self.tokenizer.eos_token_id, + pad_token_id=self.tokenizer.pad_token_id + ) + + response = self.tokenizer.decode( + outputs[0][inputs['input_ids'].shape[1]:], + skip_special_tokens=True + ).strip() + + parsed = self._parse_response(response) + if parsed: + parsed["difficulty"] = difficulty + return parsed + + except Exception as e: + print(f"[Worker] Generation error: {e}") + + return None + + def _create_variation_prompt(self, seed_sample: Dict, difficulty: str = "medium") -> str: + """Create prompt for generating variations.""" + seed_question = seed_sample["question"] + seed_answer = seed_sample["answer"] + + difficulty_constraints = { + "easy": "single-step calculation with whole numbers, clear and simple scenario", + "medium": "2-3 step calculation, may include decimals, requires logical reasoning", + "hard": "multi-step problem requiring multiple operations and complex logical reasoning" + } + + quality_examples = { + "easy": { + "question": "A bakery sold 45 cupcakes in the morning and 38 cupcakes in the afternoon. How many cupcakes did they sell in total?", + "answer": "To find the total cupcakes sold, I need to add the morning and afternoon sales.\nMorning sales: 45 cupcakes\nAfternoon sales: 38 cupcakes\nTotal = 45 + 38 = 83 cupcakes\nTherefore, the bakery sold 83 cupcakes in total." + }, + "medium": { + "question": "A school bought 15 notebooks for $45. If each notebook costs the same amount, how much would it cost to buy 27 notebooks?", + "answer": "First, I need to find the cost per notebook.\nCost per notebook = Total cost รท Number of notebooks = $45 รท 15 = $3 per notebook\nNext, I'll calculate the cost for 27 notebooks.\nCost for 27 notebooks = 27 ร— $3 = $81\nTherefore, it would cost $81 to buy 27 notebooks." + }, + "hard": { + "question": "A company produces widgets at a rate of 12 per hour. 
If they work 8 hours per day and need to fulfill an order of 2,400 widgets, how many full days will it take to complete the order?", + "answer": "First, I'll calculate how many widgets are produced per day.\nWidgets per hour = 12\nHours per day = 8\nWidgets per day = 12 ร— 8 = 96 widgets per day\nNext, I'll find how many days are needed for 2,400 widgets.\nDays needed = Total widgets รท Widgets per day = 2,400 รท 96 = 25 days\nTherefore, it will take 25 full days to complete the order." + } + } + + example = quality_examples.get(difficulty, quality_examples["medium"]) + constraint = difficulty_constraints.get(difficulty, difficulty_constraints["medium"]) + + return f"""<|im_start|>system +You are an expert mathematics educator creating high-quality word problems for student practice. + +DIFFICULTY LEVEL: {difficulty.upper()} +REQUIREMENTS: {constraint} + +QUALITY STANDARDS: +1. Create a realistic, engaging scenario (shopping, cooking, travel, business, etc.) +2. Question must be clear, specific, and unambiguous +3. Provide a complete step-by-step solution showing ALL calculations +4. Each calculation must be mathematically correct (double-check your arithmetic!) +5. Include the final numerical answer clearly stated with appropriate units +6. Use proper mathematical language and logical flow + +EXAMPLE OF HIGH QUALITY {difficulty.upper()} PROBLEM: +Question: {example["question"]} +Answer: {example["answer"]} + +SEED INSPIRATION (create something similar but different): +Question: {seed_question} +Answer: {seed_answer} + +Return ONLY a JSON object with "question" and "answer" fields. Ensure your answer shows step-by-step work and arrives at a correct final numerical answer. +<|im_end|> +<|im_start|>user +Generate a high-quality {difficulty} math word problem with complete step-by-step solution: +<|im_end|> +<|im_start|>assistant +""" + + def _parse_response(self, response: str) -> Optional[Dict]: + """Parse model response into structured format.""" + try: + response = response.strip() + + # Try JSON parsing first + if response.startswith('{') and response.endswith('}'): + return json.loads(response) + + # Handle Qwen format + if response.startswith('"') and '"answer":' in response: + if not response.startswith('{'): + response = "{" + response + if not response.endswith('}'): + response = response + "}" + return json.loads(response) + + # Try to find JSON-like content + json_match = re.search(r'\{[^{}]*"question"[^{}]*"answer"[^{}]*\}', response, re.DOTALL) + if json_match: + try: + return json.loads(json_match.group(0)) + except: + pass + + return self._fallback_parse(response) + + except Exception: + return self._fallback_parse(response) + + def _fallback_parse(self, response: str) -> Optional[Dict]: + """Fallback parsing for non-JSON responses.""" + # Extract question and answer with regex + question_match = re.search(r'"question":\s*"([^"]*(?:\\.[^"]*)*?)"', response, re.DOTALL) + answer_match = re.search(r'"answer":\s*"([^"]*(?:\\.[^"]*)*?)"', response, re.DOTALL) + + if question_match and answer_match: + question = question_match.group(1).replace('\\"', '"').replace('\\n', '\n') + answer = answer_match.group(1).replace('\\"', '"').replace('\\n', '\n') + return {"question": question, "answer": answer, "confidence": 0.7} + + # Try alternative patterns + alt_question_match = re.search(r'Question:\s*(.+?)(?=Answer:|$)', response, re.DOTALL | re.IGNORECASE) + alt_answer_match = re.search(r'Answer:\s*(.+?)(?=Question:|$)', response, re.DOTALL | re.IGNORECASE) + + if alt_question_match and 
alt_answer_match: + return { + "question": alt_question_match.group(1).strip(), + "answer": alt_answer_match.group(1).strip(), + "confidence": 0.6 + } + + # If response looks like a math problem + if any(keyword in response.lower() for keyword in ["how many", "what is", "calculate", "find"]): + return { + "question": response.strip(), + "answer": "This is a mathematical word problem that requires calculation.", + "confidence": 0.5 + } + + return None + + def _validate_quality(self, generated: Dict) -> bool: + """Enhanced quality validation.""" + if not generated or not generated.get("question") or not generated.get("answer"): + return False + + question = str(generated["question"]).lower() + answer = str(generated["answer"]).lower() + + # Check for mathematical content + math_indicators = ["calculate", "solve", "find", "how many", "total", "cost", "price", "sum", "difference"] + has_math = any(indicator in question for indicator in math_indicators) + + # Check minimum length and completeness + min_length = len(question) >= 25 and len(answer) >= 50 + + # Check if answer seems complete + has_calculation = any(word in answer for word in ["=", "total", "result", "answer is", "solution", "therefore"]) + has_numbers = len(re.findall(r'\d+', answer)) >= 2 + + # Avoid incomplete answers + incomplete_indicators = ["let's", "step by step:", "to determine", "we need to", "first,", "i need to", "let me"] + is_incomplete = any(indicator in answer and len(answer) < 150 for indicator in incomplete_indicators) + + # Check for realistic scenarios + scenario_words = ["bakery", "school", "store", "farmer", "company", "trip", "recipe", "budget", "restaurant", "library"] + has_scenario = any(word in question for word in scenario_words) + + # Additional quality checks + has_proper_question = any(phrase in question for phrase in ["how many", "how much", "what is", "calculate"]) + has_final_answer = any(phrase in answer for phrase in ["therefore", "the answer is", "total", "result"]) + + return (has_math and min_length and (has_calculation or has_numbers) and + not is_incomplete and (has_scenario or has_proper_question) and has_final_answer) + + def _assess_quality(self, generated: Dict) -> QualityMetrics: + """Assess quality of generated content.""" + question = str(generated.get("question", "")) + answer = str(generated.get("answer", "")) + + math_score = self._validate_mathematical_correctness(question, answer) + reasoning_score = self._assess_reasoning_quality(answer) + complexity_score = self._assess_problem_complexity(question) + completeness_score = self._assess_answer_completeness(answer) + + overall_quality = (math_score * 0.4 + reasoning_score * 0.3 + + complexity_score * 0.2 + completeness_score * 0.1) + + return QualityMetrics( + mathematical_correctness=math_score, + reasoning_quality=reasoning_score, + problem_complexity=complexity_score, + answer_completeness=completeness_score, + overall_quality=overall_quality + ) + + def _validate_mathematical_correctness(self, question: str, answer: str) -> float: + """Validate mathematical correctness by checking calculations.""" + calculations = re.findall(r'(\d+(?:\.\d+)?)\s*([+\-*/รทร—])\s*(\d+(?:\.\d+)?)\s*=\s*(\d+(?:\.\d+)?)', answer) + + if not calculations: + math_indicators = ["total", "sum", "difference", "product", "quotient", "result"] + has_math = any(indicator in answer.lower() for indicator in math_indicators) + has_numbers = len(re.findall(r'\d+', answer)) >= 2 + return 0.6 if has_math and has_numbers else 0.3 + + correct_count = 0 + for calc in 
calculations: + try: + num1, op, num2, result = float(calc[0]), calc[1], float(calc[2]), float(calc[3]) + + if op == '+': + expected = num1 + num2 + elif op == '-': + expected = num1 - num2 + elif op in ['*', 'ร—']: + expected = num1 * num2 + elif op in ['/', 'รท'] and num2 != 0: + expected = num1 / num2 + else: + continue + + if abs(expected - result) < 0.01: + correct_count += 1 + except: + continue + + return correct_count / len(calculations) if calculations else 0.3 + + def _assess_reasoning_quality(self, answer: str) -> float: + """Assess quality of step-by-step reasoning.""" + reasoning_score = 0.0 + + # Check for logical flow indicators + flow_indicators = ["first", "then", "next", "therefore", "so", "thus", "finally"] + flow_count = sum(1 for indicator in flow_indicators if indicator in answer.lower()) + reasoning_score += min(flow_count * 0.15, 0.4) + + # Check for explanation words + explanation_words = ["because", "since", "as", "given that", "we know", "to find"] + explanation_count = sum(1 for word in explanation_words if word in answer.lower()) + reasoning_score += min(explanation_count * 0.1, 0.3) + + # Check for step-by-step structure + step_patterns = ["step 1", "step 2", "1.", "2.", "3."] + if any(pattern in answer.lower() for pattern in step_patterns): + reasoning_score += 0.2 + + # Check for calculation explanation + calc_explanations = ["calculate", "multiply", "add", "subtract", "divide"] + if any(word in answer.lower() for word in calc_explanations): + reasoning_score += 0.1 + + return min(reasoning_score, 1.0) + + def _assess_problem_complexity(self, question: str) -> float: + """Assess problem complexity and clarity.""" + complexity_score = 0.0 + + # Check question length + word_count = len(question.split()) + if 15 <= word_count <= 50: + complexity_score += 0.3 + elif 10 <= word_count <= 60: + complexity_score += 0.2 + else: + complexity_score += 0.1 + + # Check for realistic scenario words + scenario_words = ["bakery", "school", "store", "farmer", "company", "trip", "recipe", "budget"] + if any(word in question.lower() for word in scenario_words): + complexity_score += 0.2 + + # Check for clear question structure + question_words = ["how many", "how much", "what is", "calculate", "find"] + if any(phrase in question.lower() for phrase in question_words): + complexity_score += 0.3 + + # Check for multiple numbers + numbers = re.findall(r'\d+', question) + if len(numbers) >= 2: + complexity_score += 0.2 + + return min(complexity_score, 1.0) + + def _assess_answer_completeness(self, answer: str) -> float: + """Check if answer is complete with final numerical result.""" + completeness_score = 0.0 + + # Check for final answer indicators + final_indicators = ["therefore", "the answer is", "total", "result", "final answer"] + if any(indicator in answer.lower() for indicator in final_indicators): + completeness_score += 0.4 + + # Check for numerical answer + if re.findall(r'\d+(?:\.\d+)?', answer): + completeness_score += 0.3 + + # Check minimum answer length + if len(answer.split()) >= 20: + completeness_score += 0.2 + + # Check for units or context + unit_words = ["dollars", "cents", "items", "people", "days", "hours", "pounds", "kilograms"] + if any(unit in answer.lower() for unit in unit_words): + completeness_score += 0.1 + + return min(completeness_score, 1.0) + + def _add_to_results(self, results: Dict, generated: Dict, quality_metrics: QualityMetrics, + seed_id: int, var_id: int): + """Add generated content to results.""" + 
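+ # append one value per column so all result lists stay the same length and Ray Data can zip them into rows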
results["questions"].append(str(generated["question"])) + results["answers"].append(str(generated["answer"])) + results["sources"].append("ray_data_sdg_qwen") + results["difficulties"].append(str(generated.get("difficulty", "medium"))) + results["concepts"].append("arithmetic,word_problems") + results["quality_scores"].append(quality_metrics.overall_quality) + results["model_confidences"].append(float(generated.get("confidence", 0.5))) + results["seed_ids"].append(seed_id) + results["variation_ids"].append(var_id) + + def _add_to_batch_data(self, batch_data: List[Dict], generated: Dict, + quality_metrics: QualityMetrics, seed_id: int, var_id: int): + """Add generated content to batch data.""" + batch_item = { + "question": str(generated["question"]), + "answer": str(generated["answer"]), + "source": "ray_data_sdg_qwen", + "difficulty": str(generated.get("difficulty", "medium")), + "concepts": ["arithmetic", "word_problems"], + "overall_quality": quality_metrics.overall_quality, + "model_confidence": float(generated.get("confidence", 0.5)), + "seed_id": seed_id, + "variation_id": var_id + } + batch_data.append(batch_item) + + def _save_batch_data(self, batch_data: List[Dict]): + """Save batch data incrementally.""" + try: + os.makedirs(self.output_dir, exist_ok=True) + + dataset_path = os.path.join(self.output_dir, "synthetic_dataset.json") + existing_data = [] + + if os.path.exists(dataset_path): + try: + with open(dataset_path, "r") as f: + existing_data = json.load(f) + except: + pass + + existing_data.extend(batch_data) + + with open(dataset_path, "w") as f: + json.dump(existing_data, f, indent=2, cls=NumpyEncoder) + + # Update metadata + metadata = { + "total_samples": len(existing_data), + "high_quality_count": sum(1 for item in existing_data if item.get("overall_quality", 0) >= 0.3), + "last_update": time.time(), + "batch_count": len(batch_data) + } + + if metadata["total_samples"] > 0: + metadata["quality_pass_rate"] = (metadata["high_quality_count"] / metadata["total_samples"]) * 100 + metadata["avg_quality_score"] = np.mean([item.get("overall_quality", 0) for item in existing_data]) + + metadata_path = os.path.join(self.output_dir, "dataset_metadata.json") + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2, cls=NumpyEncoder) + + print(f"Saved batch: {len(batch_data)} new samples (Total: {len(existing_data)})") + + except Exception as e: + print(f"Warning: Failed to save batch data: {e}") + + def _cleanup_gpu_memory(self): + """Clean up GPU memory after batch processing.""" + if torch.cuda.is_available(): + torch.cuda.empty_cache() + torch.cuda.synchronize() + +def get_cluster_info() -> Dict: + """Get detailed cluster information for multi-node setup.""" + cluster_resources = ray.cluster_resources() + node_resources = ray.nodes() + + gpu_nodes = [] + total_gpus = total_cpus = 0 + + for node in node_resources: + if node['Alive']: + node_gpus = node['Resources'].get('GPU', 0) + node_cpus = node['Resources'].get('CPU', 0) + + if node_gpus > 0: + gpu_nodes.append({ + 'node_id': node['NodeID'], + 'gpus': int(node_gpus), + 'cpus': int(node_cpus), + 'node_ip': node.get('NodeManagerAddress', 'unknown') + }) + + total_gpus += node_gpus + total_cpus += node_cpus + + return { + 'total_nodes': len([n for n in node_resources if n['Alive']]), + 'gpu_nodes': len(gpu_nodes), + 'total_gpus': int(total_gpus), + 'total_cpus': int(total_cpus), + 'gpu_nodes_info': gpu_nodes, + 'cluster_resources': cluster_resources + } + +def calculate_optimal_resources(cluster_info: Dict, args) -> 
Dict: + """Calculate optimal resource allocation for multi-node multi-GPU setup.""" + total_gpus = cluster_info['total_gpus'] + gpu_nodes = cluster_info['gpu_nodes'] + + if args.cpu_only or total_gpus == 0: + return { + 'compute_resources': {'num_cpus': min(args.num_cpus or 4, cluster_info['total_cpus'])}, + 'concurrency': min(4, cluster_info['total_cpus'] // 2), + 'gpus_per_worker': 0, + 'total_workers': min(4, cluster_info['total_cpus'] // 2) + } + + gpus_per_worker = min(args.gpus_per_worker, total_gpus) + max_workers = total_gpus // gpus_per_worker + + if args.max_concurrent_workers: + concurrency = min(args.max_concurrent_workers, max_workers) + else: + if args.enable_multi_node and gpu_nodes > 1: + concurrency = min(max_workers, gpu_nodes * 2) + else: + concurrency = max_workers + + return { + 'compute_resources': {'num_gpus': gpus_per_worker}, + 'concurrency': concurrency, + 'gpus_per_worker': gpus_per_worker, + 'total_workers': concurrency + } + +def parse_args(): + """Parse command line arguments.""" + parser = argparse.ArgumentParser( + description="Ray Data Synthetic Data Generation for Mathematical Word Problems", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Test mode - generate few samples quickly + python ray_data_sdg_job.py --test-mode + + # Production mode - generate full dataset + python ray_data_sdg_job.py + + # Custom configuration + python ray_data_sdg_job.py --seeds 100 --variations 2 --batch-size 16 + """ + ) + + parser.add_argument("--test-mode", action="store_true", help="Enable test mode with minimal samples") + parser.add_argument("--seeds", type=int, help="Number of seed samples to use") + parser.add_argument("--variations", type=int, help="Number of variations per seed") + parser.add_argument("--batch-size", type=int, help="Batch size for Ray Data processing") + parser.add_argument("--quality-threshold", type=float, help="Quality threshold for filtering (0.0-1.0)") + parser.add_argument("--output-path", type=str, default="/tmp/synthetic_data", help="Output path for generated dataset") + parser.add_argument("--num-cpus", type=int, help="Number of CPUs to use") + parser.add_argument("--save-every", type=int, default=5, help="Save checkpoint every N processed seeds") + parser.add_argument("--resume", action="store_true", help="Resume from last checkpoint if available") + parser.add_argument("--checkpoint-dir", type=str, help="Directory for checkpoint files") + parser.add_argument("--cpu-only", action="store_true", help="Force CPU-only execution") + parser.add_argument("--gpus-per-worker", type=int, default=1, help="Number of GPUs per worker") + parser.add_argument("--max-concurrent-workers", type=int, help="Maximum number of concurrent workers") + parser.add_argument("--enable-multi-node", action="store_true", help="Enable multi-node distributed processing") + + return parser.parse_args() + +def create_seed_dataset(num_seeds: int, cache_dir: str) -> ray.data.Dataset: + """Create Ray Dataset from GSM8K seed samples.""" + print("Loading GSM8K dataset...") + gsm8k_dataset = load_dataset("gsm8k", "main", cache_dir=f"{cache_dir}/datasets") + print(f"Dataset loaded: {len(gsm8k_dataset['train'])} train samples") + + train_data = gsm8k_dataset["train"] + seed_samples = [] + + for i in range(min(num_seeds, len(train_data))): + sample = train_data[i] + seed_samples.append({ + "seed_samples": {"question": sample["question"], "answer": sample["answer"]}, + "seed_ids": i + }) + + print(f"Created {len(seed_samples)} seed samples") + return 
ray.data.from_items(seed_samples) + +def create_seed_dataset_filtered(seed_ids: List[int], cache_dir: str) -> ray.data.Dataset: + """Create Ray Dataset from GSM8K seed samples for specific seed IDs.""" + print(f"Loading GSM8K dataset for {len(seed_ids)} specific seeds...") + gsm8k_dataset = load_dataset("gsm8k", "main", cache_dir=f"{cache_dir}/datasets") + + train_data = gsm8k_dataset["train"] + seed_samples = [] + + for seed_id in seed_ids: + if seed_id < len(train_data): + sample = train_data[seed_id] + seed_samples.append({ + "seed_samples": {"question": sample["question"], "answer": sample["answer"]}, + "seed_ids": seed_id + }) + + print(f"Created {len(seed_samples)} filtered seed samples") + return ray.data.from_items(seed_samples) + +def quality_filter(batch: Dict[str, List]) -> Dict[str, List]: + """Enhanced quality filter with detailed logging.""" + print(f"[Filter] Input batch has {len(batch.get('quality_scores', []))} items") + + filtered_indices = [] + quality_stats = {"total": 0, "high_quality": 0, "medium_quality": 0, "low_quality": 0} + + for i, quality_score in enumerate(batch["quality_scores"]): + quality_stats["total"] += 1 + + if quality_score >= 0.7: + filtered_indices.append(i) + quality_stats["high_quality"] += 1 + print(f"[Filter] Item {i}: ACCEPTED (quality_score = {quality_score:.3f}) - High Quality") + elif quality_score >= 0.5: + filtered_indices.append(i) + quality_stats["medium_quality"] += 1 + print(f"[Filter] Item {i}: ACCEPTED (quality_score = {quality_score:.3f}) - Medium Quality") + else: + quality_stats["low_quality"] += 1 + print(f"[Filter] Item {i}: REJECTED (quality_score = {quality_score:.3f}) - Low Quality") + + print(f"[Filter] Quality Statistics:") + print(f" - Total items: {quality_stats['total']}") + print(f" - High quality (โ‰ฅ0.7): {quality_stats['high_quality']}") + print(f" - Medium quality (โ‰ฅ0.5): {quality_stats['medium_quality']}") + print(f" - Low quality (<0.5): {quality_stats['low_quality']}") + print(f" - Acceptance rate: {len(filtered_indices)/quality_stats['total']*100:.1f}%") + + # Filter all fields based on quality + return {key: [values[i] for i in filtered_indices] for key, values in batch.items()} + +def format_for_output(batch: Dict[str, List]) -> Dict[str, List]: + """Format batch for final output.""" + formatted_items = [] + + for i in range(len(batch["questions"])): + item = { + "question": str(batch["questions"][i]), + "answer": str(batch["answers"][i]), + "source": str(batch["sources"][i]), + "difficulty": str(batch["difficulties"][i]), + "concepts": batch["concepts"][i].split(",") if isinstance(batch["concepts"][i], str) else ["arithmetic", "word_problems"], + "overall_quality": float(batch["quality_scores"][i]), + "model_confidence": float(batch["model_confidences"][i]), + "seed_id": int(batch["seed_ids"][i]), + "variation_id": int(batch["variation_ids"][i]) + } + formatted_items.append(item) + + return {"items": formatted_items} + +def setup_signal_handlers(checkpoint_manager: CheckpointManager, total_expected: int): + """Setup signal handlers for graceful shutdown.""" + def signal_handler(signum, frame): + print(f"\nReceived signal {signum}. Saving checkpoint and shutting down gracefully...") + checkpoint_manager.save_checkpoint(checkpoint_manager.processed_seeds, total_expected, force=True) + print("Checkpoint saved. 
Exiting...") + if ray.is_initialized(): + ray.shutdown() + exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + +def save_final_dataset(all_items: List[Dict], output_path: str, metadata: Dict): + """Save final formatted dataset.""" + os.makedirs(output_path, exist_ok=True) + + # Split into train/test (80/20) + random.shuffle(all_items) + split_idx = int(len(all_items) * 0.8) + + final_dataset = { + "train": all_items[:split_idx], + "test": all_items[split_idx:], + "metadata": metadata + } + + final_dataset_path = os.path.join(output_path, "final_synthetic_dataset.json") + with open(final_dataset_path, "w") as f: + json.dump(final_dataset, f, indent=2, cls=NumpyEncoder) + + print(f"Final dataset saved: {len(final_dataset['train'])} train / {len(final_dataset['test'])} test") + print(f"Saved to: {final_dataset_path}") + +def main(): + """Main function using Ray Data pipeline with checkpointing.""" + args = parse_args() + + print("Starting Ray Data distributed synthetic data generation with checkpointing...") + print(f"Mode: {'TEST' if args.test_mode else 'PRODUCTION'}") + + # Setup checkpoint directory + checkpoint_dir = args.checkpoint_dir if args.checkpoint_dir else args.output_path + checkpoint_manager = CheckpointManager(checkpoint_dir, args.save_every) + + # Load existing checkpoint if resuming + checkpoint_data = {} + if args.resume: + print("Attempting to resume from checkpoint...") + checkpoint_data = checkpoint_manager.load_checkpoint() + if checkpoint_data: + print(f"Resuming from checkpoint with {len(checkpoint_manager.processed_seeds)} seeds already processed") + else: + print("No checkpoint found, starting fresh") + else: + print("Starting fresh (not resuming from checkpoint)") + + # Initialize Ray + if not ray.is_initialized(): + print("Initializing Ray...") + is_ray_job = os.environ.get('RAY_JOB_ID') is not None or os.environ.get('RAY_ADDRESS') is not None + + if is_ray_job: + ray.init() + print("Ray initialized in cluster mode (running as Ray job)") + else: + ray.init(num_cpus=min(8, os.cpu_count())) + print("Ray initialized in standalone mode") + else: + print("Ray already initialized") + + # Get cluster information + cluster_info = get_cluster_info() + print(f"Connected to Ray cluster:") + print(f" - Total nodes: {cluster_info['total_nodes']}") + print(f" - GPU nodes: {cluster_info['gpu_nodes']}") + print(f" - Total GPUs: {cluster_info['total_gpus']}") + print(f" - Total CPUs: {cluster_info['total_cpus']}") + + if args.enable_multi_node and cluster_info['gpu_nodes'] > 1: + print(f" - Multi-node mode enabled with {cluster_info['gpu_nodes']} GPU nodes") + for i, node_info in enumerate(cluster_info['gpu_nodes_info']): + print(f" Node {i+1}: {node_info['gpus']} GPUs, {node_info['cpus']} CPUs @ {node_info['node_ip']}") + + # Configure based on mode + if args.test_mode: + default_seeds, default_variations, default_batch_size = 2, 1, 1 + default_quality_threshold, default_num_cpus = 0.5, 2 + print("TEST MODE: Using minimal samples for quick testing") + else: + default_seeds, default_variations = 50, 3 + default_batch_size = max(4, cluster_info['total_gpus']) + default_quality_threshold, default_num_cpus = 0.7, 4 + print("PRODUCTION MODE: Generating full-scale dataset") + + # Apply user overrides + num_seeds = args.seeds if args.seeds is not None else default_seeds + variations_per_seed = args.variations if args.variations is not None else default_variations + batch_size = args.batch_size if args.batch_size is not None else 
default_batch_size + quality_threshold = args.quality_threshold if args.quality_threshold is not None else default_quality_threshold + num_cpus = args.num_cpus if args.num_cpus is not None else default_num_cpus + + # Calculate optimal resource allocation + resource_config = calculate_optimal_resources(cluster_info, args) + total_expected_seeds = num_seeds + + print(f"Configuration:") + print(f" - Seeds: {num_seeds}") + print(f" - Variations per seed: {variations_per_seed}") + print(f" - Batch size: {batch_size}") + print(f" - Quality threshold: {quality_threshold}") + print(f" - Save every: {args.save_every} seeds") + print(f" - Checkpoint dir: {checkpoint_dir}") + print(f" - Expected total: {num_seeds} x {variations_per_seed} = {num_seeds * variations_per_seed} problems") + print(f"Resource Allocation:") + print(f" - GPUs per worker: {resource_config['gpus_per_worker']}") + print(f" - Concurrent workers: {resource_config['concurrency']}") + print(f" - Total workers: {resource_config['total_workers']}") + print(f" - Compute resources: {resource_config['compute_resources']}") + + # Setup signal handlers + setup_signal_handlers(checkpoint_manager, total_expected_seeds) + + # Get cache directory + cache_dir = "/shared/cache" if os.path.exists("/shared/cache") else os.path.expanduser("~/.cache") + + try: + # Create seed dataset + all_seed_ids = list(range(num_seeds)) + + if args.resume and checkpoint_manager.processed_seeds: + remaining_seed_ids = checkpoint_manager.get_remaining_seeds(all_seed_ids) + print(f"Resuming: {len(remaining_seed_ids)} seeds remaining out of {num_seeds} total") + if not remaining_seed_ids: + print("All seeds already processed! Nothing to do.") + return + else: + remaining_seed_ids = all_seed_ids + + seed_ds = create_seed_dataset_filtered(remaining_seed_ids, cache_dir) + + print(f"\nStarting Ray Data pipeline with checkpointing...") + + compute_resources = resource_config['compute_resources'] + concurrency = resource_config['concurrency'] + + print(f"Pipeline Configuration:") + print(f" - Compute resources per worker: {compute_resources}") + print(f" - Concurrency (parallel workers): {concurrency}") + print(f" - Batch size: {batch_size}") + + # Enhanced Ray Data Pipeline + results_ds = (seed_ds + .map_batches( + ModelInferenceCallable(MODEL_NAME, variations_per_seed, checkpoint_dir, checkpoint_manager.processed_seeds), + batch_size=batch_size, + concurrency=concurrency, + **compute_resources, + max_retries=3, + retry_exceptions=True, + ) + .filter(lambda batch: len(batch["questions"]) > 0) + .map_batches(quality_filter, batch_size=batch_size * 2) + .map_batches(format_for_output, batch_size=batch_size * 2) + ) + + # Execute pipeline + print("Executing Ray Data pipeline with incremental saving...") + + total_generated = high_quality_count = processed_batches = 0 + quality_scores = [] + batches_without_progress = last_total_generated = 0 + max_batches_without_progress = 10 + + pipeline_start_time = start_time = time.time() + max_pipeline_time = 3600 # 1 hour timeout + + try: + for batch in results_ds.iter_batches(batch_size=None): + if time.time() - pipeline_start_time > max_pipeline_time: + print(f"Pipeline timeout after {max_pipeline_time} seconds, stopping...") + break + + items = batch["items"] + total_generated += len(items) + processed_batches += 1 + + batch_seed_ids = set() + for item in items: + if item["overall_quality"] >= quality_threshold: + high_quality_count += 1 + quality_scores.append(item["overall_quality"]) + batch_seed_ids.add(item["seed_id"]) + + if 
batch_seed_ids: + checkpoint_manager.save_checkpoint(batch_seed_ids, total_expected_seeds) + + print(f"Processed batch {processed_batches}: {len(items)} items, {len(batch_seed_ids)} seeds") + + # Check for progress stall + if total_generated == last_total_generated: + batches_without_progress += 1 + if batches_without_progress >= max_batches_without_progress: + print(f"No progress for {max_batches_without_progress} batches, stopping pipeline...") + break + else: + batches_without_progress = 0 + last_total_generated = total_generated + + except KeyboardInterrupt: + print("Pipeline interrupted by user") + except Exception as e: + print(f"Pipeline error: {e}") + print("Saving progress and continuing...") + + end_time = time.time() + processing_time = end_time - start_time + + # Create metadata + metadata = { + "total_generated": total_generated, + "high_quality_count": high_quality_count, + "quality_pass_rate": (high_quality_count / total_generated * 100) if total_generated > 0 else 0, + "quality_threshold": quality_threshold, + "avg_quality_score": sum(quality_scores) / len(quality_scores) if quality_scores else 0, + "processing_time_seconds": processing_time, + "model_used": MODEL_NAME, + "generation_method": "ray_data_distributed", + "ray_data_features": [ + "map_batches_inference", + "automatic_scaling", + "fault_tolerance", + "streaming_processing", + "quality_filtering" + ] + } + + print("\n" + "="*60) + print("RAY DATA SDG PIPELINE SUMMARY") + print("="*60) + print(f"Total problems generated: {total_generated}") + print(f"High quality problems: {high_quality_count}") + print(f"Quality pass rate: {metadata['quality_pass_rate']:.1f}%") + print(f"Average quality score: {metadata['avg_quality_score']:.3f}") + print(f"Processing time: {processing_time:.1f} seconds") + print(f"Throughput: {total_generated/processing_time:.2f} problems/second") + + if cluster_info['total_gpus'] > 0: + gpu_efficiency = total_generated / (cluster_info['total_gpus'] * processing_time) + print(f"GPU efficiency: {gpu_efficiency:.2f} problems/GPU/second") + print(f"GPU utilization: {resource_config['total_workers']}/{cluster_info['total_gpus']} GPUs used") + + print("="*60) + + # Final checkpoint save + checkpoint_manager.save_checkpoint(checkpoint_manager.processed_seeds, total_expected_seeds, force=True) + + # Save final results + if checkpoint_manager.current_data: + save_final_dataset(checkpoint_manager.current_data, args.output_path, metadata) + + print(f"\nPipeline completed successfully!") + print(f"Results saved to: {args.output_path}") + print(f"Checkpoints saved to: {checkpoint_dir}") + + except KeyboardInterrupt: + print(f"\nPipeline interrupted by user") + checkpoint_manager.save_checkpoint(checkpoint_manager.processed_seeds, total_expected_seeds, force=True) + print("Progress saved to checkpoint") + except Exception as e: + print(f"Error in Ray Data pipeline: {e}") + checkpoint_manager.save_checkpoint(checkpoint_manager.processed_seeds, total_expected_seeds, force=True) + print("Progress saved to checkpoint before exit") + raise + finally: + if ray.is_initialized(): + ray.shutdown() + +if __name__ == "__main__": + main() \ No newline at end of file