
Commit 0cde74f

beatsmonster and claude committed
feat: implement SDG Hub KFP component

Add a Kubeflow Pipelines component that wraps the SDG Hub SDK for synthetic data generation within KFP pipelines.

Component features:
- Input from KFP artifact (Input[Dataset]) or PVC path
- Built-in flow selection via flow_id or custom YAML
- LLM model configuration with K8s secret injection
- Checkpointing for resumable long-running jobs
- Optional PVC export with timestamped directory structure
- KFP metrics output (input_rows, output_rows, execution_time)

Testing:
- 26 unit tests with mocked SDG Hub SDK
- Integration test with real transform flow (no LLM)
- E2E test with OpenAI gpt-4o-mini (skipped when LLM_API_KEY unset)

Also includes example pipelines, local runner script, and test data with transform-only and LLM flow definitions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Yi Zheng <237498169+beatsmonster@users.noreply.github.com>
1 parent 190114f commit 0cde74f

File tree

16 files changed: +1672 −0 lines changed


components/sdg/__init__.py

Whitespace-only changes.

components/sdg/sdg_hub/OWNERS

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
approvers:
  - beatsmonster
  - shivchander
  - eshwarprasadS
  - ashtarkb
reviewers:
  - beatsmonster
  - shivchander
  - eshwarprasadS
  - ashtarkb

components/sdg/sdg_hub/README.md

Lines changed: 230 additions & 0 deletions
@@ -0,0 +1,230 @@
# SDG Hub Component

Runs [SDG Hub](https://github.com/Red-Hat-AI-Innovation-Team/sdg_hub) synthetic data generation flows as a Kubeflow Pipelines component.

## Overview

This component wraps the SDG Hub SDK to execute composable data generation flows within KFP pipelines. It supports:

- Built-in flows via `flow_id` (from the SDG Hub registry)
- Custom flows via `flow_yaml_path` (mounted from a ConfigMap or PVC)
- Automatic LLM model configuration for flows with LLM blocks
- Checkpointing for resumable execution

## Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `input_artifact` | `dsl.Input[dsl.Dataset]` | `None` | KFP Dataset artifact from an upstream component |
| `input_pvc_path` | `str` | `""` | Path to a JSONL input file on a mounted PVC |
| `flow_id` | `str` | `""` | Built-in flow ID from the SDG Hub registry |
| `flow_yaml_path` | `str` | `""` | Path to a custom flow YAML file |
| `model` | `str` | `""` | LiteLLM model identifier (e.g. `openai/gpt-4o-mini`) |
| `max_concurrency` | `int` | `10` | Maximum concurrent LLM requests |
| `checkpoint_pvc_path` | `str` | `""` | PVC path for checkpoints |
| `save_freq` | `int` | `100` | Checkpoint save frequency |
| `log_level` | `str` | `"INFO"` | Logging level |
| `temperature` | `float` | `0.7` | LLM sampling temperature |
| `max_tokens` | `int` | `2048` | Maximum response tokens |
| `export_to_pvc` | `bool` | `False` | Export output to a PVC (in addition to the KFP artifact) |
| `export_path` | `str` | `""` | Base PVC path for exports (required if `export_to_pvc` is `True`) |

## Outputs

- `output_artifact` (`Dataset`): JSONL file with the generated data
- `output_metrics` (`Metrics`): JSON with `input_rows`, `output_rows`, `execution_time_seconds`
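As a sketch, a downstream step could read the metrics file like this (the flat JSON layout with exactly these three keys is an assumption based on the field names above; the sample values and path are illustrative):

```python
import json
import os
import tempfile

# Illustrative only: fabricate a metrics file with the documented keys.
# In a real pipeline this path would come from the output_metrics artifact.
tmp_dir = tempfile.mkdtemp()
metrics_path = os.path.join(tmp_dir, "metrics.json")
with open(metrics_path, "w") as f:
    json.dump(
        {"input_rows": 10, "output_rows": 30, "execution_time_seconds": 12.5}, f
    )

with open(metrics_path) as f:
    metrics = json.load(f)

# Derive a simple summary from the documented fields
rows_added = metrics["output_rows"] - metrics["input_rows"]
print(f"Generated {rows_added} new rows in {metrics['execution_time_seconds']:.1f}s")
```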
## Usage

### Basic PVC Input

```python
from components.sdg.sdg_hub import sdg
from kfp import dsl
from kfp_kubernetes import mount_pvc, use_secret_as_env


@dsl.pipeline(name="sdg-pipeline")
def my_pipeline():
    sdg_task = sdg(
        input_pvc_path="/mnt/data/input.jsonl",
        flow_id="green-clay-812",
        model="openai/gpt-4o-mini",
    )

    # Mount PVC containing input data
    mount_pvc(
        task=sdg_task,
        pvc_name="data-pvc",
        mount_path="/mnt/data",
    )

    # Inject LLM API credentials
    use_secret_as_env(
        task=sdg_task,
        secret_name="llm-credentials",
        secret_key_to_env={"OPENAI_APIKEY": "LLM_API_KEY"},
    )
```
### Artifact Chaining

Chain SDG with upstream components by consuming their output artifacts:

```python
from components.sdg.sdg_hub import sdg
from kfp import dsl
from kfp_kubernetes import use_secret_as_env


@dsl.component(packages_to_install=["pandas"])
def preprocess_data(output_data: dsl.Output[dsl.Dataset]) -> None:
    import pandas as pd

    # ... preprocessing logic that produces a DataFrame `df` ...
    df.to_json(output_data.path, orient="records", lines=True)


@dsl.pipeline(name="sdg-chained-pipeline")
def chained_pipeline():
    # Step 1: Preprocess data
    preprocess_task = preprocess_data()

    # Step 2: Run SDG using preprocessed data
    sdg_task = sdg(
        input_artifact=preprocess_task.outputs["output_data"],
        flow_id="green-clay-812",
        model="openai/gpt-4o-mini",
    )

    use_secret_as_env(
        task=sdg_task,
        secret_name="llm-credentials",
        secret_key_to_env={"OPENAI_APIKEY": "LLM_API_KEY"},
    )
```
### PVC Export

Export generated data to a PVC for archival or external access:

```python
from components.sdg.sdg_hub import sdg
from kfp import dsl
from kfp_kubernetes import mount_pvc, use_secret_as_env


@dsl.pipeline(name="sdg-export-pipeline")
def export_pipeline():
    sdg_task = sdg(
        input_pvc_path="/mnt/data/input.jsonl",
        flow_id="green-clay-812",
        model="openai/gpt-4o-mini",
        export_to_pvc=True,
        export_path="/mnt/data/exports",
    )

    # Mount PVC for both input and export
    mount_pvc(
        task=sdg_task,
        pvc_name="data-pvc",
        mount_path="/mnt/data",
    )

    use_secret_as_env(
        task=sdg_task,
        secret_name="llm-credentials",
        secret_key_to_env={"OPENAI_APIKEY": "LLM_API_KEY"},
    )
```

Exports are saved to: `{export_path}/{flow_id}/{timestamp}/generated.jsonl`
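A minimal sketch of how such a timestamped export path can be constructed (the timestamp format here is illustrative; the component's actual format may differ):

```python
import os
from datetime import datetime, timezone


def export_dir(export_path: str, flow_id: str) -> str:
    # A timestamped subdirectory keeps repeated runs from overwriting
    # each other; the exact timestamp format is an assumption.
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return os.path.join(export_path, flow_id, ts)


path = export_dir("/mnt/data/exports", "green-clay-812")
print(path)  # e.g. /mnt/data/exports/green-clay-812/20250101-120000
```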
## Local Development

### Prerequisites

```bash
# From the repo root
uv venv && source .venv/bin/activate
uv sync --extra test
```

### Running the Component Locally

KFP's `LocalRunner` does not support `Input[Dataset]` artifacts, so local execution
calls `sdg.python_func()` directly with mock artifact objects.

A ready-to-run script is provided at `run_local.py`:

```bash
cd components/sdg/sdg_hub
LLM_API_KEY="<your-api-key>" python run_local.py
```

This runs the LLM test flow against `test_data/sdg_hub/sample_input.jsonl` using
`gpt-4o-mini`, prints the generated output, and cleans up the temp directory.

To run with your own data or flow:
```python
import json
import os
import tempfile

import pandas as pd

from components.sdg.sdg_hub.component import sdg


class Artifact:
    def __init__(self, path):
        self.path = path


with tempfile.TemporaryDirectory() as tmp_dir:
    output_artifact = Artifact(os.path.join(tmp_dir, "output.jsonl"))
    output_metrics = Artifact(os.path.join(tmp_dir, "metrics.json"))

    sdg.python_func(
        output_artifact=output_artifact,
        output_metrics=output_metrics,
        input_pvc_path="/path/to/your/input.jsonl",
        flow_yaml_path="/path/to/your/flow.yaml",  # or use flow_id="green-clay-812"
        model="openai/gpt-4o-mini",
        max_concurrency=1,
        temperature=0.7,
        max_tokens=2048,
        checkpoint_pvc_path="",
        save_freq=100,
        log_level="INFO",
        export_to_pvc=False,
        export_path="",
    )

    # Read results (inside the `with` block, before the temp dir is removed)
    df = pd.read_json(output_artifact.path, lines=True)
    print(df)

    with open(output_metrics.path) as f:
        print(json.dumps(json.load(f), indent=2))
```

To persist output to a local directory instead of a temp folder, set
`export_to_pvc=True` and `export_path` to a local directory. Output is written to
`{export_path}/{flow_id}/{timestamp}/generated.jsonl`.
### Running Tests

```bash
# Unit tests (no API key needed)
pytest components/sdg/sdg_hub/tests/test_component_unit.py -v

# Integration test with transform-only flow (no API key needed)
pytest components/sdg/sdg_hub/tests/test_component_local.py::TestSdgHubLocalRunner -v

# LLM E2E tests (requires API key)
LLM_API_KEY="<your-api-key>" pytest components/sdg/sdg_hub/tests/test_component_local.py::TestSdgHubLLMFlow -v
```

## Environment Variables

For flows with LLM blocks, set these via Kubernetes Secrets:

- `LLM_API_KEY`: API key for the LLM provider
- `LLM_API_BASE`: API base URL (optional, for self-hosted models)
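The usage examples above map a secret key onto `LLM_API_KEY` with `use_secret_as_env`; inside the container, the component can then pick these variables up from the environment. A hedged sketch of that pattern (only the variable names come from this README; the placeholder value and `config` dict are illustrative):

```python
import os

# Illustrative only: pretend the K8s secret injected a key, as
# use_secret_as_env would do in-cluster. Do not hardcode real keys.
os.environ.setdefault("LLM_API_KEY", "sk-demo")

api_key = os.environ.get("LLM_API_KEY", "")
api_base = os.environ.get("LLM_API_BASE")  # optional, for self-hosted models

# Build a provider config, including the base URL only when it is set
config = {"api_key": api_key}
if api_base:
    config["api_base"] = api_base
print(sorted(config))
```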

components/sdg/sdg_hub/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
from .component import sdg

__all__ = ["sdg"]
