6 changes: 4 additions & 2 deletions .github/workflows/code_checks.yml
@@ -55,5 +55,7 @@ jobs:
         uses: pypa/[email protected]
         with:
           virtual-environment: .venv
-          additional-args: "--ignore PYSEC-2024-161"
-          strict: false
+          ignore-vulns: |
+            PYSEC-2024-161
+            GHSA-gm62-xv2j-4w53
+            GHSA-2xpw-w6gg-jr37
246 changes: 246 additions & 0 deletions src/aixpert/data_construction/Readme.md
@@ -0,0 +1,246 @@
# Skywork → Factual-DPO Data Construction Pipeline

This directory contains a complete, modular, and type-safe data-construction pipeline for generating **factual-aware DPO datasets** from the **Skywork Reward-Preference-80K** dataset.

The pipeline supports:
- Direct Preference Optimization (DPO)
- Factual-DPO
- Synthetic hallucination inversion pairs
- Balanced and flipped datasets

## Configuration

All configuration is centralized in:

```bash
src/aixpert/data_construction/config/config.yaml
```
It is loaded at runtime via:
```python
from utils.config_loader import load_config

cfg = load_config()
```
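
For reference, a minimal sketch of what the loader could look like (the default path and its resolution are assumptions; the real implementation lives in `utils/config_loader.py`):

```python
# utils/config_loader.py -- minimal sketch; path resolution is an assumption.
from pathlib import Path
from typing import Any

import yaml

# Assumed default location of the pipeline config.
_CONFIG_PATH = Path("src/aixpert/data_construction/config/config.yaml")


def load_config(path: Path = _CONFIG_PATH) -> dict[str, Any]:
    """Parse config.yaml into a plain nested dict."""
    with path.open("r", encoding="utf-8") as fh:
        return yaml.safe_load(fh)
```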
## Configuration Summary (`config.yaml`)

### Model Settings
- **model.name:** `gpt-4o-mini`
- **model.temperature:** `0.8`

---

### Paths (All datasets + intermediate outputs)

The configuration tracks every stage of the data pipeline, including:

- Cleaned **train / eval / test** splits
- **Preference pairs** (DPO-style)
- **Factual-scored** outputs
- **Synthetic inversion** samples (train + eval)
- **Merged** intermediate datasets
- **Balanced** final datasets
- **Flipped** datasets for ablation

**Examples:**
```yaml
skywork_train_cleaned: "src/.../skywork_extracted_77k.jsonl"
skywork_train_pairs: "src/.../skywork_preference_pairs_77k.jsonl"
skywork_train_factual: "src/.../skywork_binary_factual_train.jsonl"
final_train_out: "src/.../train_balanced.jsonl"
```

## Pipeline Stages — Summary

Below is a concise overview of all eight stages in the Skywork → Factual-DPO data pipeline.

---

### Stage 1 — Skywork Extraction
**Scripts:**
- `dataextraction_train.py`
- `dataextraction_eval.py`
- `dataextraction_test.py` (these samples are used directly in evaluation)

**Tasks:**
- Load slices from Skywork Preference dataset
- Extract:
- **prompt** (first user message)
- **chosen** (assistant reply)
- **rejected** (assistant reply)
- Remove exact duplicates
- Save cleaned JSONL files
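
A sketch of these helpers, assuming the Skywork `chosen`/`rejected` columns hold chat-format message lists (the real implementations live in `utils/data_utils.py`):

```python
# Sketch of the Stage 1 helpers; the message format is an assumption.
def extract_prompt(messages: list[dict[str, str]]) -> str:
    """Return the first user message from a chat-format column."""
    return next(m["content"] for m in messages if m["role"] == "user")


def extract_answer(messages: list[dict[str, str]]) -> str:
    """Return the first assistant reply from a chat-format column."""
    return next(m["content"] for m in messages if m["role"] == "assistant")


def filter_duplicates(rows: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split rows into (cleaned, removed) on exact (prompt, chosen, rejected) matches."""
    seen: set[tuple[str, str, str]] = set()
    cleaned, removed = [], []
    for row in rows:
        key = (row["prompt"], row["chosen"], row["rejected"])
        (removed if key in seen else cleaned).append(row)
        seen.add(key)
    return cleaned, removed
```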

---

### Stage 2 — Preference Pair Conversion
**Scripts:**
- `dataconversion_train.py`
- `dataconversion_eval.py`

**Tasks:**
- Convert `(prompt, chosen, rejected)` → **DPO-style preference pairs**
- Produce:
- `response_0`, `response_1`
- `better_response_id`
- Random symmetric assignment for unbiased supervision
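
The symmetric assignment could be as simple as this sketch (the seed and exact field handling are assumptions):

```python
import random


def create_preference_pairs(rows: list[dict], seed: int = 42) -> list[dict]:
    """Randomly assign chosen/rejected to response_0/response_1 so that
    better_response_id is uniform over {0, 1} (unbiased supervision)."""
    rng = random.Random(seed)
    pairs = []
    for row in rows:
        better_id = rng.randint(0, 1)  # coin flip per sample
        if better_id == 1:
            responses = [row["rejected"], row["chosen"]]
        else:
            responses = [row["chosen"], row["rejected"]]
        pairs.append({
            "prompt": row["prompt"],
            "response_0": responses[0],
            "response_1": responses[1],
            "better_response_id": better_id,
        })
    return pairs
```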

---

### Stage 3 — Binary Factuality Evaluation
**Scripts:**
- `dataset_train.py`
- `dataset_val.py`

**Components:**
Uses `utils.factual_utils` to judge factual correctness with **GPT-4o-mini**.

**Outputs:**
- Binary hallucination flags:
- `h0`, `h1` (aliases for `factual_flag_0`, `factual_flag_1`)

**Features:**
- Resume-safe incremental scoring
- Async concurrency
- Retry logic
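
A sketch of the concurrency-plus-retry pattern (resume-safe checkpointing omitted; `judge_factual` is a placeholder for the actual GPT-4o-mini judge in `utils/factual_utils.py`):

```python
import asyncio


async def judge_factual(prompt: str, response: str) -> int:
    """Placeholder: call GPT-4o-mini with the strict factuality judge prompt;
    return 1 if the response is hallucinated, 0 if factual."""
    raise NotImplementedError


async def score_pair(sample: dict, sem: asyncio.Semaphore, max_retries: int = 5) -> dict:
    """Attach binary flags h0/h1 to one preference pair, with retries."""
    async with sem:  # bounded by concurrency_limit from config.yaml
        for attempt in range(max_retries):
            try:
                sample["h0"] = await judge_factual(sample["prompt"], sample["response_0"])
                sample["h1"] = await judge_factual(sample["prompt"], sample["response_1"])
                return sample
            except Exception:
                await asyncio.sleep(2 ** attempt)  # back off before retrying
        raise RuntimeError("factuality scoring failed after retries")


async def score_all(samples: list[dict], concurrency_limit: int = 25) -> list[dict]:
    sem = asyncio.Semaphore(concurrency_limit)
    return list(await asyncio.gather(*(score_pair(s, sem) for s in samples)))
```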

---

### Stage 4 — DPO Transformation
**Scripts:**
- `data_transform_train.py`
- `data_transform_val.py`

**Tasks:**
Transform factual-scored items into canonical DPO format:

- `prompt`, `chosen`, `rejected`
- `h_w`, `h_l`
- `response_0`, `response_1`
- `flipped=False`
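
One plausible shape for this transformation (the helper name `to_dpo` is hypothetical; the real logic lives in `utils/dpo_transform_utils.py`):

```python
def to_dpo(sample: dict) -> dict:
    """Map a factual-scored preference pair into the canonical DPO record.
    h_w/h_l are the hallucination flags of the winner/loser respectively."""
    win = sample["better_response_id"]  # 0 or 1
    lose = 1 - win
    return {
        "prompt": sample["prompt"],
        "chosen": sample[f"response_{win}"],
        "rejected": sample[f"response_{lose}"],
        "h_w": sample[f"h{win}"],
        "h_l": sample[f"h{lose}"],
        "response_0": sample["response_0"],
        "response_1": sample["response_1"],
        "flipped": False,
    }
```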

---

### Stage 5 — Synthetic Hallucination Generation
**Scripts:**
- `data_synthetic_train.py`
- `data_synthetic_val.py`

**Tasks:**
- Select samples where the winner is factual (`h_w=0`) and the loser is hallucinated (`h_l=1`)
- Use **GPT-4o-mini** to generate hallucinated corruptions
- Build synthetic inversion pairs

**Outputs:**
- **10,000** synthetic train samples
- **400** synthetic eval samples
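
A sketch of the inversion construction under one assumption: the GPT-corrupted (hallucinated) reply is placed as `chosen` and the original factual reply as `rejected`, which would fill the `(h_w, h_l) = (1, 0)` bucket that Stage 7 targets with 10,000 samples. `corrupt_with_gpt` is a placeholder for the real generator in `utils/synthetic_utils.py`:

```python
async def corrupt_with_gpt(prompt: str, answer: str) -> str:
    """Placeholder: ask GPT-4o-mini to inject hallucinations into a factual answer."""
    raise NotImplementedError


async def make_inversion_pair(sample: dict) -> dict | None:
    """Build one synthetic inversion pair from a factual-winner sample."""
    if not (sample["h_w"] == 0 and sample["h_l"] == 1):
        return None  # only factual-winner / hallucinated-loser samples qualify
    corrupted = await corrupt_with_gpt(sample["prompt"], sample["chosen"])
    return {
        "prompt": sample["prompt"],
        "chosen": corrupted,           # hallucinated variant (assumed ordering)
        "rejected": sample["chosen"],  # original factual answer
        "h_w": 1,
        "h_l": 0,
        "flipped": False,
    }
```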

---

### Stage 6 — Merging
**Scripts:**
- `merge_train.py`
- `merge_eval.py`

**Tasks:**
- Merge Skywork transformed data with synthetic inversion pairs
- Bucket by `(h_w, h_l)`
- Sample subsets
- Shuffle and save merged datasets
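
The bucketing step might look like this sketch of `bucket_by_flags()`:

```python
from collections import defaultdict


def bucket_by_flags(samples: list[dict]) -> dict[tuple[int, int], list[dict]]:
    """Group merged samples by their (h_w, h_l) hallucination flags."""
    buckets: dict[tuple[int, int], list[dict]] = defaultdict(list)
    for s in samples:
        buckets[(s["h_w"], s["h_l"])].append(s)
    return dict(buckets)
```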

---

### Stage 7 — Balanced Dataset Construction
**Scripts:**
- `balance_train.py`
- `build_final_eval.py`

**Train Balancing:**
Use `balance_targets` to create balanced buckets:

- `(0,1)` — 10,000
- `(1,0)` — 10,000
- `(0,0)` — 15,000
- `(1,1)` — 10,000

**Eval Construction:**
Combine:
- Skywork eval transformed
- 400 synthetic eval inversion samples
- 1500 clean `(1,1)` samples (unused in train)
- 1500 clean `(0,0)` samples (unused in train)
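
A sketch of the train balancing, assuming the `balance_targets` keys are parsed from their string form in `config.yaml` (hence `ast.literal_eval`) and buckets are downsampled without replacement:

```python
import ast
import random


def balance(buckets: dict[tuple[int, int], list[dict]],
            targets: dict[str, int], seed: int = 42) -> list[dict]:
    """Downsample each (h_w, h_l) bucket to its configured target size."""
    rng = random.Random(seed)
    out: list[dict] = []
    for key_str, target in targets.items():
        key = ast.literal_eval(key_str)  # "(0,1)" -> (0, 1)
        pool = buckets.get(key, [])
        out.extend(rng.sample(pool, min(target, len(pool))))
    rng.shuffle(out)
    return out
```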

---

### Stage 8 — Flipping (Optional)
**Scripts:**
- `data_flipped_train.py`
- `data_flipped_val.py`

**Tasks:**
- Flip all `(1,0)` samples → `(0,1)`
- Swap `chosen` ↔ `rejected`
- Produce alternate dataset for inversion or ablation studies
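
A sketch of the per-sample flip (whether auxiliary fields such as `response_0`/`response_1` are also swapped is an assumption left open here):

```python
def flip_sample(sample: dict) -> dict:
    """Turn a (h_w, h_l) = (1, 0) sample into (0, 1) by swapping chosen/rejected."""
    flipped = dict(sample)
    flipped["chosen"], flipped["rejected"] = sample["rejected"], sample["chosen"]
    flipped["h_w"], flipped["h_l"] = sample["h_l"], sample["h_w"]
    flipped["flipped"] = True
    return flipped
```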

---

This structured overview provides a clear high-level map of the complete Factual-DPO data construction workflow.

## Utilities Summary

### `utils/config_loader.py`
- Centralized configuration loader
- All stages call `load_config()` to read `config.yaml`

---

### `utils/data_utils.py`
Core data-processing helpers:
- `extract_prompt()` — first user message
- `extract_answer()` — first assistant reply
- `filter_duplicates()` — removes exact matches
- `create_preference_pairs()` — builds DPO response pairs
- `bucket_by_flags()` — groups by (h_w, h_l)
- `flip_sample()` — converts (1,0) → (0,1)
- JSONL read/write utilities

---

### `utils/factual_utils.py`
- Async binary factuality scoring using GPT-4o-mini
- Concurrency + retry logic
- Resume-safe checkpointing
- Produces `h0`, `h1` hallucination flags

---

### `utils/dpo_transform_utils.py`
- Converts factual-scored items into final DPO format:
- `prompt`, `chosen`, `rejected`, `h_w`, `h_l`, `response_0`, `response_1`, `flipped=False`

---

### `utils/synthetic_utils.py`
- GPT-based corruption generator
- Creates synthetic inversion pairs (hallucinated → correct)

---

### `utils/prompt_templates.py`
Provides all system/user prompts:
- Strict factuality judge prompt
- Hallucination corruption prompts

---

## Running the Pipeline

Example sequence for **training pipeline**:

```bash
python src/aixpert/data_construction/stage_1_extraction/dataextraction_train.py
python src/aixpert/data_construction/stage_2_conversion/dataconversion_train.py
python src/aixpert/data_construction/stage_3_factuality/dataset_train.py
python src/aixpert/data_construction/stage_4_transformation/data_transform_train.py
python src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_train.py
python src/aixpert/data_construction/stage_6_merging/merge_train.py
python src/aixpert/data_construction/stage_7_balancing/balance_train.py
python src/aixpert/data_construction/stage_8_flipping/data_flipped_train.py
```
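
The evaluation pipeline runs the analogous scripts (assuming the eval/val scripts live in the same stage directories as their training counterparts):

```bash
python src/aixpert/data_construction/stage_1_extraction/dataextraction_eval.py
python src/aixpert/data_construction/stage_2_conversion/dataconversion_eval.py
python src/aixpert/data_construction/stage_3_factuality/dataset_val.py
python src/aixpert/data_construction/stage_4_transformation/data_transform_val.py
python src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_val.py
python src/aixpert/data_construction/stage_6_merging/merge_eval.py
python src/aixpert/data_construction/stage_7_balancing/build_final_eval.py
python src/aixpert/data_construction/stage_8_flipping/data_flipped_val.py
```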
61 changes: 61 additions & 0 deletions src/aixpert/data_construction/config/config.yaml
@@ -0,0 +1,61 @@
repository: /projects/aixpert/users/sindhu/Loss_Test

model:
  name: gpt-4o-mini  # or gpt-4o
  temperature: 0.8

paths:
  skywork_train_cleaned: "src/aixpert/data_construction/data/skywork_extracted_77k.jsonl"
  skywork_train_removed: "src/aixpert/data_construction/data/skywork_removed_77k.jsonl"

  skywork_eval_cleaned: "src/aixpert/data_construction/data/skywork_extracted_eval.jsonl"
  skywork_eval_removed: "src/aixpert/data_construction/data/skywork_eval_removed.jsonl"

  skywork_test_cleaned: "src/aixpert/data_construction/data/skywork_extracted_test.jsonl"
  skywork_test_removed: "src/aixpert/data_construction/data/skywork_test_removed.jsonl"

  skywork_train_pairs: "src/aixpert/data_construction/data/skywork_preference_pairs_77k.jsonl"
  skywork_eval_pairs: "src/aixpert/data_construction/data/skywork_preference_pairs_eval.jsonl"

  skywork_train_factual: "src/aixpert/data_construction/data/skywork_binary_factual_train.jsonl"
  skywork_eval_factual: "src/aixpert/data_construction/data/skywork_binary_factual_eval.jsonl"

  skywork_train_transformed: "src/aixpert/data_construction/data/skywork_first_transformed_train.jsonl"
  skywork_eval_transformed: "src/aixpert/data_construction/data/skywork_first_transformed_eval.jsonl"

  synthetic_train_out: "src/aixpert/data_construction/data/synthetic_llm_inversion_train_10k.jsonl"
  synthetic_eval_out: "src/aixpert/data_construction/data/synthetic_llm_inversion_eval_400.jsonl"

  final_train_merged: "src/aixpert/data_construction/data/skywork_final_train.jsonl"
  final_eval_merged: "src/aixpert/data_construction/data/skywork_final_eval.jsonl"

  final_train_out: "src/aixpert/data_construction/data/train_balanced.jsonl"
  final_eval_out: "src/aixpert/data_construction/data/eval_final.jsonl"

  train_flipped_out: "src/aixpert/data_construction/data/train_balanced_flipped.jsonl"
  eval_flipped_out: "src/aixpert/data_construction/data/eval_final_flipped.jsonl"

  skywork_file: "Skywork/Skywork-Reward-Preference-80K-v0.1"

hyperparams:
  subset_size: 80000
  eval_start: 80001
  eval_end: 81000
  test_start: 81001
  test_end: 81500
  concurrency_limit: 25
  max_retries: 5
  corruption_concurrency: 20
  synthetic_train_samples: 10000
  synthetic_eval_samples: 400

balance_targets:
  "(0,1)": 10000
  "(1,0)": 10000
  "(0,0)": 15000
  "(1,1)": 10000

eval_additional_clean_samples: 1500
55 changes: 55 additions & 0 deletions src/aixpert/data_construction/stage_1_extraction/dataextraction_test.py
@@ -0,0 +1,55 @@
"""
Extract the test slice of the Skywork preference dataset.

This script extracts rows 81001–81500, removes exact duplicates,
and saves the cleaned dataset into JSONL files under the local data folder.
Only the prompts from this test set will be used in evaluation.
"""

from __future__ import annotations

from pathlib import Path

from datasets import load_dataset
from utils.config_loader import load_config
from utils.data_utils import (
extract_answer,
extract_prompt,
filter_duplicates,
save_jsonl,
)


def main() -> None:
"""Run test-split extraction and save cleaned JSONL outputs."""
cfg = load_config()
hp = cfg["hyperparams"]
paths = cfg["paths"]

start, end = hp["test_start"], hp["test_end"]

print(f"Extracting test slice {start} → {end}")

ds = load_dataset(
paths["skywork_file"],
split=f"train[{start}:{end + 1}]",
)
df = ds.to_pandas()

df["prompt"] = df["chosen"].apply(extract_prompt)
df["chosen"] = df["chosen"].apply(extract_answer)
df["rejected"] = df["rejected"].apply(extract_answer)

rows = df[["prompt", "chosen", "rejected"]].to_dict(orient="records")
cleaned, removed = filter_duplicates(rows)

save_jsonl(Path(paths["skywork_test_cleaned"]), cleaned)
save_jsonl(Path(paths["skywork_test_removed"]), removed)

print(f"Removed duplicates: {len(removed)}")
print(f"Clean samples: {len(cleaned)}")
print("Test extraction completed.")


if __name__ == "__main__":
main()