diff --git a/.github/workflows/code_checks.yml b/.github/workflows/code_checks.yml
index a10d07f..648712e 100644
--- a/.github/workflows/code_checks.yml
+++ b/.github/workflows/code_checks.yml
@@ -55,5 +55,7 @@ jobs:
         uses: pypa/gh-action-pip-audit@v1.1.0
         with:
           virtual-environment: .venv
-          additional-args: "--ignore PYSEC-2024-161"
-          strict: false
+          ignore-vulns: |
+            PYSEC-2024-161
+            GHSA-gm62-xv2j-4w53
+            GHSA-2xpw-w6gg-jr37
diff --git a/src/aixpert/data_construction/Readme.md b/src/aixpert/data_construction/Readme.md
new file mode 100644
index 0000000..2b3fee7
--- /dev/null
+++ b/src/aixpert/data_construction/Readme.md
@@ -0,0 +1,246 @@
+# Skywork → Factual-DPO Data Construction Pipeline
+
+This repository contains a complete, modular, and type-safe data-construction pipeline for generating **factual-aware DPO datasets** from the **Skywork Reward-Preference-80K** dataset.
+
+The pipeline supports:
+- Direct Preference Optimization (DPO)
+- Factual-DPO
+- Synthetic hallucination inversion pairs
+- Balanced and flipped datasets
+
+## Configuration
+
+All configuration is centralized in:
+
+```bash
+src/aixpert/data_construction/config/config.yaml
+```
+
+Loaded dynamically using:
+
+```python
+utils/config_loader.load_config()
+```
+
+## Configuration Summary (`config.yaml`)
+
+### Model Settings
+- **model.name:** `gpt-4o-mini`
+- **model.temperature:** `0.8`
+
+---
+
+### Paths (All datasets + intermediate outputs)
+
+The configuration tracks every stage of the data pipeline, including:
+
+- Cleaned **train / eval / test** splits
+- **Preference pairs** (DPO-style)
+- **Factual-scored** outputs
+- **Synthetic inversion** samples (train + eval)
+- **Merged** intermediate datasets
+- **Balanced** final datasets
+- **Flipped** datasets for ablation
+
+**Examples:**
+```yaml
+skywork_train_cleaned: "src/.../skywork_extracted_77k.jsonl"
+skywork_train_pairs: "src/.../skywork_preference_pairs_77k.jsonl"
+skywork_train_factual: "src/.../skywork_binary_factual_train.jsonl"
+final_train_out: "src/.../train_balanced.jsonl"
+```
+
+## Pipeline Stages — Summary
+
+Below is a concise overview of all eight stages in the Skywork → Factual-DPO data pipeline.
+
+---
+
+### Stage 1 — Skywork Extraction
+**Scripts:**
+- `dataextraction_train.py`
+- `dataextraction_val.py`
+- `dataextraction_eval.py` (extracts the held-out test slice; its prompts are used directly in evaluation)
+
+**Tasks:**
+- Load slices from the Skywork Preference dataset
+- Extract:
+  - **prompt** (first user message)
+  - **chosen** (assistant reply)
+  - **rejected** (assistant reply)
+- Remove exact duplicates
+- Save cleaned JSONL files
+
+---
+
+### Stage 2 — Preference Pair Conversion
+**Scripts:**
+- `dataconversion_train.py`
+- `dataconversion_val.py`
+
+**Tasks:**
+- Convert `(prompt, chosen, rejected)` → **DPO-style preference pairs** (see the record sketch below)
+- Produce:
+  - `response_0`, `response_1`
+  - `better_response_id`
+- Random symmetric assignment for unbiased supervision
+
+---
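+For concreteness, a single Stage 2 record has the shape below. This is a sketch
+with hypothetical values; the field names follow
+`utils/data_utils.create_preference_pairs()`:
+
+```python
+pair = {
+    "prompt": "Who painted the Mona Lisa?",            # first user message
+    "response_0": "Leonardo da Vinci painted it.",     # chosen/rejected in random order
+    "response_1": "Michelangelo painted it in 1503.",
+    "better_response_id": 0,  # index of the originally preferred reply
+}
+```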
+### Stage 3 — Binary Factuality Evaluation
+**Scripts:**
+- `dataset_train.py`
+- `dataset_val.py`
+
+**Components:**
+Uses `utils.factual_utils` to evaluate factual correctness using **GPT-4o-mini**.
+
+**Outputs:**
+- Binary hallucination flags:
+  - `h0`, `h1` (aliases for `factual_flag_0`, `factual_flag_1`)
+
+**Features:**
+- Resume-safe incremental scoring
+- Async concurrency
+- Retry logic
+
+---
+
+### Stage 4 — DPO Transformation
+**Scripts:**
+- `data_transform_train.py`
+- `data_transform_val.py`
+
+**Tasks:**
+Transform factual-scored items into canonical DPO format:
+
+- `prompt`, `chosen`, `rejected`
+- `h_w`, `h_l`
+- `response_0`, `response_1`
+- `flipped=False`
+
+---
+
+### Stage 5 — Synthetic Hallucination Generation
+**Scripts:**
+- `data_synthetic_train.py`
+- `data_synthetic_val.py`
+
+**Tasks:**
+- Select samples where the winner is factual (`h_w=0`) and the loser is incorrect (`h_l=1`)
+- Use **GPT-4o-mini** to generate hallucinated corruptions
+- Build synthetic inversion pairs
+
+**Outputs:**
+- **10,000** synthetic train samples
+- **400** synthetic eval samples
+
+---
+
+### Stage 6 — Merging
+**Scripts:**
+- `data_merge_train.py`
+- `data_merge_val.py`
+
+**Tasks:**
+- Merge Skywork transformed data with synthetic inversion pairs
+- Bucket by `(h_w, h_l)`
+- Sample subsets
+- Shuffle and save merged datasets
+
+---
+
+### Stage 7 — Balanced Dataset Construction
+**Scripts:**
+- `data_final_train.py`
+- `data_final_val.py`
+
+**Train Balancing:**
+Use `balance_targets` to create balanced buckets:
+
+- `(0,1)` — 10,000
+- `(1,0)` — 10,000
+- `(0,0)` — 15,000
+- `(1,1)` — 10,000
+
+**Eval Construction:**
+Combine:
+- Skywork eval transformed
+- 400 synthetic eval inversion samples
+- 1500 clean `(1,1)` samples (unused in train)
+- 1500 clean `(0,0)` samples (unused in train)
+
+---
+
+### Stage 8 — Flipping (Optional)
+**Scripts:**
+- `data_flipped_train.py`
+- `data_flipped_val.py`
+
+**Tasks:**
+- Flip all `(1,0)` samples → `(0,1)`
+- Swap `chosen` ↔ `rejected`
+- Produce an alternate dataset for inversion or ablation studies
+
+---
+
+This structured overview provides a clear high-level map of the complete Factual-DPO data construction workflow; every record that flows through Stages 4–8 shares the canonical schema sketched below.
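+The sketch uses hypothetical values; the field names follow
+`utils/dpo_transform_utils.process_item()`:
+
+```python
+example = {
+    "prompt": "When did the Berlin Wall fall?",
+    "chosen": "The Berlin Wall fell in November 1989.",
+    "rejected": "The Berlin Wall fell in 1991, shortly after reunification.",
+    "h_w": 0,   # hallucination flag of the winner (0 = factual)
+    "h_l": 1,   # hallucination flag of the loser (1 = hallucinated)
+    "better_response_id": 0,
+    "response_0": "The Berlin Wall fell in November 1989.",
+    "response_1": "The Berlin Wall fell in 1991, shortly after reunification.",
+    "flipped": False,
+}
+```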
+## Utilities Summary
+
+### `utils/config_loader.py`
+- Centralized configuration loader
+- All stages call `load_config()` to read `config.yaml`
+
+---
+
+### `utils/data_utils.py`
+Core data-processing helpers:
+- `extract_prompt()` — first user message
+- `extract_answer()` — first assistant reply
+- `filter_duplicates()` — removes exact matches
+- `create_preference_pairs()` — builds DPO response pairs
+- `bucket_by_flags()` — groups by `(h_w, h_l)`
+- `flip_sample()` — converts (1,0) → (0,1)
+- JSONL read/write utilities
+
+---
+
+### `utils/factual_utils.py`
+- Async binary factuality scoring using GPT-4o-mini
+- Concurrency + retry logic
+- Resume-safe checkpointing
+- Produces `h0`, `h1` hallucination flags
+
+---
+
+### `utils/dpo_transform_utils.py`
+- Converts factual-scored items into the final DPO format:
+  - `prompt`, `chosen`, `rejected`, `h_w`, `h_l`, `response_0`, `response_1`, `flipped=False`
+
+---
+
+### `utils/synthetic_utils.py`
+- GPT-based corruption generator
+- Creates synthetic inversion pairs (hallucinated → correct)
+
+---
+
+### `utils/prompt_templates.py`
+Provides all system/user prompts:
+- Strict factuality judge prompt
+- Hallucination corruption prompts
+
+---
+
+## Running the Pipeline
+
+Example sequence for the **training pipeline**:
+
+```bash
+python src/aixpert/data_construction/stage_1_extraction/dataextraction_train.py
+python src/aixpert/data_construction/stage_2_conversion/dataconversion_train.py
+python src/aixpert/data_construction/stage_3_factuality/dataset_train.py
+python src/aixpert/data_construction/stage_4_transformation/data_transform_train.py
+python src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_train.py
+python src/aixpert/data_construction/stage_6_merging/data_merge_train.py
+python src/aixpert/data_construction/stage_7_final/data_final_train.py
+python src/aixpert/data_construction/stage_8_flipping/data_flipped_train.py
+```
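+## Environment
+
+Stages 3 and 5 call the OpenAI API and read `OPENAI_API_KEY` from a `.env` file
+at the `repository` root configured in `config.yaml`, for example (placeholder
+key shown):
+
+```bash
+echo "OPENAI_API_KEY=sk-..." > /projects/aixpert/users/sindhu/Loss_Test/.env
+```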
"src/aixpert/data_construction/data/synthetic_llm_inversion_eval_400.jsonl" + + + final_train_merged: "src/aixpert/data_construction/data/skywork_final_train.jsonl" + final_eval_merged: "src/aixpert/data_construction/data/skywork_final_eval.jsonl" + + final_train_out: "src/aixpert/data_construction/data/train_balanced.jsonl" + final_eval_out: "src/aixpert/data_construction/data/eval_final.jsonl" + + train_flipped_out: "src/aixpert/data_construction/data/train_balanced_flipped.jsonl" + eval_flipped_out: "src/aixpert/data_construction/data/eval_final_flipped.jsonl" + + + + skywork_file: "Skywork/Skywork-Reward-Preference-80K-v0.1" + +hyperparams: + subset_size: 80000 + eval_start: 80001 + eval_end: 81000 + test_start: 81001 + test_end: 81500 + concurrency_limit: 25 + max_retries: 5 + corruption_concurrency: 20 + synthetic_train_samples: 10000 + synthetic_eval_samples: 400 + + balance_targets: + "(0,1)": 10000 + "(1,0)": 10000 + "(0,0)": 15000 + "(1,1)": 10000 + + eval_additional_clean_samples: 1500 diff --git a/src/aixpert/data_construction/stage_1_extraction/dataextraction_eval.py b/src/aixpert/data_construction/stage_1_extraction/dataextraction_eval.py new file mode 100644 index 0000000..0240f34 --- /dev/null +++ b/src/aixpert/data_construction/stage_1_extraction/dataextraction_eval.py @@ -0,0 +1,55 @@ +""" +Extract the test slice of the Skywork preference dataset. + +This script extracts rows 81001–81500, removes exact duplicates, +and saves the cleaned dataset into JSONL files under the local data folder. +Only the prompts from this test set will be used in evaluation. +""" + +from __future__ import annotations + +from pathlib import Path + +from datasets import load_dataset +from utils.config_loader import load_config +from utils.data_utils import ( + extract_answer, + extract_prompt, + filter_duplicates, + save_jsonl, +) + + +def main() -> None: + """Run test-split extraction and save cleaned JSONL outputs.""" + cfg = load_config() + hp = cfg["hyperparams"] + paths = cfg["paths"] + + start, end = hp["test_start"], hp["test_end"] + + print(f"Extracting test slice {start} → {end}") + + ds = load_dataset( + paths["skywork_file"], + split=f"train[{start}:{end + 1}]", + ) + df = ds.to_pandas() + + df["prompt"] = df["chosen"].apply(extract_prompt) + df["chosen"] = df["chosen"].apply(extract_answer) + df["rejected"] = df["rejected"].apply(extract_answer) + + rows = df[["prompt", "chosen", "rejected"]].to_dict(orient="records") + cleaned, removed = filter_duplicates(rows) + + save_jsonl(Path(paths["skywork_test_cleaned"]), cleaned) + save_jsonl(Path(paths["skywork_test_removed"]), removed) + + print(f"Removed duplicates: {len(removed)}") + print(f"Clean samples: {len(cleaned)}") + print("Test extraction completed.") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_1_extraction/dataextraction_train.py b/src/aixpert/data_construction/stage_1_extraction/dataextraction_train.py new file mode 100644 index 0000000..9ce178a --- /dev/null +++ b/src/aixpert/data_construction/stage_1_extraction/dataextraction_train.py @@ -0,0 +1,57 @@ +""" +Skywork extraction utilities. + +This module extracts prompt/chosen/rejected fields from the Skywork Preference +dataset, removes exact duplicates, and writes the cleaned dataset to JSONL +files. Fully compatible with ruff, mypy, and the AI Engineering template. 
+""" + +from __future__ import annotations + +from pathlib import Path + +from datasets import load_dataset +from utils.config_loader import load_config +from utils.data_utils import ( + extract_answer, + extract_prompt, + filter_duplicates, + save_jsonl, +) + + +def main() -> None: + """Run train-split extraction and save cleaned JSONL outputs.""" + cfg = load_config() + hp = cfg["hyperparams"] + paths = cfg["paths"] + + subset_size = hp["subset_size"] + + print(f"Loading first {subset_size} samples from Skywork...") + + ds = load_dataset( + paths["skywork_file"], + split=f"train[:{subset_size}]", + ) + + df = ds.to_pandas() + + df["prompt"] = df["chosen"].apply(extract_prompt) + df["chosen"] = df["chosen"].apply(extract_answer) + df["rejected"] = df["rejected"].apply(extract_answer) + + rows = df[["prompt", "chosen", "rejected"]].to_dict(orient="records") + + cleaned, removed = filter_duplicates(rows) + + save_jsonl(Path(paths["skywork_train_cleaned"]), cleaned) + save_jsonl(Path(paths["skywork_train_removed"]), removed) + + print(f"Removed exact duplicates: {len(removed)}") + print(f"Clean samples: {len(cleaned)}") + print("Training extraction completed.") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_1_extraction/dataextraction_val.py b/src/aixpert/data_construction/stage_1_extraction/dataextraction_val.py new file mode 100644 index 0000000..0918298 --- /dev/null +++ b/src/aixpert/data_construction/stage_1_extraction/dataextraction_val.py @@ -0,0 +1,54 @@ +""" +Extract the evaluation slice of the Skywork preference dataset. + +This script extracts rows 80001–81000, removes exact duplicates, +and saves the cleaned dataset into JSONL files under the local data folder. +""" + +from __future__ import annotations + +from pathlib import Path + +from datasets import load_dataset +from utils.config_loader import load_config +from utils.data_utils import ( + extract_answer, + extract_prompt, + filter_duplicates, + save_jsonl, +) + + +def main() -> None: + """Run validation-split extraction and save cleaned JSONL outputs.""" + cfg = load_config() + hp = cfg["hyperparams"] + paths = cfg["paths"] + + start, end = hp["eval_start"], hp["eval_end"] + + print(f"Extracting eval slice {start} → {end}") + + ds = load_dataset( + paths["skywork_file"], + split=f"train[{start}:{end + 1}]", + ) + df = ds.to_pandas() + + df["prompt"] = df["chosen"].apply(extract_prompt) + df["chosen"] = df["chosen"].apply(extract_answer) + df["rejected"] = df["rejected"].apply(extract_answer) + + rows = df[["prompt", "chosen", "rejected"]].to_dict(orient="records") + cleaned, removed = filter_duplicates(rows) + + save_jsonl(Path(paths["skywork_eval_cleaned"]), cleaned) + save_jsonl(Path(paths["skywork_eval_removed"]), removed) + + print(f"Removed duplicates: {len(removed)}") + print(f"Clean samples: {len(cleaned)}") + print("Eval extraction completed.") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_2_conversion/dataconversion_train.py b/src/aixpert/data_construction/stage_2_conversion/dataconversion_train.py new file mode 100644 index 0000000..65c89e4 --- /dev/null +++ b/src/aixpert/data_construction/stage_2_conversion/dataconversion_train.py @@ -0,0 +1,47 @@ +""" +Generate training preference pairs from cleaned Skywork samples. + +Loads prompt/chosen/rejected rows from the cleaned 77k dataset, +creates random preference pairs (response_0/response_1), +assigns correct better_response_id, and writes JSONL output. 
+
+This script uses the shared data utilities and config loader.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from utils.config_loader import load_config
+from utils.data_utils import (
+    create_preference_pairs,
+    load_jsonl,
+    write_jsonl,
+)
+
+
+def main() -> None:
+    """Generate preference pairs for the training set."""
+    cfg = load_config()
+    paths = cfg["paths"]
+
+    input_path = Path(paths["skywork_train_cleaned"])
+    output_path = Path(paths["skywork_train_pairs"])
+
+    print(f"Loading training dataset → {input_path}")
+
+    data = load_jsonl(input_path)
+    print(f"Loaded {len(data)} rows")
+
+    preference_pairs = create_preference_pairs(data)
+
+    write_jsonl(output_path, preference_pairs)
+
+    print("======================================")
+    print(f"Training preference pairs saved → {output_path}")
+    print(f"Total pairs: {len(preference_pairs)}")
+    print("======================================")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/aixpert/data_construction/stage_2_conversion/dataconversion_val.py b/src/aixpert/data_construction/stage_2_conversion/dataconversion_val.py
new file mode 100644
index 0000000..e3d7672
--- /dev/null
+++ b/src/aixpert/data_construction/stage_2_conversion/dataconversion_val.py
@@ -0,0 +1,45 @@
+"""
+Generate evaluation preference pairs from cleaned Skywork samples.
+
+Loads prompt/chosen/rejected rows for the eval slice,
+creates random preference pairs (response_0/response_1),
+assigns correct better_response_id, and writes JSONL output.
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from utils.config_loader import load_config
+from utils.data_utils import (
+    create_preference_pairs,
+    load_jsonl,
+    write_jsonl,
+)
+
+
+def main() -> None:
+    """Generate preference pairs for the evaluation set."""
+    cfg = load_config()
+    paths = cfg["paths"]
+
+    input_path = Path(paths["skywork_eval_cleaned"])
+    output_path = Path(paths["skywork_eval_pairs"])
+
+    print(f"Loading evaluation dataset → {input_path}")
+
+    data = load_jsonl(input_path)
+    print(f"Loaded {len(data)} rows")
+
+    preference_pairs = create_preference_pairs(data)
+
+    write_jsonl(output_path, preference_pairs)
+
+    print("======================================")
+    print(f"Eval preference pairs saved → {output_path}")
+    print(f"Total eval pairs: {len(preference_pairs)}")
+    print("======================================")
+
+
+if __name__ == "__main__":
+    main()
= Path(paths["skywork_train_pairs"]) + output_path = Path(paths["skywork_train_factual"]) + + items = load_jsonl(input_path) + + await factual_evaluation_pipeline( + client=client, + items=items, + output_file=output_path, + model=cfg["model"]["name"], + concurrency=hp["concurrency_limit"], + max_retries=hp["max_retries"], + ) + + print("Completed factual evaluation for training set.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/aixpert/data_construction/stage_3_factuality/dataset_val.py b/src/aixpert/data_construction/stage_3_factuality/dataset_val.py new file mode 100644 index 0000000..0aea440 --- /dev/null +++ b/src/aixpert/data_construction/stage_3_factuality/dataset_val.py @@ -0,0 +1,50 @@ +"""Run binary factuality evaluation on evaluation preference pairs.""" + +from __future__ import annotations + +import asyncio +from pathlib import Path + +from decouple import Config, RepositoryEnv +from utils.config_loader import load_config +from utils.data_utils import load_jsonl +from utils.factual_utils import ( + factual_evaluation_pipeline, + get_client, +) + + +async def main() -> None: + """Execute factuality evaluation for the validation set.""" + cfg = load_config() + + repo_path = cfg["repository"] + paths = cfg["paths"] + hp = cfg["hyperparams"] + + env = Config(RepositoryEnv(f"{repo_path}/.env")) + api_key = env("OPENAI_API_KEY", default=None) + if not api_key: + raise RuntimeError("Missing OPENAI_API_KEY in .env") + + client = get_client(api_key) + + input_path = Path(paths["skywork_eval_pairs"]) + output_path = Path(paths["skywork_eval_factual"]) + + items = load_jsonl(input_path) + + await factual_evaluation_pipeline( + client=client, + items=items, + output_file=output_path, + model=cfg["model"]["name"], + concurrency=hp["concurrency_limit"], + max_retries=hp["max_retries"], + ) + + print("Completed factual evaluation for evaluation set.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/aixpert/data_construction/stage_4_transformation/data_transform_train.py b/src/aixpert/data_construction/stage_4_transformation/data_transform_train.py new file mode 100644 index 0000000..d0f692e --- /dev/null +++ b/src/aixpert/data_construction/stage_4_transformation/data_transform_train.py @@ -0,0 +1,22 @@ +"""Transform factual-scored training pairs into DPO-ready format.""" + +from __future__ import annotations + +from pathlib import Path + +from utils.config_loader import load_config +from utils.dpo_transform_utils import transform_dataset + + +def main() -> None: + """Run dataset transformation for factual-scored training pairs.""" + paths = load_config()["paths"] + + input_path = Path(paths["skywork_train_factual"]) + output_path = Path(paths["skywork_train_transformed"]) + + transform_dataset(input_path, output_path) + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py b/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py new file mode 100644 index 0000000..a0036c6 --- /dev/null +++ b/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py @@ -0,0 +1,22 @@ +"""Transform factual-scored evaluation pairs into DPO-ready format.""" + +from __future__ import annotations + +from pathlib import Path + +from utils.config_loader import load_config +from utils.dpo_transform_utils import transform_dataset + + +def main() -> None: + """Run dataset transformation for factual-scored validation pairs.""" + paths = load_config()["paths"] + + 
diff --git a/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py b/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py
new file mode 100644
index 0000000..a0036c6
--- /dev/null
+++ b/src/aixpert/data_construction/stage_4_transformation/data_transform_val.py
@@ -0,0 +1,22 @@
+"""Transform factual-scored evaluation pairs into DPO-ready format."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from utils.config_loader import load_config
+from utils.dpo_transform_utils import transform_dataset
+
+
+def main() -> None:
+    """Run dataset transformation for factual-scored validation pairs."""
+    paths = load_config()["paths"]
+
+    input_path = Path(paths["skywork_eval_factual"])
+    output_path = Path(paths["skywork_eval_transformed"])
+
+    transform_dataset(input_path, output_path)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_train.py b/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_train.py
new file mode 100644
index 0000000..defd1cc
--- /dev/null
+++ b/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_train.py
@@ -0,0 +1,88 @@
+"""
+Generate synthetic corruption (hallucinated) responses for TRAIN split.
+
+This script:
+- Loads the clean DPO-ready Skywork transformation for training.
+- Selects items where h_w=0 (winner factual) and h_l=1 (loser incorrect).
+- Asks GPT-4o-mini to rewrite the factual answer into a subtle hallucination.
+- Produces “inversion pairs” where the corrupted answer is chosen and the original is rejected.
+- Saves up to 10,000 synthetic hallucination samples.
+
+Fully compatible with ruff, ruff-format, pydocstyle, and mypy.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import random
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from decouple import Config, RepositoryEnv
+from openai import AsyncOpenAI
+from utils.config_loader import load_config
+from utils.data_utils import load_jsonl, write_jsonl
+from utils.synthetic_utils import build_inversion_item, generate_corruption
+
+
+async def process_item(
+    item: Dict[str, Any],
+    client: AsyncOpenAI,
+    sem: asyncio.Semaphore,
+    model: str,
+    max_retries: int,
+) -> Optional[Dict[str, Any]]:
+    """Generate one synthetic inversion sample for training."""
+    corrupted = await generate_corruption(
+        client=client,
+        model=model,
+        question=item["prompt"],
+        answer=item["chosen"],
+        semaphore=sem,
+        max_retries=max_retries,
+    )
+
+    if corrupted is None:
+        return None
+
+    return await build_inversion_item(item, corrupted)
+
+
+async def main() -> None:
+    """Generate synthetic hallucination samples for training."""
+    config = load_config()
+
+    model = config["model"]["name"]
+
+    # Read the API key from .env at the configured repository root,
+    # mirroring the stage 3 scripts (config.yaml holds no API key).
+    env = Config(RepositoryEnv(f"{config['repository']}/.env"))
+    api_key = env("OPENAI_API_KEY", default=None)
+    if not api_key:
+        raise RuntimeError("Missing OPENAI_API_KEY in .env")
+
+    target = config["hyperparams"]["synthetic_train_samples"]
+    concurrency = config["hyperparams"]["corruption_concurrency"]
+    max_retries = config["hyperparams"]["max_retries"]
+
+    input_path = Path(config["paths"]["skywork_train_transformed"])
+    output_path = Path(config["paths"]["synthetic_train_out"])
+
+    print(f"Loading transformed training data → {input_path}")
+    items = load_jsonl(input_path)
+
+    print("🔍 Selecting (h_w=0, h_l=1) candidates…")
+    valid = [x for x in items if x["h_w"] == 0 and x["h_l"] == 1]
+
+    selected = random.sample(valid, min(target, len(valid)))
+    print(f"Selected {len(selected)} items for corruption.")
+
+    client = AsyncOpenAI(api_key=api_key)
+    sem = asyncio.Semaphore(concurrency)
+
+    tasks = [process_item(item, client, sem, model, max_retries) for item in selected]
+    results = await asyncio.gather(*tasks)
+
+    final_rows = [r for r in results if r is not None]
+
+    print(f"Saving {len(final_rows)} synthetic training samples → {output_path}")
+    write_jsonl(output_path, final_rows)
+
+    print("Synthetic training corruption generation complete.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_val.py b/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_val.py
new file mode 100644
index 0000000..6dfa633
--- /dev/null
+++ b/src/aixpert/data_construction/stage_5_syntheticdata/data_synthetic_val.py
@@ -0,0 +1,90 @@
+"""
+Generate synthetic corruption (hallucinated) responses for EVAL split.
+
+This script:
+- Loads the clean DPO-ready Skywork eval transformation.
+- Selects pairs where h_w=0 and h_l=1.
+- Uses GPT-4o-mini to introduce subtle factual errors.
+- Produces inverted (hallucinated, correct) preference pairs.
+- Saves 400 synthetic eval corruption examples.
+
+Compatible with ruff, ruff-format, pydocstyle, and mypy.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import random
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from decouple import Config, RepositoryEnv
+from openai import AsyncOpenAI
+from utils.config_loader import load_config
+from utils.data_utils import load_jsonl, write_jsonl
+from utils.synthetic_utils import build_inversion_item, generate_corruption
+
+
+async def process_item(
+    item: Dict[str, Any],
+    client: AsyncOpenAI,
+    sem: asyncio.Semaphore,
+    model: str,
+    max_retries: int,
+) -> Optional[Dict[str, Any]]:
+    """Generate one synthetic inversion example for evaluation."""
+    corrupted = await generate_corruption(
+        client=client,
+        model=model,
+        question=item["prompt"],
+        answer=item["chosen"],
+        semaphore=sem,
+        max_retries=max_retries,
+    )
+
+    if corrupted is None:
+        return None
+
+    entry = await build_inversion_item(item, corrupted)
+    entry["source"] = "synthetic_inversion_eval"
+    return entry
+
+
+async def main() -> None:
+    """Generate synthetic corruption samples for evaluation."""
+    config = load_config()
+
+    model = config["model"]["name"]
+
+    # Read the API key from .env at the configured repository root,
+    # mirroring the stage 3 scripts (config.yaml holds no API key).
+    env = Config(RepositoryEnv(f"{config['repository']}/.env"))
+    api_key = env("OPENAI_API_KEY", default=None)
+    if not api_key:
+        raise RuntimeError("Missing OPENAI_API_KEY in .env")
+
+    target = config["hyperparams"]["synthetic_eval_samples"]
+    concurrency = config["hyperparams"]["corruption_concurrency"]
+    max_retries = config["hyperparams"]["max_retries"]
+
+    input_path = Path(config["paths"]["skywork_eval_transformed"])
+    output_path = Path(config["paths"]["synthetic_eval_out"])
+
+    print(f"Loading transformed eval data → {input_path}")
+    items = load_jsonl(input_path)
+
+    print("Selecting (h_w=0, h_l=1) eval candidates…")
+    valid = [x for x in items if x["h_w"] == 0 and x["h_l"] == 1]
+
+    selected = random.sample(valid, min(target, len(valid)))
+    print(f"Selected {len(selected)} items for corruption.")
+
+    client = AsyncOpenAI(api_key=api_key)
+    sem = asyncio.Semaphore(concurrency)
+
+    tasks = [process_item(item, client, sem, model, max_retries) for item in selected]
+    results = await asyncio.gather(*tasks)
+
+    final_rows = [r for r in results if r is not None]
+
+    print(f"Saving {len(final_rows)} synthetic eval samples → {output_path}")
+    write_jsonl(output_path, final_rows)
+
+    print("Eval synthetic corruption generation complete.")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/src/aixpert/data_construction/stage_6_merging/data_merge_train.py b/src/aixpert/data_construction/stage_6_merging/data_merge_train.py
new file mode 100644
index 0000000..b70f5f2
--- /dev/null
+++ b/src/aixpert/data_construction/stage_6_merging/data_merge_train.py
@@ -0,0 +1,66 @@
+"""
+Merge Skywork training data with 10k synthetic inversion pairs.
+
+This script:
+- Loads synthetic corruption samples.
+- Loads transformed Skywork training data.
+- Splits real samples into buckets by (h_w, h_l).
+- Samples 10k from (0,1).
+- Merges: synthetic + (0,0) + (1,1) + sampled (0,1).
+- Shuffles and writes final JSONL file.
+
+Fully compatible with ruff, mypy, and pydocstyle.
+""" + +from __future__ import annotations + +import random +from pathlib import Path + +from utils.config_loader import load_config +from utils.data_utils import bucket_by_flags, load_jsonl, write_jsonl + + +def main() -> None: + """Merge Skywork train data with synthetic inversion pairs.""" + cfg = load_config() + paths = cfg["paths"] + hp = cfg["hyperparams"] + + synthetic_path = Path(paths["synthetic_train_out"]) + skywork_transformed_path = Path(paths["skywork_train_transformed"]) + output_path = Path(paths["final_train_merged"]) + + sample_size = hp.get("merge_sample_01_train", 10000) + + print(f"Loading synthetic → {synthetic_path}") + synthetic = load_jsonl(synthetic_path) + print(f"Synthetic count: {len(synthetic)}") + + print(f"Loading transformed Skywork train → {skywork_transformed_path}") + sky = load_jsonl(skywork_transformed_path) + print(f"Skywork transformed count: {len(sky)}") + + b00, b11, b01 = bucket_by_flags(sky) + + print(f"(0,0): {len(b00)}") + print(f"(1,1): {len(b11)}") + print(f"(0,1): {len(b01)}") + + random.seed(42) + sample_01 = random.sample(b01, min(sample_size, len(b01))) + print(f"Sampled (0,1): {len(sample_01)}") + + merged = synthetic + b00 + b11 + sample_01 + + print(f"Total merged before shuffle: {len(merged)}") + random.shuffle(merged) + + print(f"Saving final merged train → {output_path}") + write_jsonl(output_path, merged) + + print("TRAIN MERGE COMPLETE.\n") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_6_merging/data_merge_val.py b/src/aixpert/data_construction/stage_6_merging/data_merge_val.py new file mode 100644 index 0000000..2e114eb --- /dev/null +++ b/src/aixpert/data_construction/stage_6_merging/data_merge_val.py @@ -0,0 +1,59 @@ +""" +Merge Skywork evaluation data with 400 synthetic inversion pairs. + +This script: +- Loads synthetic corruption samples for eval. +- Loads Skywork eval transformed dataset. +- Splits samples into buckets by (h_w, h_l). +- Keeps ALL real eval samples. +- Merges synthetic + all real eval buckets. +- Shuffles and writes final eval JSONL file. + +Fully compatible with ruff, mypy, and pydocstyle. 
+""" + +from __future__ import annotations + +import random +from pathlib import Path + +from utils.config_loader import load_config +from utils.data_utils import bucket_by_flags, load_jsonl, write_jsonl + + +def main() -> None: + """Merge Skywork eval data with synthetic eval inversion pairs.""" + cfg = load_config() + paths = cfg["paths"] + + synthetic_path = Path(paths["synthetic_eval_out"]) + skywork_transformed_path = Path(paths["skywork_eval_transformed"]) + output_path = Path(paths["final_eval_merged"]) + + print(f"Loading synthetic eval → {synthetic_path}") + synthetic = load_jsonl(synthetic_path) + print(f"Synthetic eval count: {len(synthetic)}") + + print(f"Loading transformed Skywork eval → {skywork_transformed_path}") + sky = load_jsonl(skywork_transformed_path) + print(f"Skywork eval count: {len(sky)}") + + b00, b11, b01 = bucket_by_flags(sky) + + print(f"(0,0): {len(b00)}") + print(f"(1,1): {len(b11)}") + print(f"(0,1): {len(b01)}") + + merged = synthetic + b00 + b11 + b01 + print(f"Total merged before shuffle: {len(merged)}") + + random.shuffle(merged) + + print(f"Saving final merged eval → {output_path}") + write_jsonl(output_path, merged) + + print("EVAL MERGE COMPLETE.\n") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_7_final/data_final_train.py b/src/aixpert/data_construction/stage_7_final/data_final_train.py new file mode 100644 index 0000000..f7b6cd2 --- /dev/null +++ b/src/aixpert/data_construction/stage_7_final/data_final_train.py @@ -0,0 +1,86 @@ +""" +Balanced sampling for TRAIN dataset. + +This script: +- Loads the merged training dataset. +- Buckets by (h_w, h_l). +- Samples required amounts per bucket (with replacement if needed). +- Shuffles and saves the final balanced training dataset. 
+
+Buckets required:
+    (0,1) → 10,000
+    (1,0) → 10,000
+    (0,0) → 15,000
+    (1,1) → 10,000
+"""
+
+from __future__ import annotations
+
+import random
+from pathlib import Path
+from typing import Any, Dict, List, Tuple
+
+from utils.config_loader import load_config
+from utils.data_utils import load_jsonl, write_jsonl
+
+
+def main() -> None:
+    """Balanced sampling for TRAIN dataset."""
+    cfg = load_config()
+    paths = cfg["paths"]
+    hp = cfg["hyperparams"]
+
+    input_path = Path(paths["final_train_merged"])
+    output_path = Path(paths["final_train_out"])
+
+    # balance_targets keys arrive from YAML as strings such as "(0,1)";
+    # parse them into integer tuples so they can index the buckets below.
+    target_counts: Dict[Tuple[int, int], int] = {}
+    for raw_key, count in hp["balance_targets"].items():
+        h_w, h_l = raw_key.strip("()").split(",")
+        target_counts[(int(h_w), int(h_l))] = int(count)
+
+    print(f"Loading → {input_path}")
+    data = load_jsonl(input_path)
+
+    buckets: Dict[Tuple[int, int], List[Dict[str, Any]]] = {
+        (0, 1): [],
+        (1, 0): [],
+        (0, 0): [],
+        (1, 1): [],
+    }
+
+    print("Bucketing samples…")
+    for ex in data:
+        key = (int(ex["h_w"]), int(ex["h_l"]))
+        if key in buckets:
+            buckets[key].append(ex)
+
+    print("\n=== AVAILABLE PER BUCKET ===")
+    for key, rows in buckets.items():
+        print(f"{key}: {len(rows)}")
+
+    final_rows: List[Dict[str, Any]] = []
+
+    for key, req_count in target_counts.items():
+        pool = buckets[key]
+        available = len(pool)
+
+        print(f"\nBucket {key}: available={available}, required={req_count}")
+
+        if available < req_count:
+            print("Sampling WITH replacement.")
+            sampled = random.choices(pool, k=req_count)
+        else:
+            sampled = random.sample(pool, req_count)
+
+        final_rows.extend(sampled)
+
+    print(f"\nShuffling {len(final_rows)} rows…")
+    random.shuffle(final_rows)
+
+    print(f"Saving → {output_path}")
+    write_jsonl(output_path, final_rows)
+
+    print("\nTRAIN balanced dataset ready.")
+    print(f"Final count: {len(final_rows)}")
+
+
+if __name__ == "__main__":
+    random.seed(42)
+    main()
diff --git a/src/aixpert/data_construction/stage_7_final/data_final_val.py b/src/aixpert/data_construction/stage_7_final/data_final_val.py
new file mode 100644
index 0000000..af74631
--- /dev/null
+++ b/src/aixpert/data_construction/stage_7_final/data_final_val.py
@@ -0,0 +1,90 @@
+"""
+Build the FINAL evaluation dataset (skywork_final_eval.jsonl).
+ +Composition: + • 400 synthetic inversion samples (1,0) + • all Skywork eval samples from skywork_first_transformed_eval.jsonl + • +1500 samples of (1,1) from skywork_final_train.jsonl + • +1500 samples of (0,0) from skywork_final_train.jsonl + → excluding any sample already used in train_finallast.jsonl + +Final eval ≈ (#sky_eval + 400 synthetic + 3000 added clean samples) +""" + +from __future__ import annotations + +import random +from pathlib import Path +from typing import Any, Dict, List + +from utils.config_loader import load_config +from utils.data_utils import load_jsonl, write_jsonl + + +def main() -> None: + """Build the FINAL evaluation dataset.""" + cfg = load_config() + paths = cfg["paths"] + hp = cfg["hyperparams"] + + synthetic_path = Path(paths["synthetic_eval_out"]) + sky_eval_path = Path(paths["skywork_eval_transformed"]) + train_full_path = Path(paths["final_train_merged"]) + train_used_path = Path(paths["final_train_out"]) + output_path = Path(paths["final_eval_out"]) + + add_n = hp["eval_additional_clean_samples"] + + print(f"Loading synthetic eval → {synthetic_path}") + synthetic = load_jsonl(synthetic_path) + + print(f"Loading Skywork eval transformed → {sky_eval_path}") + sky_eval = load_jsonl(sky_eval_path) + + print(f"Loading full training dataset → {train_full_path}") + sky_train = load_jsonl(train_full_path) + + print(f"Loading train-balanced dataset (to exclude) → {train_used_path}") + train_used = load_jsonl(train_used_path) + + exclude = {(ex["prompt"], ex["chosen"], ex["rejected"]) for ex in train_used} + + pool_11: List[Dict[str, Any]] = [] + pool_00: List[Dict[str, Any]] = [] + + for ex in sky_train: + key = (ex["prompt"], ex["chosen"], ex["rejected"]) + if key in exclude: + continue + + if ex["h_w"] == 1 and ex["h_l"] == 1: + pool_11.append(ex) + elif ex["h_w"] == 0 and ex["h_l"] == 0: + pool_00.append(ex) + + print(f"(1,1) pool after exclusion: {len(pool_11)}") + print(f"(0,0) pool after exclusion: {len(pool_00)}") + + sample_11 = random.sample(pool_11, add_n) + sample_00 = random.sample(pool_00, add_n) + + merged: List[Dict[str, Any]] = [] + merged.extend(synthetic) + merged.extend(sky_eval) + merged.extend(sample_11) + merged.extend(sample_00) + + print(f"\nTotal before shuffle: {len(merged)}") + + random.shuffle(merged) + + print(f"Saving final eval → {output_path}") + write_jsonl(output_path, merged) + + print("\nFINAL EVAL DATASET READY.") + print(f"Final count: {len(merged)}") + + +if __name__ == "__main__": + random.seed(42) + main() diff --git a/src/aixpert/data_construction/stage_8_flipping/data_flipped_train.py b/src/aixpert/data_construction/stage_8_flipping/data_flipped_train.py new file mode 100644 index 0000000..f1187d9 --- /dev/null +++ b/src/aixpert/data_construction/stage_8_flipping/data_flipped_train.py @@ -0,0 +1,41 @@ +""" +Flip preference labels for training data. 
+ +This script: +- Converts h_w=1,h_l=0 → h_w=0,h_l=1 +- Swaps chosen/rejected +- Writes a flipped version of the dataset +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List + +from utils.config_loader import load_config +from utils.data_utils import flip_sample, load_jsonl, write_jsonl + + +def main() -> None: + """Flip (1,0) preference labels in the final training dataset.""" + paths = load_config()["paths"] + + input_path = Path(paths["final_train_out"]) + output_path = Path(paths["train_flipped_out"]) + print(f"Loading → {input_path}") + items: List[Dict[str, Any]] = load_jsonl(input_path) + + print("Flipping (h_w=1, h_l=0) samples...") + flipped = [flip_sample(item) for item in items] + + print(f"Saving flipped dataset → {output_path}") + write_jsonl(output_path, flipped) + + print("\n==========================================") + print("TRAIN FLIP COMPLETE") + print(f"Total samples processed: {len(flipped)}") + print("==========================================\n") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/stage_8_flipping/data_flipped_val.py b/src/aixpert/data_construction/stage_8_flipping/data_flipped_val.py new file mode 100644 index 0000000..f34b1ad --- /dev/null +++ b/src/aixpert/data_construction/stage_8_flipping/data_flipped_val.py @@ -0,0 +1,42 @@ +""" +Flip preference labels for evaluation data. + +This script: +- Converts h_w=1,h_l=0 → h_w=0,h_l=1 +- Swaps chosen/rejected +- Writes a flipped version of the dataset +""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict, List + +from utils.config_loader import load_config +from utils.data_utils import flip_sample, load_jsonl, write_jsonl + + +def main() -> None: + """Flip (1,0) preference labels in the final evaluation dataset.""" + paths = load_config()["paths"] + + input_path = Path(paths["final_eval_out"]) + output_path = Path(paths["eval_flipped_out"]) + + print(f"Loading → {input_path}") + items: List[Dict[str, Any]] = load_jsonl(input_path) + + print("Flipping (h_w=1, h_l=0) samples...") + flipped = [flip_sample(item) for item in items] + + print(f"Saving flipped dataset → {output_path}") + write_jsonl(output_path, flipped) + + print("\n==========================================") + print("EVAL FLIP COMPLETE") + print(f"Total samples processed: {len(flipped)}") + print("==========================================\n") + + +if __name__ == "__main__": + main() diff --git a/src/aixpert/data_construction/utils/config_loader.py b/src/aixpert/data_construction/utils/config_loader.py new file mode 100644 index 0000000..88b3060 --- /dev/null +++ b/src/aixpert/data_construction/utils/config_loader.py @@ -0,0 +1,17 @@ +"""Utility module for loading the global YAML configuration file.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict + +import yaml + + +CONFIG_PATH = Path(__file__).resolve().parents[1] / "config" / "config.yaml" + + +def load_config() -> Dict[str, Any]: + """Load YAML config into a dictionary.""" + with open(CONFIG_PATH, "r", encoding="utf-8") as f: + return yaml.safe_load(f) diff --git a/src/aixpert/data_construction/utils/data_utils.py b/src/aixpert/data_construction/utils/data_utils.py new file mode 100644 index 0000000..d4268b2 --- /dev/null +++ b/src/aixpert/data_construction/utils/data_utils.py @@ -0,0 +1,123 @@ +"""Utility functions for dataset extraction, cleaning, formatting, and flipping. 
+ +These helpers are used across the data-construction pipeline for DPO, SafeDPO, +Factual-DPO, and evaluation preprocessing. +""" + +from __future__ import annotations + +import json +import random +from pathlib import Path +from typing import Any, Dict, List, Tuple + + +def extract_prompt(dialog: List[Dict[str, Any]]) -> str: + """Extract the first user message.""" + for msg in dialog: + if msg.get("role") == "user": + return str(msg.get("content", "")).strip() + return "" + + +def extract_answer(dialog: List[Dict[str, Any]]) -> str: + """Extract the first assistant reply.""" + for msg in dialog: + if msg.get("role") == "assistant": + return str(msg.get("content", "")).strip() + return "" + + +def save_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None: + """Write list of dictionaries to JSONL.""" + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def filter_duplicates( + rows: List[Dict[str, str]], +) -> Tuple[List[Dict[str, str]], List[Dict[str, str]]]: + """Split rows into cleaned (chosen != rejected) and removed (exact duplicates).""" + cleaned: List[Dict[str, str]] = [] + removed: List[Dict[str, str]] = [] + + for row in rows: + if row["chosen"] == row["rejected"]: + removed.append(row) + else: + cleaned.append(row) + + return cleaned, removed + + +def load_jsonl(path: Path) -> List[Dict[str, Any]]: + """Load JSONL file into a list of dictionaries.""" + with path.open("r", encoding="utf-8") as f: + return [json.loads(line) for line in f] + + +def write_jsonl(path: Path, rows: List[Dict[str, Any]]) -> None: + """Write list of dictionaries to a JSONL file.""" + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def create_preference_pairs(data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Convert rows into DPO preference-pair format.""" + output: List[Dict[str, Any]] = [] + + for item in data: + prompt = item.get("prompt", "") + chosen = item.get("chosen", "") + rejected = item.get("rejected", "") + + if random.random() < 0.5: + response_0 = chosen + response_1 = rejected + better_response_id = 0 + else: + response_0 = rejected + response_1 = chosen + better_response_id = 1 + + output.append( + { + "prompt": prompt, + "response_0": response_0, + "response_1": response_1, + "better_response_id": better_response_id, + } + ) + + return output + + +def bucket_by_flags( + items: List[Dict[str, Any]], +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + """Split items into (0,0), (1,1), and (0,1) buckets.""" + b00, b11, b01 = [], [], [] + + for ex in items: + h_w, h_l = ex["h_w"], ex["h_l"] + + if h_w == 0 and h_l == 0: + b00.append(ex) + elif h_w == 1 and h_l == 1: + b11.append(ex) + elif h_w == 0 and h_l == 1: + b01.append(ex) + + return b00, b11, b01 + + +def flip_sample(item: Dict[str, Any]) -> Dict[str, Any]: + """Flip a sample if (h_w, h_l) = (1, 0).""" + if item.get("h_w") == 1 and item.get("h_l") == 0: + item["h_w"], item["h_l"] = 0, 1 + item["chosen"], item["rejected"] = item["rejected"], item["chosen"] + return item diff --git a/src/aixpert/data_construction/utils/dpo_transform_utils.py b/src/aixpert/data_construction/utils/dpo_transform_utils.py new file mode 100644 index 0000000..98684f7 --- /dev/null +++ b/src/aixpert/data_construction/utils/dpo_transform_utils.py @@ -0,0 +1,56 @@ +"""Utilities for 
transforming factual-scored pairs into DPO-ready format.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Dict + +from tqdm import tqdm +from utils.data_utils import load_jsonl, write_jsonl + + +def process_item(item: Dict[str, Any]) -> Dict[str, Any]: + """Convert one factual-scored item into DPO-ready structure.""" + prompt = item["prompt"] + r0 = item["response_0"] + r1 = item["response_1"] + pref = int(item["better_response_id"]) + + h0 = int(item["h0"]) + h1 = int(item["h1"]) + + if pref == 0: + chosen, rejected = r0, r1 + h_w, h_l = h0, h1 + else: + chosen, rejected = r1, r0 + h_w, h_l = h1, h0 + + return { + "prompt": prompt, + "chosen": chosen, + "rejected": rejected, + "h_w": h_w, + "h_l": h_l, + "better_response_id": pref, + "response_0": r0, + "response_1": r1, + "flipped": False, + } + + +def transform_dataset(input_path: Path, output_path: Path) -> None: + """Load dataset, apply transformation, and save output JSONL.""" + print(f"Loading → {input_path}") + items = load_jsonl(input_path) + + print(f"Transforming {len(items)} items…") + transformed = [process_item(it) for it in tqdm(items)] + + print(f"Saving → {output_path}") + write_jsonl(output_path, transformed) + + print("\n=======================================") + print("TRANSFORMATION COMPLETE") + print(f"Total items: {len(items)}") + print("=======================================\n") diff --git a/src/aixpert/data_construction/utils/factual_utils.py b/src/aixpert/data_construction/utils/factual_utils.py new file mode 100644 index 0000000..f9996b1 --- /dev/null +++ b/src/aixpert/data_construction/utils/factual_utils.py @@ -0,0 +1,120 @@ +""" +Async factuality evaluation utilities. + +This module runs factual-flag scoring for preference pairs using an +LLM judge, supports concurrency, retries, and resume-safe checkpointing. 
+""" + +from __future__ import annotations + +import asyncio +import json +import os +import re +from pathlib import Path +from typing import Any, Dict, List + +from openai import AsyncOpenAI +from tqdm.asyncio import tqdm_asyncio +from utils.prompt_templates import BINARY_FACTUAL_JUDGE_PROMPT + + +def get_client(api_key: str) -> AsyncOpenAI: + """Return AsyncOpenAI client.""" + return AsyncOpenAI(api_key=api_key) + + +async def get_factual_flag( + client: AsyncOpenAI, + model: str, + question: str, + answer: str, + semaphore: asyncio.Semaphore, + max_retries: int, +) -> int: + """Evaluate factual correctness (0 factual, 1 hallucinated).""" + prompt = BINARY_FACTUAL_JUDGE_PROMPT.format(question=question, answer=answer) + + async with semaphore: + for retry in range(max_retries): + try: + resp = await client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + temperature=0, + ) + text = resp.choices[0].message.content.strip() + match = re.search(r"\[\[(0|1)\]\]", text) + return int(match.group(1)) if match else 1 + except Exception: + await asyncio.sleep(1 + retry * 0.5) + + return 1 + + +async def evaluate_pair( + client: AsyncOpenAI, + item: Dict[str, Any], + model: str, + sem: asyncio.Semaphore, + retries: int, +) -> Dict[str, Any]: + """Compute factual flags for response_0 and response_1.""" + prompt = item["prompt"] + + t0 = asyncio.create_task( + get_factual_flag(client, model, prompt, item["response_0"], sem, retries) + ) + t1 = asyncio.create_task( + get_factual_flag(client, model, prompt, item["response_1"], sem, retries) + ) + + f0, f1 = await asyncio.gather(t0, t1) + + return { + **item, + "factual_flag_0": f0, + "factual_flag_1": f1, + "h0": f0, + "h1": f1, + } + + +async def factual_evaluation_pipeline( + client: AsyncOpenAI, + items: List[Dict[str, Any]], + output_file: Path, + model: str, + concurrency: int, + max_retries: int, +) -> None: + """Run factuality evaluation with resume and checkpoint support.""" + processed = 0 + if output_file.exists(): + with output_file.open("r", encoding="utf-8") as f: + processed = sum(1 for _ in f) + + remaining = items[processed:] + sem = asyncio.Semaphore(concurrency) + + tasks = [evaluate_pair(client, item, model, sem, max_retries) for item in remaining] + + buffer: List[str] = [] + counter = processed + + with output_file.open("a", encoding="utf-8") as f: + for coro in tqdm_asyncio.as_completed(tasks, total=len(tasks)): + out = await coro + buffer.append(json.dumps(out, ensure_ascii=False) + "\n") + counter += 1 + + if len(buffer) >= 25: + f.writelines(buffer) + f.flush() + os.fsync(f.fileno()) + buffer.clear() + + if buffer: + f.writelines(buffer) + f.flush() + os.fsync(f.fileno()) diff --git a/src/aixpert/data_construction/utils/prompt_templates.py b/src/aixpert/data_construction/utils/prompt_templates.py new file mode 100644 index 0000000..86cbe86 --- /dev/null +++ b/src/aixpert/data_construction/utils/prompt_templates.py @@ -0,0 +1,57 @@ +"""Central location for prompt templates (evaluation, DPO scoring, etc.).""" + +from __future__ import annotations + + +BINARY_FACTUAL_JUDGE_PROMPT = """ +System: +You are an extremely strict factuality judge. 
Your task is to determine whether the assistant’s +response contains ANY of the following: + +• factual errors +• hallucinations +• unsupported claims +• unverifiable statements +• fabricated details +• incorrect technical descriptions +• invented studies, names, or entities +• logical contradictions +• vague or misleading explanations +• partial answers that omit critical facts + +IMPORTANT: +If the response is vague, incomplete, unsupported, or unverifiable, +classify it as [[1]]. + +Definitions: +[[0]] → Fully factual, precise, verifiable. +[[1]] → ANY issue. + +Return ONLY: +[[0]] or [[1]] + +User Question: +{question} + +Assistant Response: +{answer} +""" + +CORRUPTION_SYSTEM_PROMPT = """ +You are a factual corruption generator. + +Rewrite a correct answer into a subtly incorrect version with 2–3 factual errors. +It must remain fluent, confident, and plausible. +""" + +CORRUPTION_USER_PROMPT = """ +PROMPT: +{question} + +CORRECT ANSWER: +{answer} + +TASK: +Rewrite the answer so that it becomes factually wrong, introducing subtle hallucinations +while sounding confident and coherent. +""" diff --git a/src/aixpert/data_construction/utils/synthetic_utils.py b/src/aixpert/data_construction/utils/synthetic_utils.py new file mode 100644 index 0000000..000cab6 --- /dev/null +++ b/src/aixpert/data_construction/utils/synthetic_utils.py @@ -0,0 +1,59 @@ +"""Async utilities for generating corrupted answers and synthetic inversions.""" + +from __future__ import annotations + +import asyncio +from typing import Any, Dict, Optional + +from openai import AsyncOpenAI +from utils.prompt_templates import ( + CORRUPTION_SYSTEM_PROMPT, + CORRUPTION_USER_PROMPT, +) + + +async def generate_corruption( + client: AsyncOpenAI, + model: str, + question: str, + answer: str, + semaphore: asyncio.Semaphore, + max_retries: int = 5, + temperature: float = 0.8, +) -> Optional[str]: + """Generate a hallucinated / corrupted answer using GPT.""" + user_prompt = CORRUPTION_USER_PROMPT.format(question=question, answer=answer) + + async with semaphore: + for retry in range(max_retries): + try: + resp = await client.chat.completions.create( + model=model, + messages=[ + {"role": "system", "content": CORRUPTION_SYSTEM_PROMPT}, + {"role": "user", "content": user_prompt}, + ], + temperature=temperature, + ) + return resp.choices[0].message.content.strip() + + except Exception as exc: + print(f"[Retry {retry}] corruption generation failed: {exc}") + await asyncio.sleep(1 + retry * 0.5) + + return None + + +async def build_inversion_item( + item: Dict[str, Any], + corrupted: str, +) -> Dict[str, Any]: + """Return a synthetic inversion DPO sample.""" + return { + "prompt": item["prompt"], + "chosen": corrupted, + "rejected": item["chosen"], + "h_w": 1, + "h_l": 0, + "source": "synthetic_inversion", + }