Changes from all commits (57 commits)
82a899b
feat: sdg component design
beatsmonster Jan 29, 2026
b42aa16
feat: implement SDG Hub KFP component
beatsmonster Feb 25, 2026
e3ff3f8
feat: enable optional Input[Dataset]
beatsmonster Feb 26, 2026
4360916
fix: version-pin sdg-hub dependency to >=0.7.0,<1.0
beatsmonster Mar 2, 2026
019d5d9
fix: use sentinel defaults for temperature/max_tokens to respect flow…
beatsmonster Mar 2, 2026
f9d6303
fix: switch metrics output to KFP log_metric() API
beatsmonster Mar 2, 2026
678315f
feat: add runtime_params for per-block parameter overrides
beatsmonster Mar 2, 2026
c4d400a
fix: use absolute import in example_pipelines for validate_examples c…
beatsmonster Mar 2, 2026
720735b
fix: sanitize flow_name in PVC export path to prevent path traversal
beatsmonster Mar 3, 2026
dfb0081
fix: update README example Artifact class with log_metric() support
beatsmonster Mar 3, 2026
328b45c
refactor: move example_pipelines from components/ to pipelines/sdg/
beatsmonster Mar 3, 2026
58dbcf3
docs: inline architecture doc into README and align with current impl…
beatsmonster Mar 3, 2026
43cd08a
refactor: use per-component requirements-test.txt for test dependencies
beatsmonster Mar 3, 2026
a40338b
docs: add OpenShift AI pipeline submission guide to README
beatsmonster Mar 9, 2026
fbcb8ae
fix: use UBI9 Python 3.11 base image to avoid Docker Hub rate limits
beatsmonster Mar 9, 2026
f343bb2
fix: remove per-component requirements-test.txt auto-install from tes…
beatsmonster Mar 9, 2026
7f139d8
refactor: move sdg component to components/data_processing/sdg/sdg_hub/
beatsmonster Mar 9, 2026
6f1f1cf
refactor: move example_pipelines to pipelines/data_processing/sdg/sdg…
beatsmonster Mar 9, 2026
ef23193
refactor: move test data into component and pipeline directories
beatsmonster Mar 9, 2026
711fd3d
refactor: move run_local.py to component shared/ directory
beatsmonster Mar 9, 2026
4d88333
refactor: use conftest.py subprocess runner fixture in local tests
beatsmonster Mar 9, 2026
bc67df4
fix: misc updates on (stale paths, OWNERS, metadata, LLM pipeline)
beatsmonster Mar 9, 2026
673b221
docs: move component-specific install instructions from CONTRIBUTING.…
beatsmonster Mar 9, 2026
8148dd5
fix: gracefully handle invalid log_level with fallback to INFO
beatsmonster Mar 11, 2026
b330bc9
fix: standardize secret key name to api_key across all examples
beatsmonster Mar 11, 2026
213345a
docs: update import paths in README and add note about PYTHONPATH
beatsmonster Mar 11, 2026
100dab5
docs: fix stale test data path reference in README
beatsmonster Mar 11, 2026
a57df6a
fix: wrap run_local.py execution in __main__ guard
beatsmonster Mar 11, 2026
f926b26
fix: remove custom base_image to use KFP default python:3.11
beatsmonster Mar 16, 2026
073b296
fix: remove __init__.py from shared/ to exclude it from package valid…
beatsmonster Mar 16, 2026
9f6d44d
fix: adding sdg dependency to project test group
beatsmonster Mar 18, 2026
c55294b
fix: update kfp_kubernetes import to kfp.kubernetes
beatsmonster Mar 18, 2026
4b17351
fix: update KFP issue reference to #13061
beatsmonster Mar 18, 2026
883266a
fix: add OWNERS file to sdg subcategory directory
beatsmonster Mar 19, 2026
f2d5e4b
fix: fix YAML indentation and add document-start markers for yamllint
beatsmonster Mar 19, 2026
069810d
docs: regenerate READMEs and add outputs section as custom content
beatsmonster Mar 19, 2026
1dd5453
docs: add auto-generated subcategory README for sdg
beatsmonster Mar 19, 2026
1c0286e
fix: add kfp-kubernetes dependency and revert to kfp_kubernetes impor…
beatsmonster Mar 19, 2026
68239f1
fix: remove example_pipelines from validation and drop kfp-kubernetes…
beatsmonster Mar 21, 2026
f7dafc5
docs: fix markdown lint errors in READMEs and architecture doc
beatsmonster Mar 21, 2026
d91081b
fix: add import exceptions for sdg component test and pipeline files
beatsmonster Mar 21, 2026
0e90d18
fix: rename sdg_llm_pipeline.py to pipeline.py to follow convention
beatsmonster Mar 21, 2026
5911e5c
fix: add pipeline subcategory OWNERS and fix metadata.yaml validation…
beatsmonster Mar 21, 2026
43b5de7
fix: fix YAML indentation in pipeline test_data files
beatsmonster Mar 21, 2026
85fe1ec
docs: regenerate pipeline READMEs to sync with CI validation
beatsmonster Mar 21, 2026
6f05ee8
fix: add kfp-kubernetes to test deps for pipeline validation
beatsmonster Mar 21, 2026
7646491
refactor: flatten sdg component and pipeline directory structure
beatsmonster Mar 21, 2026
f2c198f
fix: rename test_data to tests/data and use kfp.kubernetes import path
beatsmonster Mar 21, 2026
40eb11f
fix: use default python:3.11 base image for pipeline component
beatsmonster Mar 21, 2026
453b207
fix: align stability to beta and lastVerified dates across component …
beatsmonster Mar 21, 2026
a50ba09
fix: use kfp_components namespace import and remove unnecessary impor…
beatsmonster Mar 27, 2026
237348e
fix: fix import sorting in pipeline.py
beatsmonster Mar 28, 2026
00401ea
fix: use None default for runtime_params to avoid mutable default arg…
beatsmonster Mar 30, 2026
a041769
fix: derive PVC export flow_name from the actually-selected flow source
beatsmonster Mar 30, 2026
c613f19
fix: use specific exception types in invalid model test
beatsmonster Mar 30, 2026
ab0cbd6
docs: fix pipeline overview to avoid collapsed numbered list in README
beatsmonster Mar 30, 2026
6c3cdf9
docs: regenerate component README to sync with CI
beatsmonster Mar 30, 2026
4 changes: 4 additions & 0 deletions .github/scripts/check_imports/import_exceptions.yaml
@@ -7,6 +7,10 @@ files:
# Glob patterns: * matches any directory, ** matches any path
"components/**/tests/*":
- tests
- pandas
"components/**/shared/*":
- pandas
- component
"pipelines/**/tests/*":
- tests
# Allow pipelines to import the repo package at module scope
1 change: 1 addition & 0 deletions components/data_processing/README.md
@@ -2,4 +2,5 @@

This directory contains components in the **Data Processing** category:

- [Sdg Hub](./sdg/README.md): Run an SDG Hub flow to generate synthetic data.
- [Yoda Data Processor](./yoda_data_processor/README.md): Prepare the training and evaluation datasets by downloading and preprocessing.
10 changes: 10 additions & 0 deletions components/data_processing/sdg/OWNERS
@@ -0,0 +1,10 @@
approvers:
- beatsmonster
- shivchander
- eshwarprasadS
- abhi1092
reviewers:
- beatsmonster
- shivchander
- eshwarprasadS
- abhi1092
63 changes: 63 additions & 0 deletions components/data_processing/sdg/README.md
@@ -0,0 +1,63 @@
# Sdg Hub ✨

> ⚠️ **Stability: beta** — This asset is not yet stable and may change.

## Overview 🧾

Run an SDG Hub flow to generate synthetic data.

Loads input data, selects and configures a flow, executes it, and writes the output as a JSONL artifact with execution
metrics.

## Inputs 📥

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `output_artifact` | `dsl.Output[dsl.Dataset]` | `None` | KFP Dataset artifact for downstream components. |
| `output_metrics` | `dsl.Output[dsl.Metrics]` | `None` | KFP Metrics artifact with execution stats. |
| `input_artifact` | `dsl.Input[dsl.Dataset]` | `None` | KFP Dataset artifact from upstream component (optional). |
| `input_pvc_path` | `str` | `""` | Path to JSONL input file on a mounted PVC (optional). |
| `flow_id` | `str` | `""` | Built-in flow ID from the SDG Hub registry. |
| `flow_yaml_path` | `str` | `""` | Path to a custom flow YAML file. |
| `model` | `str` | `""` | LiteLLM model identifier (e.g. 'openai/gpt-4o-mini'). |
| `max_concurrency` | `int` | `10` | Maximum concurrent LLM requests. |
| `checkpoint_pvc_path` | `str` | `""` | PVC path for checkpoints (enables resume). |
| `save_freq` | `int` | `100` | Checkpoint save frequency (number of samples). |
| `log_level` | `str` | `INFO` | Logging level (DEBUG, INFO, WARNING, ERROR). |
| `temperature` | `float` | `-1.0` | LLM sampling temperature (0.0-2.0). Use -1 for flow default. |
| `max_tokens` | `int` | `-1` | Maximum response tokens. Use -1 for flow default. |
| `export_to_pvc` | `bool` | `False` | Whether to export output to PVC (in addition to KFP artifact). |
| `export_path` | `str` | `""` | Base PVC path for exports (required if export_to_pvc is True). |
| `runtime_params` | `dict` | `None` | Per-block parameter overrides as a dict of {block_name: {param: value}}. |
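
The `{block_name: {param: value}}` shape expected by `runtime_params` can be illustrated with a small sketch. The block names below (`gen_questions`, `eval_answers`) are hypothetical, not taken from the SDG Hub registry:

```python
# Hypothetical illustration of the runtime_params shape documented above:
# a mapping of block name -> parameter overrides applied at generate() time.
runtime_params = {
    "gen_questions": {"temperature": 0.9, "max_tokens": 512},
    "eval_answers": {"temperature": 0.0},
}

# Each top-level key targets one flow block; each inner dict overrides
# that block's parameters without editing the flow YAML.
for block_name, overrides in runtime_params.items():
    print(block_name, overrides)
```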

## Metadata 🗂️

- **Name**: sdg_hub
- **Stability**: beta
- **Dependencies**:
- Kubeflow:
- Name: Pipelines, Version: >=2.15.2
- External Services:
- Name: SDG Hub, Version: >=0.7.0
- Name: LiteLLM, Version: >=1.0.0
- **Tags**:
- sdg
- synthetic_data_generation
- llm
- data_processing
- **Last Verified**: 2026-03-21 00:00:00+00:00
- **Owners**:
- Approvers:
- beatsmonster
- shivchander
- eshwarprasadS
- abhi1092
- Reviewers:
- beatsmonster
- shivchander
- eshwarprasadS
- abhi1092

## Additional Resources 📚

- **Documentation**: [https://github.com/Red-Hat-AI-Innovation-Team/sdg_hub](https://github.com/Red-Hat-AI-Innovation-Team/sdg_hub)
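
The sentinel defaults for `temperature` and `max_tokens` in the Inputs table above can be sketched as a pure function mirroring the component's model-configuration logic (this helper is an illustration, not part of the component's API):

```python
def build_model_kwargs(temperature: float = -1.0, max_tokens: int = -1) -> dict:
    """Mirror of the component's sentinel handling: negative values mean
    'use the flow's own default' and are simply omitted from the kwargs."""
    model_kwargs = {}
    if temperature >= 0:
        model_kwargs["temperature"] = temperature
    if max_tokens > 0:
        model_kwargs["max_tokens"] = max_tokens
    return model_kwargs

print(build_model_kwargs())                 # {} -> flow defaults apply
print(build_model_kwargs(temperature=0.7))  # {'temperature': 0.7}
```

Because omitted keys never reach `set_model_config`, a flow's own defaults survive unless the caller explicitly opts in.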
Empty file.
268 changes: 268 additions & 0 deletions components/data_processing/sdg/component.py
@@ -0,0 +1,268 @@
"""SDG Hub KFP Component.

Wraps the SDG Hub SDK to enable synthetic data generation
within Kubeflow Pipelines.
"""

import kfp.compiler
from kfp import dsl


@dsl.component(
packages_to_install=["sdg-hub>=0.7.0,<1.0"],
)
def sdg(
output_artifact: dsl.Output[dsl.Dataset],
output_metrics: dsl.Output[dsl.Metrics],
input_artifact: dsl.Input[dsl.Dataset] = None,
input_pvc_path: str = "",
flow_id: str = "",
flow_yaml_path: str = "",
model: str = "",
max_concurrency: int = 10,
checkpoint_pvc_path: str = "",
save_freq: int = 100,
log_level: str = "INFO",
temperature: float = -1.0,
max_tokens: int = -1,
export_to_pvc: bool = False,
export_path: str = "",
runtime_params: dict = None,
):
"""Run an SDG Hub flow to generate synthetic data.

Loads input data, selects and configures a flow, executes it,
and writes the output as a JSONL artifact with execution metrics.

Args:
output_artifact: KFP Dataset artifact for downstream components.
output_metrics: KFP Metrics artifact with execution stats.
input_artifact: KFP Dataset artifact from upstream component (optional).
input_pvc_path: Path to JSONL input file on a mounted PVC (optional).
flow_id: Built-in flow ID from the SDG Hub registry.
flow_yaml_path: Path to a custom flow YAML file.
model: LiteLLM model identifier (e.g. 'openai/gpt-4o-mini').
max_concurrency: Maximum concurrent LLM requests.
checkpoint_pvc_path: PVC path for checkpoints (enables resume).
save_freq: Checkpoint save frequency (number of samples).
log_level: Logging level (DEBUG, INFO, WARNING, ERROR).
temperature: LLM sampling temperature (0.0-2.0). Use -1 for flow default.
max_tokens: Maximum response tokens. Use -1 for flow default.
export_to_pvc: Whether to export output to PVC (in addition to KFP artifact).
export_path: Base PVC path for exports (required if export_to_pvc is True).
runtime_params: Per-block parameter overrides as a dict of {block_name: {param: value}}.
"""
import logging
import os
import time

import pandas as pd
from sdg_hub.core.flow.base import Flow
from sdg_hub.core.flow.registry import FlowRegistry
from sdg_hub.core.utils.error_handling import FlowValidationError

# Configure logging
log_level_value = getattr(logging, log_level.upper(), None)
if log_level_value is None:
log_level_value = logging.INFO
logging.basicConfig(
level=log_level_value,
format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

start_time = time.time()

logger.info("=" * 60)
logger.info("SDG Hub KFP Component")
logger.info("=" * 60)

# Log configuration
logger.info(f"Input Artifact: {'Provided' if input_artifact else 'Not provided'}")
logger.info(f"Input PVC Path: {input_pvc_path or 'Not provided'}")
logger.info(f"Flow ID: {flow_id or 'Not provided'}")
logger.info(f"Flow YAML Path: {flow_yaml_path or 'Not provided'}")
logger.info(f"Model: {model or 'Not provided'}")
logger.info(f"Max Concurrency: {max_concurrency}")
logger.info(f"Temperature: {'flow default' if temperature < 0 else temperature}")
logger.info(f"Max Tokens: {'flow default' if max_tokens < 0 else max_tokens}")
logger.info(f"Export to PVC: {export_to_pvc}")
if export_to_pvc:
logger.info(f"Export Path: {export_path or 'Not provided'}")
runtime_params = runtime_params or {}
if runtime_params:
logger.info(f"Runtime params: {runtime_params}")

# =========================================================================
# INPUT HANDLING
# =========================================================================
if input_artifact:
logger.info(f"Loading input from KFP artifact: {input_artifact.path}")
if not os.path.exists(input_artifact.path):
raise FileNotFoundError(f"Input artifact file not found: {input_artifact.path}")
df = pd.read_json(input_artifact.path, lines=True)
logger.info("Using input_artifact as data source")
elif input_pvc_path:
logger.info(f"Loading input from PVC: {input_pvc_path}")
if not os.path.exists(input_pvc_path):
raise FileNotFoundError(f"Input file not found: {input_pvc_path}")
df = pd.read_json(input_pvc_path, lines=True)
logger.info("Using input_pvc_path as data source")
else:
raise ValueError("No input provided. Supply 'input_artifact' or 'input_pvc_path'.")

input_rows = len(df)
logger.info(f"Loaded {input_rows} rows with columns: {list(df.columns)}")

# =========================================================================
# FLOW SELECTION
# =========================================================================
if not flow_id and not flow_yaml_path:
raise ValueError(
"Either 'flow_id' or 'flow_yaml_path' must be provided. "
"Use 'flow_id' for built-in flows or 'flow_yaml_path' for custom YAML."
)

if flow_id and flow_yaml_path:
logger.warning("Both 'flow_id' and 'flow_yaml_path' provided. Using 'flow_yaml_path' (takes precedence).")

if flow_yaml_path:
yaml_path = flow_yaml_path
logger.info(f"Using custom flow YAML: {yaml_path}")
if not os.path.exists(yaml_path):
raise FileNotFoundError(
f"Custom flow YAML not found: {yaml_path}. Ensure the file is mounted (e.g., via ConfigMap or PVC)."
)
else:
logger.info(f"Looking up built-in flow: {flow_id}")
try:
yaml_path = FlowRegistry.get_flow_path_safe(flow_id)
except ValueError as exc:
raise ValueError(f"Flow lookup failed for '{flow_id}': {exc}") from exc
logger.info(f"Found flow at: {yaml_path}")

# =========================================================================
# FLOW LOADING
# =========================================================================
logger.info(f"Loading flow from: {yaml_path}")
try:
flow = Flow.from_yaml(yaml_path)
except FlowValidationError as exc:
raise FlowValidationError(f"Failed to load flow from '{yaml_path}': {exc}") from exc

logger.info(f"Flow loaded: '{flow.metadata.name}' v{flow.metadata.version} with {len(flow.blocks)} blocks")

# =========================================================================
# MODEL CONFIGURATION
# =========================================================================
if flow.is_model_config_required():
if not model:
raise ValueError(
f"Flow '{flow.metadata.name}' contains LLM blocks and requires "
"a 'model' parameter. Provide a LiteLLM model identifier "
"(e.g., 'openai/gpt-4o-mini')."
)

api_key = os.environ.get("LLM_API_KEY", "")
api_base = os.environ.get("LLM_API_BASE", "")

model_kwargs = {}
if temperature >= 0:
model_kwargs["temperature"] = temperature
if max_tokens > 0:
model_kwargs["max_tokens"] = max_tokens

logger.info(f"Configuring model: {model}")
if api_base:
logger.info(f"Using API base: {api_base}")

flow.set_model_config(
model=model,
api_key=api_key if api_key else None,
api_base=api_base if api_base else None,
**model_kwargs,
)
logger.info("Model configuration applied to LLM blocks")
else:
logger.info("Flow has no LLM blocks - skipping model configuration")

# =========================================================================
# DATASET VALIDATION
# =========================================================================
validation_errors = flow.validate_dataset(df)
if validation_errors:
raise FlowValidationError(
f"Dataset validation failed for flow '{flow.metadata.name}':\n"
+ "\n".join(f" - {err}" for err in validation_errors)
)
logger.info("Dataset validation passed")

# =========================================================================
# FLOW EXECUTION
# =========================================================================
logger.info(f"Starting flow execution: {len(df)} samples, max_concurrency={max_concurrency}")

generate_kwargs = {
"max_concurrency": max_concurrency,
}

if checkpoint_pvc_path:
generate_kwargs["checkpoint_dir"] = checkpoint_pvc_path
generate_kwargs["save_freq"] = save_freq
logger.info(f"Checkpointing enabled: dir={checkpoint_pvc_path}, save_freq={save_freq}")

if runtime_params:
generate_kwargs["runtime_params"] = runtime_params

output_df = flow.generate(df, **generate_kwargs)
output_rows = len(output_df)

# =========================================================================
# OUTPUT HANDLING
# =========================================================================
output_df.to_json(output_artifact.path, orient="records", lines=True)
logger.info(f"Output written to: {output_artifact.path}")
logger.info(f"Output: {output_rows} rows with columns: {list(output_df.columns)}")

# =========================================================================
# PVC EXPORT (OPTIONAL)
# =========================================================================
if export_to_pvc:
if not export_path:
raise ValueError(
"export_to_pvc is True but export_path is not provided. "
"Supply export_path (base PVC directory for exports)."
)

if flow_yaml_path:
flow_name = os.path.splitext(os.path.basename(flow_yaml_path))[0] or "custom"
elif flow_id:
flow_name = flow_id.replace("/", "_").replace("\\", "_")
else:
flow_name = "custom"
timestamp = time.strftime("%Y%m%d_%H%M%S")
export_dir = os.path.join(export_path, flow_name, timestamp)

os.makedirs(export_dir, exist_ok=True)
export_file_path = os.path.join(export_dir, "generated.jsonl")

output_df.to_json(export_file_path, orient="records", lines=True)
logger.info(f"Output exported to PVC: {export_file_path}")

# Write metrics
execution_time = time.time() - start_time
output_metrics.log_metric("input_rows", input_rows)
output_metrics.log_metric("output_rows", output_rows)
output_metrics.log_metric("execution_time_seconds", round(execution_time, 2))
logger.info("Metrics logged")

logger.info("=" * 60)
logger.info(f"SDG Hub KFP Component completed in {execution_time:.2f}s")
logger.info("=" * 60)


if __name__ == "__main__":
kfp.compiler.Compiler().compile(
sdg,
package_path=__file__.replace(".py", "_component.yaml"),
)
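
The PVC export path derivation in the component above (basename for custom YAML paths, separator replacement for flow IDs) can be exercised in isolation; a minimal sketch of the same logic, with the function name chosen for this illustration:

```python
import os
import time


def export_dir_for(export_path: str, flow_id: str = "", flow_yaml_path: str = "") -> str:
    """Same derivation as the component: flow_yaml_path takes precedence and
    only its basename (sans extension) is used, so '../' segments cannot
    escape export_path; flow IDs have path separators replaced instead."""
    if flow_yaml_path:
        flow_name = os.path.splitext(os.path.basename(flow_yaml_path))[0] or "custom"
    elif flow_id:
        flow_name = flow_id.replace("/", "_").replace("\\", "_")
    else:
        flow_name = "custom"
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    return os.path.join(export_path, flow_name, timestamp)


# A traversal attempt in the YAML path collapses to its basename:
print(export_dir_for("/mnt/exports", flow_yaml_path="/mnt/flows/../qa_flow.yaml"))
```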
20 changes: 20 additions & 0 deletions components/data_processing/sdg/metadata.yaml
@@ -0,0 +1,20 @@
---
name: sdg_hub
stability: beta
dependencies:
kubeflow:
- name: Pipelines
version: ">=2.15.2"
external_services:
- name: SDG Hub
version: ">=0.7.0"
- name: LiteLLM
version: ">=1.0.0"
tags:
- sdg
- synthetic_data_generation
- llm
- data_processing
lastVerified: 2026-03-21T00:00:00Z
links:
documentation: https://github.com/Red-Hat-AI-Innovation-Team/sdg_hub