diff --git a/docs/sygra_library.md b/docs/library/sygra_library.md similarity index 100% rename from docs/sygra_library.md rename to docs/library/sygra_library.md diff --git a/docs/library/sygra_library_examples.md b/docs/library/sygra_library_examples.md new file mode 100644 index 00000000..85a4d679 --- /dev/null +++ b/docs/library/sygra_library_examples.md @@ -0,0 +1,1783 @@ +# SyGra Complete Usage Guide + +**Comprehensive documentation with examples** + +--- + +## Table of Contents + +1. [Quick Start](#quick-start) +2. [Basic Workflows](#basic-workflows) +3. [Data Sources and Sinks](#data-sources-and-sinks) +4. [LLM Nodes](#llm-nodes) +5. [Agent Nodes](#agent-nodes) +6. [Multi-LLM Nodes](#multi-llm-nodes) +7. [Lambda Nodes](#lambda-nodes) +8. [Subgraphs](#subgraphs) +9. [Advanced Features](#advanced-features) +10. [Complete Examples](#complete-examples) + +--- + +## Quick Start + +### Installation + +```bash +pip install sygra +# or with poetry +poetry add sygra +``` + +### Your First Workflow + +```python +import sygra + +# Create a simple workflow +workflow = sygra.Workflow("my_first_workflow") + +# Add a data source +workflow.source([ + {"id": 1, "text": "What is Python?"}, + {"id": 2, "text": "Explain machine learning"}, +]) + +# Add an LLM node +workflow.llm( + model="gpt-4o", + prompt="Answer this question: {text}" +) + +# Add output sink +workflow.sink("output/results.jsonl") + +# Run the workflow +results = workflow.run(num_records=2) +print(f"Generated {len(results)} responses") +``` + +**Output:** +``` +Generated 2 responses +``` + +--- + +## Basic Workflows + +### Example 1: Simple Text Generation + +```python +import sygra + +# Create workflow +workflow = sygra.Workflow("text_generation") + +# Define input data +data = [ + {"topic": "artificial intelligence"}, + {"topic": "quantum computing"}, + {"topic": "blockchain technology"}, +] + +# Build the workflow +result = ( + workflow + .source(data) + .llm( + model="gpt-4o", + prompt="Write a brief introduction about: {topic}" + ) + .sink("output/introductions.jsonl") + .run() +) + +print(f"✓ Generated {len(result)} introductions") +``` + +### Example 2: Multi-Step Processing + +```python +import sygra + +# Create workflow with multiple steps +workflow = sygra.Workflow("multi_step_processing") + +# Step 1: Generate initial content +workflow.source([ + {"subject": "climate change"}, + {"subject": "renewable energy"}, +]) + +# Step 2: Generate a summary +workflow.llm( + model="gpt-4o", + prompt=[ + {"system": "You are a science writer."}, + {"user": "Write a 2-paragraph article about: {subject}"} + ] +) + +# Step 3: Extract key points +workflow.llm( + model="gpt-4o-mini", + prompt="Extract 3 key points from this article: {llm_1_response}" +) + +# Save results +workflow.sink("output/articles_with_keypoints.jsonl") + +# Execute +results = workflow.run(num_records=2) +``` + +--- + +## Data Sources and Sinks + +### Memory Data Source + +```python +import sygra + +# In-memory data +workflow = sygra.Workflow("memory_source") + +workflow.source([ + {"id": 1, "name": "Alice", "question": "What is AI?"}, + {"id": 2, "name": "Bob", "question": "How does ML work?"}, +]) + +workflow.llm("gpt-4o", "Answer {name}'s question: {question}") +workflow.sink("output/answers.jsonl") + +workflow.run() +``` + +### File Data Source + +```python +import sygra + +# Load from JSONL file +workflow = sygra.Workflow("file_source") + +workflow.source("data/questions.jsonl") # Each line is a JSON object +workflow.llm("gpt-4o", "Provide a detailed answer to: 
{question}") +workflow.sink("output/detailed_answers.jsonl") + +workflow.run(num_records=10) # Process only first 10 records +``` + +### HuggingFace Data Source + +```python +import sygra + +# Load from HuggingFace dataset +workflow = sygra.Workflow("hf_source") + +workflow.source({ + "type": "hf", + "dataset": "squad", + "split": "train", + "config": "plain_text" +}) + +workflow.llm( + model="gpt-4o", + prompt="Rephrase this question: {question}" +) + +workflow.sink("output/rephrased_questions.jsonl") + +workflow.run(num_records=100) +``` + +### Multiple Output Formats + +```python +import sygra + +workflow = sygra.Workflow("multiple_outputs") + +workflow.source("data/input.jsonl") +workflow.llm("gpt-4o", "Summarize: {text}") + +# Save to multiple formats +workflow.sink("output/results.jsonl") # JSONL format +workflow.output_format("json") # Also save as JSON + +workflow.run() +``` + +--- + +## LLM Nodes + +### Basic LLM Usage + +```python +import sygra + +workflow = sygra.Workflow("basic_llm") + +workflow.source([{"text": "Explain photosynthesis"}]) + +# Simple prompt +workflow.llm( + model="gpt-4o", + prompt="Provide a clear explanation: {text}" +) + +workflow.sink("output/explanations.jsonl") +workflow.run() +``` + +### Multi-Message Prompts + +```python +import sygra + +workflow = sygra.Workflow("multi_message_llm") + +workflow.source([ + {"topic": "Python decorators", "level": "intermediate"} +]) + +# Multiple messages with system, user, and assistant roles +workflow.llm( + model="gpt-4o", + prompt=[ + { + "system": "You are an expert programming tutor. " + "Adapt explanations to the user's skill level." + }, + { + "user": "Explain {topic} for a {level} programmer." + } + ] +) + +workflow.sink("output/tutorials.jsonl") +workflow.run() +``` + +### LLM with Different Models + +```python +import sygra + +workflow = sygra.Workflow("model_comparison") + +data = [{"question": "What is recursion?"}] + +workflow.source(data) + +# Using GPT-4o +workflow.llm( + model="gpt-4o", + prompt="Answer this question: {question}" +) + +workflow.sink("output/gpt4_answers.jsonl") +workflow.run() + +# Create another workflow with Claude +workflow2 = sygra.Workflow("claude_workflow") +workflow2.source(data) + +workflow2.llm( + model="claude-sonnet-4", + prompt="Answer this question: {question}" +) + +workflow2.sink("output/claude_answers.jsonl") +workflow2.run() +``` + +### LLM with Temperature and Parameters + +```python +import sygra + +workflow = sygra.Workflow("creative_writing") + +workflow.source([{"theme": "space exploration"}]) + +# Use model configuration for creative output +workflow.llm( + model={ + "name": "gpt-4o", + "temperature": 0.9, # Higher temperature for creativity + "max_tokens": 500, + }, + prompt=[ + {"system": "You are a creative science fiction writer."}, + {"user": "Write a short story opening about: {theme}"} + ] +) + +workflow.sink("output/story_openings.jsonl") +workflow.run() +``` + +### Structured Output with LLM + +```python +import sygra + +workflow = sygra.Workflow("structured_output") + +workflow.source([ + {"product": "smartphone"}, + {"product": "laptop"}, +]) + +# Request structured JSON output +workflow.llm( + model="gpt-4o", + prompt="Generate product review for: {product}", + structured_output={ + "enabled": True, + "schema": { + "name": "ProductReview", + "fields": { + "rating": {"type": "integer"}, + "pros": {"type": "array"}, + "cons": {"type": "array"}, + "summary": {"type": "string"} + } + } + } +) + +workflow.sink("output/structured_reviews.jsonl") 
+workflow.run() +``` + +--- + +## Agent Nodes + +### Basic Agent with Tools + +```python +import sygra + +workflow = sygra.Workflow("agent_workflow") + +workflow.source([ + {"task": "Calculate the square root of 144 and explain the result"} +]) + +# Agent with calculator tool +workflow.agent( + model="gpt-4o", + tools=["calculator"], # Built-in calculator tool + prompt="Complete this task: {task}" +) + +workflow.sink("output/agent_results.jsonl") +workflow.run() +``` + +### Agent with Multiple Tools + +```python +import sygra + +workflow = sygra.Workflow("multi_tool_agent") + +workflow.source([ + {"query": "Search for recent AI news and summarize the top 3 stories"} +]) + +# Agent with multiple tools +workflow.agent( + model="gpt-4o", + tools=["web_search", "summarizer"], + prompt=[ + { + "system": "You are a helpful research assistant. " + "Use tools when needed to provide accurate information." + }, + { + "user": "{query}" + } + ] +) + +workflow.sink("output/research_results.jsonl") +workflow.run() +``` + +### Agent with Chat History + +```python +import sygra + +workflow = sygra.Workflow("conversational_agent") + +workflow.source([ + { + "conversation": [ + {"role": "user", "content": "What is machine learning?"}, + {"role": "assistant", "content": "Machine learning is..."}, + {"role": "user", "content": "Can you give me an example?"} + ] + } +]) + +# Agent that maintains conversation context +workflow.agent( + model="gpt-4o", + tools=[], # No tools needed + prompt="Continue this conversation naturally", + chat_history=True # Enable chat history +) + +workflow.sink("output/conversations.jsonl") +workflow.run() +``` + +--- + +## Multi-LLM Nodes + +### Compare Multiple Models + +```python +import sygra + +workflow = sygra.Workflow("model_comparison") + +workflow.source([ + {"question": "What are the benefits of renewable energy?"} +]) + +# Compare responses from multiple models +workflow.multi_llm( + models={ + "gpt4": "gpt-4o", + "gpt4_mini": "gpt-4o-mini", + "claude": "claude-sonnet-4" + }, + prompt="Answer this question concisely: {question}" +) + +workflow.sink("output/model_comparisons.jsonl") +workflow.run() +``` + +### Multi-LLM with Custom Processing + +```python +import sygra + +workflow = sygra.Workflow("multi_llm_custom") + +workflow.source([{"topic": "quantum computing"}]) + +workflow.multi_llm( + models={ + "technical": { + "name": "gpt-4o", + "temperature": 0.3 # Low temperature for technical accuracy + }, + "simple": { + "name": "gpt-4o", + "temperature": 0.7 # Higher for more accessible language + } + }, + prompt=[ + {"user": "Explain {topic}"} + ] +) + +workflow.sink("output/multi_explanations.jsonl") +workflow.run() +``` + +--- + +## Lambda Nodes + +### Custom Processing Functions + +```python +import sygra + +# Define a custom processing function +def clean_text(state): + """Clean and normalize text""" + text = state.get("text", "") + cleaned = text.strip().lower() + return {"cleaned_text": cleaned} + +workflow = sygra.Workflow("lambda_processing") + +workflow.source([ + {"text": " HELLO WORLD "}, + {"text": " Python Programming "} +]) + +# Use lambda node with custom function +workflow.lambda_func( + func=clean_text, + output="cleaned" +) + +workflow.sink("output/cleaned.jsonl") +workflow.run() +``` + +### Lambda with Class-Based Processors + +```python +import sygra + +# Define a processor class +class TextAnalyzer: + def __call__(self, state): + text = state.get("text", "") + return { + "word_count": len(text.split()), + "char_count": len(text), + "has_question": "?" 
in text + } + +workflow = sygra.Workflow("text_analysis") + +workflow.source([ + {"text": "What is artificial intelligence?"}, + {"text": "Machine learning is a subset of AI"} +]) + +# Use class-based processor +workflow.lambda_func( + func=TextAnalyzer, + output="analysis" +) + +workflow.sink("output/text_analysis.jsonl") +workflow.run() +``` + +### Chaining Lambda and LLM + +```python +import sygra + +def extract_keywords(state): + """Extract keywords from text""" + text = state.get("text", "") + words = text.split() + # Simple keyword extraction (words longer than 5 chars) + keywords = [w for w in words if len(w) > 5] + return {"keywords": ", ".join(keywords[:5])} + +workflow = sygra.Workflow("keyword_expansion") + +workflow.source([ + {"text": "Artificial intelligence transforms modern technology"} +]) + +# Step 1: Extract keywords with lambda +workflow.lambda_func( + func=extract_keywords, + output="keywords" +) + +# Step 2: Expand on keywords with LLM +workflow.llm( + model="gpt-4o", + prompt="Provide detailed explanations for these keywords: {keywords}" +) + +workflow.sink("output/expanded_keywords.jsonl") +workflow.run() +``` + +--- + +## Subgraphs + +### Creating Reusable Subgraphs + +First, create a subgraph configuration file: `tasks/subgraphs/summarize_and_tag.yaml` + +```yaml +graph_config: + nodes: + summarizer: + node_type: llm + model: + name: gpt-4o-mini + prompt: + - user: "Summarize this text in 2 sentences: {text}" + + tagger: + node_type: llm + model: + name: gpt-4o-mini + prompt: + - user: "Generate 3 relevant tags for this summary: {summarizer_response}" + + edges: + - from: START + to: summarizer + - from: summarizer + to: tagger + - from: tagger + to: END +``` + +Now use the subgraph in your workflow: + +```python +import sygra + +workflow = sygra.Workflow("using_subgraph") + +workflow.source([ + {"text": "Long article about climate change and its effects..."} +]) + +# Use the subgraph +workflow.subgraph("tasks/subgraphs/summarize_and_tag") + +workflow.sink("output/summarized_tagged.jsonl") +workflow.run() +``` + +### Subgraph with Configuration Overrides + +```python +import sygra + +workflow = sygra.Workflow("subgraph_override") + +workflow.source([{"text": "Technical document about neural networks..."}]) + +# Use subgraph with custom model for specific node +workflow.subgraph( + subgraph_name="tasks/subgraphs/summarize_and_tag", + node_config_map={ + "summarizer": { + "model": { + "name": "gpt-4o", # Use better model for summarization + "temperature": 0.3 + } + } + } +) + +workflow.sink("output/tech_summaries.jsonl") +workflow.run() +``` + +--- + +## Advanced Features + +### Resumable Execution + +```python +import sygra + +workflow = sygra.Workflow("resumable_task") + +workflow.source("data/large_dataset.jsonl") + +# Enable resumable execution +workflow.resumable(True) + +workflow.llm("gpt-4o", "Process: {text}") +workflow.sink("output/processed.jsonl") + +# Run with resume capability +# If interrupted, run again with --resume flag +workflow.run( + num_records=10000, + checkpoint_interval=100, # Save checkpoint every 100 records + resume=False # Set to True to resume from last checkpoint +) +``` + +### Quality Tagging + +```python +import sygra + +workflow = sygra.Workflow("quality_tagged") + +workflow.source([ + {"text": "Explain photosynthesis"} +]) + +workflow.llm("gpt-4o", "Provide explanation: {text}") + +# Enable quality tagging +workflow.quality_tagging( + enabled=True, + config={ + "metrics": ["coherence", "relevance", "factuality"], + "threshold": 0.7 
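+        # assumption: scores below this threshold mark a record as low
+        # quality in the tagged output rather than dropping it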
+    }
+)
+
+workflow.sink("output/quality_tagged.jsonl")
+workflow.run()
+```
+
+### OASST Format Mapping
+
+```python
+import sygra
+
+workflow = sygra.Workflow("oasst_format")
+
+workflow.source([
+    {"question": "What is Python?", "answer": "Python is a programming language"}
+])
+
+workflow.llm("gpt-4o", "Improve this answer: {answer}")
+
+# Map to OASST conversation format
+workflow.oasst_mapping(
+    enabled=True,
+    config={
+        "required": "yes",
+        "intermediate_writing": "no"
+    }
+)
+
+workflow.sink("output/oasst_conversations.jsonl")
+workflow.run()
+```
+
+### Output Field Mapping
+
+```python
+from datetime import datetime
+
+import sygra
+
+workflow = sygra.Workflow("field_mapping")
+
+workflow.source([
+    {"input_text": "Describe AI", "category": "technology"}
+])
+
+workflow.llm("gpt-4o", "Describe: {input_text}")
+
+# Map output fields
+workflow.output_fields(["input_text", "category", "llm_1_response"])
+
+# Add custom output fields
+workflow.output_field(
+    name="generated_at",
+    value=lambda: datetime.now().isoformat()
+)
+
+workflow.output_field(
+    name="model_used",
+    value="gpt-4o"
+)
+
+workflow.sink("output/mapped_output.jsonl")
+workflow.run()
+```
+
+### Batch Processing Configuration
+
+```python
+import sygra
+
+workflow = sygra.Workflow("batch_processing")
+
+workflow.source("data/large_file.jsonl")
+
+workflow.llm("gpt-4o", "Process: {text}")
+
+workflow.sink("output/batched_results.jsonl")
+
+# Run with custom batch size
+workflow.run(
+    num_records=1000,
+    batch_size=50,  # Process 50 records at a time
+    output_with_ts=True  # Add timestamp to output filename
+)
+```
+
+---
+
+## Complete Examples
+
+### Example 1: Educational Q&A Generation
+
+```python
+import sygra
+
+def create_educational_qa():
+    """Generate educational Q&A pairs"""
+
+    workflow = sygra.Workflow("educational_qa")
+
+    # Input: Topics to generate Q&A about
+    topics = [
+        {"topic": "photosynthesis", "level": "high school"},
+        {"topic": "mitosis", "level": "high school"},
+        {"topic": "genetics", "level": "undergraduate"},
+    ]
+
+    workflow.source(topics)
+
+    # Step 1: Generate question
+    workflow.llm(
+        model="gpt-4o",
+        prompt=[
+            {
+                "system": "You are an educational content creator. "
+                "Generate clear, educational questions."
+            },
+            {
+                "user": "Generate a {level} level question about {topic}"
+            }
+        ]
+    )
+
+    # Step 2: Generate detailed answer
+    workflow.llm(
+        model="gpt-4o",
+        prompt=[
+            {
+                "system": "You are a knowledgeable tutor. "
+                "Provide clear, accurate answers with examples."
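+                # the two adjacent string literals above are joined by
+                # Python's implicit string concatenation into one message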
+ }, + { + "user": "Answer this question: {llm_1_response}" + } + ] + ) + + # Step 3: Generate follow-up questions + workflow.llm( + model="gpt-4o-mini", + prompt="Based on this Q&A, generate 2 follow-up questions:\n" + "Q: {llm_1_response}\nA: {llm_2_response}" + ) + + # Save with quality tagging + workflow.quality_tagging(True) + workflow.sink("output/educational_qa.jsonl") + + # Run + results = workflow.run() + print(f"✓ Generated {len(results)} Q&A pairs") + return results + +if __name__ == "__main__": + create_educational_qa() +``` + +### Example 2: Code Generation and Explanation + +```python +import sygra + +def create_code_examples(): + """Generate code examples with explanations""" + + workflow = sygra.Workflow("code_generation") + + # Input: Programming concepts + concepts = [ + {"concept": "list comprehension", "language": "Python"}, + {"concept": "decorators", "language": "Python"}, + {"concept": "async/await", "language": "JavaScript"}, + ] + + workflow.source(concepts) + + # Step 1: Generate code example + workflow.llm( + model="gpt-4o", + prompt=[ + { + "system": "You are an expert programmer. " + "Write clean, well-commented code examples." + }, + { + "user": "Write a {language} code example demonstrating {concept}. " + "Include comments explaining each part." + } + ] + ) + + # Step 2: Generate explanation + workflow.llm( + model="gpt-4o", + prompt=[ + { + "system": "You are a programming tutor." + }, + { + "user": "Explain this code in simple terms:\n{llm_1_response}" + } + ] + ) + + # Step 3: Generate use cases + workflow.llm( + model="gpt-4o-mini", + prompt="List 3 practical use cases for {concept} in {language}" + ) + + workflow.sink("output/code_examples.jsonl") + + results = workflow.run() + print(f"✓ Generated {len(results)} code examples") + return results + +if __name__ == "__main__": + create_code_examples() +``` + +### Example 3: Content Evolution Pipeline + +```python +import sygra + +def evolve_instructions(): + """Evolve simple instructions into complex ones""" + + workflow = sygra.Workflow("instruction_evolution") + + # Simple seed instructions + seeds = [ + {"instruction": "Write a function to add two numbers"}, + {"instruction": "Create a class to store user data"}, + {"instruction": "Build a REST API endpoint"}, + ] + + workflow.source(seeds) + + # Evolution step 1: Add complexity + workflow.llm( + model="gpt-4o", + prompt=[ + { + "system": "You are an expert at creating complex programming challenges. " + "Evolve simple instructions into more detailed, complex ones." 
+ }, + { + "user": "Evolve this instruction to be more complex and detailed:\n" + "{instruction}" + } + ] + ) + + # Evolution step 2: Add constraints and requirements + workflow.llm( + model="gpt-4o", + prompt="Add specific technical constraints and requirements to this instruction:\n" + "{llm_1_response}" + ) + + # Evolution step 3: Add test cases + workflow.llm( + model="gpt-4o-mini", + prompt="Generate 3 test cases for this instruction:\n{llm_2_response}" + ) + + # Enable quality tagging for evolution + workflow.quality_tagging( + enabled=True, + config={"metrics": ["complexity", "clarity", "completeness"]} + ) + + workflow.sink("output/evolved_instructions.jsonl") + + results = workflow.run() + print(f"✓ Evolved {len(results)} instructions") + return results + +if __name__ == "__main__": + evolve_instructions() +``` + +### Example 4: Multi-Agent Simulation + +```python +import sygra + +def simulate_conversation(): + """Simulate multi-agent conversation""" + + workflow = sygra.Workflow("agent_simulation") + + scenarios = [ + { + "topic": "climate change solutions", + "agent1_role": "environmental scientist", + "agent2_role": "policy maker" + } + ] + + workflow.source(scenarios) + + # Agent 1: Environmental Scientist + workflow.agent( + model="gpt-4o", + tools=[], + prompt=[ + { + "system": "You are an {agent1_role}. " + "Provide scientific perspective on {topic}." + }, + { + "user": "Share your perspective on {topic}" + } + ] + ) + + # Agent 2: Policy Maker + workflow.agent( + model="gpt-4o", + tools=[], + prompt=[ + { + "system": "You are a {agent2_role}. " + "Respond to the scientist's perspective." + }, + { + "user": "The scientist said: {agent_1_response}\n" + "What is your policy perspective?" + } + ] + ) + + # Synthesize conversation + workflow.llm( + model="gpt-4o", + prompt="Synthesize this conversation into key points of agreement and disagreement:\n" + "Scientist: {agent_1_response}\n" + "Policy Maker: {agent_2_response}" + ) + + workflow.sink("output/agent_conversations.jsonl") + + results = workflow.run() + print(f"✓ Simulated {len(results)} conversations") + return results + +if __name__ == "__main__": + simulate_conversation() +``` + +### Example 5: Using Existing YAML Tasks + +```python +import sygra + +# Execute an existing YAML-based task +def run_existing_task(): + """Run a pre-configured YAML task""" + + # The task path points to a directory with graph_config.yaml + workflow = sygra.Workflow("tasks/examples/evol_instruct") + + # Run with custom parameters + results = workflow.run( + num_records=50, + batch_size=10, + output_dir="output/evol_results", + resume=False, + debug=False + ) + + print(f"✓ Completed task with {len(results)} results") + return results + +# Override specific nodes in existing task +def run_task_with_overrides(): + """Run existing task with node overrides""" + + workflow = sygra.Workflow("tasks/examples/evol_instruct") + + # Override specific node configuration + workflow.override( + "graph_config.nodes.evolver.model.name", + "gpt-4o-mini" # Use different model + ) + + workflow.override( + "graph_config.nodes.evolver.model.temperature", + 0.5 + ) + + results = workflow.run(num_records=20) + return results + +if __name__ == "__main__": + run_existing_task() + run_task_with_overrides() +``` + +--- + +## Best Practices + +### 1. 
Error Handling
+
+```python
+import sygra
+
+def safe_workflow_execution():
+    """Workflow with proper error handling"""
+
+    try:
+        workflow = sygra.Workflow("safe_execution")
+
+        workflow.source("data/input.jsonl")
+        workflow.llm("gpt-4o", "Process: {text}")
+        workflow.sink("output/results.jsonl")
+
+        results = workflow.run(
+            num_records=100,
+            resume=True,  # Enable resume in case of failure
+            checkpoint_interval=25  # Save progress frequently
+        )
+
+        print(f"Success: {len(results)} records processed")
+        return results
+
+    except Exception as e:
+        print(f"Unexpected error: {e}")
+```
+
+### 2. Configuration Management
+
+```python
+import sygra
+
+# Use configuration loader for reusable settings
+config = sygra.load_config("config/my_models.yaml")
+
+workflow = sygra.Workflow("configured_workflow")
+
+# Input data for the workflow
+data = [{"text": "Summarize the quarterly report"}]
+
+workflow.source(data)
+
+# Use models from config
+workflow.llm(
+    model=config["models"]["primary"],
+    prompt="Process: {text}"
+)
+
+workflow.sink("output/results.jsonl")
+workflow.run()
+```
+
+### 3. Testing Workflows
+
+```python
+import sygra
+
+def test_workflow():
+    """Test workflow with small dataset first"""
+
+    # Create test data
+    test_data = [
+        {"text": "Test input 1"},
+        {"text": "Test input 2"},
+    ]
+
+    workflow = sygra.Workflow("test_workflow")
+    workflow.source(test_data)
+    workflow.llm("gpt-4o-mini", "Echo: {text}")  # Use cheaper model for testing
+    workflow.sink("output/test_results.jsonl")
+
+    # Run with small dataset
+    results = workflow.run(num_records=2)
+
+    # Validate results
+    assert len(results) == 2, "Expected 2 results"
+    assert all("llm_1_response" in r for r in results), "Missing LLM response"
+
+    print("Test passed - ready for production")
+    return True
+
+if __name__ == "__main__":
+    test_workflow()
+```
+
+### 4. Progressive Enhancement
+
+```python
+import sygra
+
+def progressive_workflow():
+    """Build workflow progressively for better debugging"""
+
+    workflow = sygra.Workflow("progressive")
+
+    # Start simple
+    workflow.source([{"text": "Hello"}])
+    workflow.llm("gpt-4o", "Echo: {text}")
+    workflow.sink("output/step1.jsonl")
+
+    # Test first step
+    result1 = workflow.run(num_records=1)
+    print(f"Step 1 complete: {result1}")
+
+    # Add second step
+    workflow.llm("gpt-4o", "Expand: {llm_1_response}")
+    workflow.sink("output/step2.jsonl")
+
+    # Test with both steps
+    result2 = workflow.run(num_records=1)
+    print(f"Step 2 complete: {result2}")
+
+    # Continue adding steps...
+```
+
+### 5. 
Resource Management + +```python +import sygra + +def efficient_workflow(): + """Workflow with efficient resource usage""" + + workflow = sygra.Workflow("efficient") + + # Use file source for large datasets (memory efficient) + workflow.source("data/large_dataset.jsonl") + + # Use cheaper model for simple tasks + workflow.llm( + model="gpt-4o-mini", # More cost-effective + prompt="Simple task: {text}" + ) + + # Stream to output (don't keep all in memory) + workflow.sink("output/results.jsonl") + + # Process in batches + workflow.run( + num_records=10000, + batch_size=50, # Process 50 at a time + checkpoint_interval=500 # Save progress + ) +``` + +--- + +## Troubleshooting Guide + +### Common Issues and Solutions + +#### Issue 1: Import Errors + +```python +# Problem: Module not found +try: + import sygra +except ImportError: + print("❌ SyGra not installed") + print("Solution: pip install sygra") + +# Verify installation +import sygra +print(f"✓ SyGra version: {sygra.__version__}") +print(f"✓ Features available: {sygra.validate_environment()}") +``` + +#### Issue 2: Configuration Errors + +```python +import sygra + +try: + workflow = sygra.Workflow("my_task") + workflow.run() +except sygra.ConfigurationError as e: + print(f"❌ Configuration error: {e}") + print("Solutions:") + print(" 1. Check graph_config has 'nodes' and 'edges'") + print(" 2. Verify all node types are valid") + print(" 3. Ensure models are properly configured") +``` + +#### Issue 3: Data Source Issues + +```python +import sygra +from pathlib import Path + +def check_data_source(file_path): + """Validate data source before running workflow""" + + # Check file exists + if not Path(file_path).exists(): + print(f"File not found: {file_path}") + return False + + # Check file is readable + try: + with open(file_path) as f: + first_line = f.readline() + import json + json.loads(first_line) + print(f"Data source valid: {file_path}") + return True + except Exception as e: + print(f"Invalid data format: {e}") + print("Solution: Ensure file is valid JSONL (one JSON per line)") + return False + +# Use in workflow +if check_data_source("data/input.jsonl"): + workflow = sygra.Workflow("safe_workflow") + workflow.source("data/input.jsonl") + workflow.llm("gpt-4o", "Process: {text}") + workflow.sink("output/results.jsonl") + workflow.run() +``` + +#### Issue 4: Memory Issues with Large Datasets + +```python +import sygra + +def handle_large_dataset(): + """Process large dataset efficiently""" + + workflow = sygra.Workflow("large_dataset") + + # Use file source (streams data, not all in memory) + workflow.source("data/very_large_file.jsonl") + + workflow.llm("gpt-4o-mini", "Process: {text}") + + # Use file sink (streams output) + workflow.sink("output/results.jsonl") + + # Process in chunks + total_records = 100000 + chunk_size = 1000 + + for start in range(0, total_records, chunk_size): + print(f"Processing records {start} to {start + chunk_size}") + workflow.run( + num_records=chunk_size, + start_index=start, + batch_size=50 + ) +``` + +--- + +## Advanced Patterns + +### Pattern 1: Pipeline Pattern + +```python +import sygra + +def create_processing_pipeline(): + """Multi-stage processing pipeline""" + + # Stage 1: Data cleaning + stage1 = sygra.Workflow("stage1_clean") + stage1.source("data/raw_data.jsonl") + stage1.llm("gpt-4o-mini", "Clean and normalize: {text}") + stage1.sink("output/stage1_cleaned.jsonl") + stage1.run() + + # Stage 2: Enhancement + stage2 = sygra.Workflow("stage2_enhance") + 
stage2.source("output/stage1_cleaned.jsonl") + stage2.llm("gpt-4o", "Enhance with details: {llm_1_response}") + stage2.sink("output/stage2_enhanced.jsonl") + stage2.run() + + # Stage 3: Quality filtering + stage3 = sygra.Workflow("stage3_filter") + stage3.source("output/stage2_enhanced.jsonl") + stage3.quality_tagging(True) + stage3.sink("output/final_output.jsonl") + stage3.run() + + print("✓ Pipeline complete") +``` + +### Pattern 2: Fan-Out Pattern + +```python +import sygra + +def fan_out_processing(): + """Process one input with multiple models in parallel""" + + # Shared input + input_data = [{"question": "What is quantum computing?"}] + + # Create multiple workflows for different models + models = { + "gpt4": "gpt-4o", + "gpt4mini": "gpt-4o-mini", + "claude": "claude-sonnet-4" + } + + results = {} + + for name, model in models.items(): + workflow = sygra.Workflow(f"fanout_{name}") + workflow.source(input_data) + workflow.llm(model, "Answer: {question}") + workflow.sink(f"output/fanout_{name}.jsonl") + results[name] = workflow.run() + + print(f"✓ Processed with {len(models)} models") + return results +``` + +### Pattern 3: Iterative Refinement + +```python +import sygra + +def iterative_refinement(): + """Iteratively refine output through multiple passes""" + + data = [{"text": "Write about AI"}] + + # Pass 1: Initial generation + workflow1 = sygra.Workflow("refinement_pass1") + workflow1.source(data) + workflow1.llm("gpt-4o", "Write a draft about: {text}") + workflow1.sink("output/draft_v1.jsonl") + result1 = workflow1.run() + + # Pass 2: Critique and improve + workflow2 = sygra.Workflow("refinement_pass2") + workflow2.source("output/draft_v1.jsonl") + workflow2.llm( + "gpt-4o", + "Critique this draft and suggest improvements: {llm_1_response}" + ) + workflow2.sink("output/draft_v2_critique.jsonl") + result2 = workflow2.run() + + # Pass 3: Final revision + workflow3 = sygra.Workflow("refinement_pass3") + workflow3.source("output/draft_v2_critique.jsonl") + workflow3.llm( + "gpt-4o", + "Revise the draft based on this critique:\n" + "Draft: {llm_1_response}\n" + "Critique: {llm_2_response}" + ) + workflow3.sink("output/final_version.jsonl") + result3 = workflow3.run() + + print("✓ Refinement complete") + return result3 +``` + +### Pattern 4: Consensus Pattern + +```python +import sygra + +def build_consensus(): + """Get consensus from multiple models""" + + workflow = sygra.Workflow("consensus") + + workflow.source([ + {"question": "What are the key challenges in renewable energy?"} + ]) + + # Get responses from multiple models + workflow.multi_llm( + models={ + "model1": "gpt-4o", + "model2": "claude-sonnet-4", + "model3": "gpt-4o-mini" + }, + prompt="Answer this question: {question}" + ) + + # Synthesize consensus + workflow.llm( + "gpt-4o", + "Based on these three responses, create a consensus answer " + "that incorporates the best points from each:\n{multi_llm_1_response}" + ) + + workflow.sink("output/consensus_answers.jsonl") + workflow.run() +``` + +### Pattern 5: Chain of Thought + +```python +import sygra + +def chain_of_thought(): + """Implement chain-of-thought reasoning""" + + workflow = sygra.Workflow("chain_of_thought") + + workflow.source([ + {"problem": "A train travels 120 km in 2 hours, then 180 km in 3 hours. 
" + "What is its average speed for the entire journey?"} + ]) + + # Step 1: Break down the problem + workflow.llm( + "gpt-4o", + prompt=[ + {"system": "Break down problems step by step."}, + {"user": "Break down this problem into steps: {problem}"} + ] + ) + + # Step 2: Solve each step + workflow.llm( + "gpt-4o", + "Now solve each step:\n{llm_1_response}" + ) + + # Step 3: Verify the solution + workflow.llm( + "gpt-4o", + "Verify this solution is correct:\n{llm_2_response}" + ) + + workflow.sink("output/chain_of_thought.jsonl") + workflow.run() +``` + +--- + +## Performance Optimization + +### Optimization 1: Model Selection + +```python +import sygra + +def optimized_model_selection(): + """Use appropriate models for different tasks""" + + workflow = sygra.Workflow("optimized") + + workflow.source("data/tasks.jsonl") + + # Simple tasks: Use smaller, faster model + workflow.llm( + model="gpt-4o-mini", # Faster and cheaper + prompt="Simple extraction: {text}" + ) + + # Complex tasks: Use more capable model only when needed + workflow.llm( + model="gpt-4o", # Use for complex reasoning + prompt="Complex analysis: {llm_1_response}" + ) + + workflow.sink("output/optimized.jsonl") + workflow.run() +``` + +### Optimization 2: Batch Processing + +```python +import sygra + +def optimized_batching(): + """Optimize batch size for throughput""" + + workflow = sygra.Workflow("batched") + + workflow.source("data/large_dataset.jsonl") + workflow.llm("gpt-4o-mini", "Process: {text}") + workflow.sink("output/results.jsonl") + + # Optimal batch size depends on: + # - API rate limits + # - Memory constraints + # - Network latency + + workflow.run( + num_records=10000, + batch_size=100, # Larger batches for better throughput + checkpoint_interval=1000 # Less frequent checkpoints + ) +``` + +### Optimization 3: Caching + +```python +import sygra +import json +from pathlib import Path + +def cached_workflow(): + """Implement caching to avoid duplicate processing""" + + cache_file = Path("cache/processed.json") + cache_file.parent.mkdir(exist_ok=True) + + # Load cache + cache = {} + if cache_file.exists(): + with open(cache_file) as f: + cache = json.load(f) + + # Process only new items + all_data = [ + {"id": 1, "text": "First item"}, + {"id": 2, "text": "Second item"}, + ] + + new_data = [item for item in all_data if str(item["id"]) not in cache] + + if new_data: + workflow = sygra.Workflow("cached") + workflow.source(new_data) + workflow.llm("gpt-4o", "Process: {text}") + workflow.sink("output/new_results.jsonl") + results = workflow.run() + + # Update cache + for item in results: + cache[str(item["id"])] = item + + with open(cache_file, "w") as f: + json.dump(cache, f) + + print(f"Processed {len(new_data)} new items") + else: + print("All items already in cache") +``` + +--- + +## API Reference Quick Guide + +### Core Classes + +```python +# Workflow - Main workflow builder +workflow = sygra.Workflow(name="my_workflow") + +# Configuration loader +config = sygra.load_config("config/settings.yaml") + +# Model configuration +model_config = sygra.ModelConfigBuilder.from_name("gpt-4o") +``` + +### Workflow Methods + +```python +# Data source/sink +workflow.source(data) # Set input data +workflow.sink(path) # Set output path + +# Node addition +workflow.llm(model, prompt, **kwargs) # Add LLM node +workflow.agent(model, tools, prompt, **kwargs) # Add agent node +workflow.multi_llm(models, prompt, **kwargs) # Add multi-LLM node +workflow.lambda_func(func, output, **kwargs) # Add lambda node +workflow.subgraph(name, 
config_map)  # Add subgraph
+
+# Configuration
+workflow.resumable(True)  # Enable resumable execution
+workflow.quality_tagging(True)  # Enable quality tagging
+workflow.output_fields(fields)  # Set output fields
+
+# Execution
+results = workflow.run(
+    num_records=100,
+    batch_size=25,
+    start_index=0,
+    output_dir="output/",
+    debug=False,
+    resume=False
+)
+```
+
+### Quick Utilities
+
+```python
+# Quick LLM
+sygra.quick_llm(model, prompt, data_source, output)
+
+# Quick Agent
+sygra.quick_agent(model, prompt, tools, data_source, output)
+
+# Execute task
+sygra.execute_task(task_name, **kwargs)
+
+# List models
+models = sygra.list_available_models()
+```
+
+---
+
+## FAQ
+
+**Q: How do I handle large datasets?**
+```python
+# Use file sources and batch processing
+workflow.source("large_file.jsonl")
+workflow.run(batch_size=50, checkpoint_interval=100)
+```
+
+**Q: How do I reduce costs?**
+```python
+# Use smaller models for simple tasks
+workflow.llm("gpt-4o-mini", "Simple task: {text}")  # Cheaper
+
+# Test with small datasets first
+workflow.run(num_records=10)  # Test before full run
+```
+
+**Q: How do I resume interrupted workflows?**
+```python
+# Enable resumable execution
+workflow.resumable(True)
+workflow.run(resume=True, checkpoint_interval=100)
+```
+
+**Q: How do I validate my workflow?**
+```python
+# Test with minimal data first
+test_data = [{"text": "test"}]
+workflow.source(test_data)
+results = workflow.run(num_records=1)
+assert len(results) == 1  # Validate
+```
+
+**Q: How do I debug issues?**
+```python
+# Enable debug mode
+workflow.run(debug=True)
+
+# Use smaller models for faster iteration
+workflow.llm("gpt-4o-mini", prompt)  # Faster for testing
+```
+
+---
+
+## Conclusion
+
+This guide covers the essential usage patterns for SyGra. The examples are designed to run as written once your models and data sources are configured. Start with the basic examples and progressively explore advanced features as needed.
+
+For the latest updates and additional examples, check the official documentation and example tasks in the repository.
\ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index abcf82b7..597f99b5 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -57,7 +57,9 @@ nav: - Image to QnA: tutorials/image_to_qna_tutorial.md - Structured Output: tutorials/structured_output_tutorial.md - Structured Output with Multi-LLM: tutorials/structured_output_with_multi_llm_tutorial.md - - SyGra Library: sygra_library.md + - SyGra Library: + - API Reference: library/sygra_library.md + - Examples: library/sygra_library_examples.md - Contribute: development.md plugins: - search diff --git a/sygra/configuration/loader.py b/sygra/configuration/loader.py index 3774fd68..0b15b856 100644 --- a/sygra/configuration/loader.py +++ b/sygra/configuration/loader.py @@ -1,6 +1,6 @@ import os from pathlib import Path -from typing import Any, Union, cast +from typing import Any, Union import yaml @@ -8,6 +8,7 @@ from sygra.core.dataset.dataset_config import DataSourceConfig, OutputConfig # noqa: F401 from sygra.core.graph.graph_config import GraphConfig # noqa: F401 from sygra.utils import utils + from sygra.workflow import AutoNestedDict UTILS_AVAILABLE = True except ImportError: @@ -34,7 +35,10 @@ def load(self, config_path: Union[str, Path, dict[str, Any]]) -> dict[str, Any]: raise FileNotFoundError(f"Configuration file not found: {config_path}") with open(config_path, "r") as f: - config = cast(dict[str, Any], yaml.safe_load(f)) + loaded_config = yaml.safe_load(f) + if not isinstance(loaded_config, dict): + raise TypeError(f"Expected dict from YAML, got {type(loaded_config)}") + config: dict[str, Any] = loaded_config return config @@ -46,7 +50,7 @@ def load_and_create(self, config_path: Union[str, Path, dict[str, Any]]): from ..workflow import Workflow workflow = Workflow() - workflow._config = config + workflow._config = AutoNestedDict.convert_dict(config) workflow._supports_subgraphs = True workflow._supports_multimodal = True diff --git a/sygra/core/base_task_executor.py b/sygra/core/base_task_executor.py index 361caa92..4eda309e 100644 --- a/sygra/core/base_task_executor.py +++ b/sygra/core/base_task_executor.py @@ -50,10 +50,14 @@ def __init__(self, args: Any, graph_config_dict: Optional[dict] = None): self.dataset = self.init_dataset() output_transform_args = {"oasst": args.oasst, "quality": args.quality} + + graph_props = self.config.get("graph_config", {}).get("graph_properties", {}) + self.graph_config = GraphConfig( - utils.get_file_in_task_dir(self.task_name, "graph_config.yaml"), + self.config, self.dataset, output_transform_args, + graph_properties=graph_props, ) self.output_generator: Optional[BaseOutputGenerator] = self._init_output_generator() @@ -662,6 +666,6 @@ class DefaultTaskExecutor(BaseTaskExecutor): we fall back to this class by default. """ - def __init__(self, args): - super().__init__(args) + def __init__(self, args, graph_config_dict=None): + super().__init__(args, graph_config_dict) logger.info("Using DefaultTaskExecutor for task: %s", self.task_name) diff --git a/sygra/core/graph/graph_config.py b/sygra/core/graph/graph_config.py index 1db12857..ecd28a19 100644 --- a/sygra/core/graph/graph_config.py +++ b/sygra/core/graph/graph_config.py @@ -23,6 +23,7 @@ def __init__( output_transform_args: dict, parent_node: str = "", override_config=None, # New parameter for overrides + graph_properties: Optional[dict] = None, ) -> None: """ Initialize a GraphConfig. @@ -42,6 +43,7 @@ def __init__( self.state_variables: set = set() self.graph_state_config: dict[str, Any] = {} self.pattern_to_extract_variables = r"(? 
Any: +def load_model_config(config_path: Optional[str] = None) -> Any: """ Load model configurations from both models.yaml and environment variables. @@ -39,6 +39,10 @@ def load_model_config() -> Any: Example: "http://url1.com|http://url2.com|http://url3.com" - SYGRA_{MODEL_NAME}_TOKEN: Authentication token or API key for the model + Args: + config_path: Optional path to custom config file. + Custom configs override default models.yaml values. + Returns: Dict containing combined model configurations """ @@ -50,6 +54,11 @@ def load_model_config() -> Any: # Load base configurations from models.yaml base_configs = load_yaml_file(constants.MODEL_CONFIG_YAML) + # Load and merge custom config if provided + if config_path and os.path.exists(config_path): + custom_configs = load_yaml_file(config_path) + base_configs = {**base_configs, **custom_configs} + # For each model, look for corresponding environment variables for model_name, config in base_configs.items(): # Convert model name to uppercase for environment variable lookup diff --git a/sygra/workflow/__init__.py b/sygra/workflow/__init__.py index 365599c8..87c0633f 100644 --- a/sygra/workflow/__init__.py +++ b/sygra/workflow/__init__.py @@ -62,6 +62,69 @@ DATA_UTILS_AVAILABLE = False +class AutoNestedDict(dict): + """Dictionary that automatically creates nested dictionaries on access.""" + + def __getitem__(self, key): + if key not in self: + self[key] = AutoNestedDict() + return super().__getitem__(key) + + def __setitem__(self, key, value): + if isinstance(value, dict) and not isinstance(value, AutoNestedDict): + value = self.convert_dict(value) + super().__setitem__(key, value) + + @classmethod + def convert_dict(cls, d): + """Recursively convert nested dicts to AutoNestedDict.""" + result = cls() + for k, v in d.items(): + if isinstance(v, dict): + result[k] = cls.convert_dict(v) + elif isinstance(v, list): + result[k] = [ + cls.convert_dict(item) if isinstance(item, dict) else item for item in v + ] + else: + result[k] = v + return result + + def to_dict(self) -> dict: + """Recursively convert AutoNestedDict to regular dict.""" + result: dict[str, Any] = {} + for k, v in self.items(): + if isinstance(v, AutoNestedDict): + result[k] = v.to_dict() + elif isinstance(v, dict): + result[k] = self._convert_dict_to_regular(v) + elif isinstance(v, list): + converted_list: list[Any] = [ + ( + item.to_dict() + if isinstance(item, AutoNestedDict) + else self._convert_dict_to_regular(item) if isinstance(item, dict) else item + ) + for item in v + ] + result[k] = converted_list + else: + result[k] = v + return result + + @staticmethod + def _convert_dict_to_regular(d): + """Helper to convert any dict to regular dict recursively.""" + if isinstance(d, AutoNestedDict): + return d.to_dict() + elif isinstance(d, dict): + return {k: AutoNestedDict._convert_dict_to_regular(v) for k, v in d.items()} + elif isinstance(d, list): + return [AutoNestedDict._convert_dict_to_regular(item) for item in d] + else: + return d + + class Workflow: """ Unified workflow builder supporting all SyGra paradigms and use cases. 
@@ -104,17 +167,19 @@ class Workflow: def __init__(self, name: Optional[str] = None): self.name: str = name or f"workflow_{uuid.uuid4().hex[:8]}" - self._config: dict[str, Any] = { - "graph_config": {"nodes": {}, "edges": [], "graph_properties": {}}, - "data_config": {}, - "output_config": {}, - } + self._config: AutoNestedDict = AutoNestedDict( + { + "graph_config": {"nodes": {}, "edges": [], "graph_properties": {}}, + "data_config": {}, + "output_config": {}, + } + ) self._node_counter: int = 0 self._last_node: Optional[str] = None self._temp_files: list[str] = [] - self._overrides: dict[str, Any] = {} self._node_builders: dict[str, Any] = {} self._messages: list[str] = [] + self._is_existing_task: bool = False # Feature support flags self._supports_subgraphs = True @@ -123,6 +188,37 @@ def __init__(self, name: Optional[str] = None): self._supports_quality = True self._supports_oasst = True + self._load_existing_config_if_present() + + def _load_existing_config_if_present(self): + """Load existing task configuration if this appears to be a task path.""" + if self.name and (os.path.exists(self.name) or "/" in self.name or "\\" in self.name): + task_path = self.name + config_file = os.path.join(task_path, "graph_config.yaml") + + if not os.path.isabs(task_path): + config_file = os.path.join(os.getcwd(), config_file) + + if os.path.exists(config_file): + try: + with open(config_file, "r") as f: + loaded_config = yaml.safe_load(f) + + if loaded_config: + self._config = AutoNestedDict.convert_dict(loaded_config) + self._is_existing_task = True + + if ( + "graph_config" in self._config + and "nodes" in self._config["graph_config"] + ): + self._node_counter = len(self._config["graph_config"]["nodes"]) + + logger.info(f"Loaded existing task config: {self.name}") + + except Exception as e: + logger.warning(f"Failed to load existing config from {config_file}: {e}") + def source( self, source: Union[str, Path, dict[str, Any], list[dict[str, Any]], DataSource] ) -> "Workflow": @@ -530,15 +626,15 @@ def transformations(self, transforms: list[dict[str, Any]]) -> "Workflow": if "source" not in self._config["data_config"]: self._config["data_config"]["source"] = {} - transformations: list = [] + transformations_list: list[Union[dict[str, Any], str]] = [] # <-- CORRECT TYPE for transform in transforms: if callable(transform): - transformations.append(self._callable_to_string_path(transform)) + transformations_list.append(self._callable_to_string_path(transform)) else: - transformations.append(transform) + transformations_list.append(transform) - self._config["data_config"]["source"]["transformations"] = transformations + self._config["data_config"]["source"]["transformations"] = transformations_list return self def graph_properties(self, properties: dict[str, Any]) -> "Workflow": @@ -603,10 +699,7 @@ def override(self, path: str, value: Any) -> "Workflow": .override("graph_config.nodes.llm_1.model.name", "gpt-4o") .override("data_config.source.repo_id", "new/dataset") """ - if not hasattr(self, "_overrides"): - self._overrides = {} - - self._overrides[path] = value + self._set_nested_value(self._config, path, value) return self def override_model( @@ -714,17 +807,16 @@ def run( # Apply node builders before execution self.build() - has_source = "source" in self._config.get("data_config", {}) has_nodes = len(self._config["graph_config"]["nodes"]) > 0 - if has_source and has_nodes: + if self._is_existing_task: + return self._execute_existing_task(num_records, start_index, output_dir, **kwargs) + elif 
has_nodes: return self._execute_programmatic_workflow( num_records, start_index, output_dir, **kwargs ) - elif not has_source and not has_nodes: - return self._execute_existing_task(num_records, start_index, output_dir, **kwargs) else: - raise ConfigurationError("Incomplete workflow. Add both source and processing nodes.") + raise ConfigurationError("Incomplete workflow. Add processing nodes.") def save_config(self, path: Union[str, Path]) -> None: """Save workflow configuration as YAML file.""" @@ -732,7 +824,7 @@ def save_config(self, path: Union[str, Path]) -> None: self.build() with open(path, "w") as f: - yaml.dump(self._config, f, default_flow_style=False) + yaml.dump(self._config.to_dict(), f, default_flow_style=False) def _execute_existing_task( self, @@ -743,32 +835,14 @@ def _execute_existing_task( ) -> Any: """Execute existing YAML-based task with full feature support.""" task_name: str = self.name - current_task: str = task_name utils.current_task = task_name - config_file = f"{task_name}/graph_config.yaml" - - if not os.path.isabs(task_name): - config_file = os.path.join(os.getcwd(), config_file) - - if not os.path.exists(config_file): - raise ConfigurationError( - f"Task '{task_name}' not found.\n" - f"Expected config file: {config_file}\n" - f"Or create a programmatic workflow: workflow.source(...).llm(...).run()" - ) - logger.info(f"Executing existing YAML task with full features: {task_name}") - import yaml - - with open(config_file, "r") as f: - original_config = yaml.safe_load(f) - - modified_config = self._apply_overrides(original_config) + modified_config = self._config.to_dict() args = Namespace( - task=current_task, + task=task_name, num_records=num_records, start_index=start_index, output_dir=output_dir, @@ -788,15 +862,14 @@ def _execute_existing_task( executor = JudgeQualityTaskExecutor(args, kwargs.get("quality_config")) else: executor = DefaultTaskExecutor(args) - - executor.config = modified_config + BaseTaskExecutor.__init__(executor, args, modified_config) result = executor.execute() - logger.info(f"Successfully executed task with all features: {task_name}") + logger.info(f"Successfully executed task: {task_name}") return result except Exception as e: if "model_type" in str(e).lower() or "model" in str(e).lower(): - logger.error(f"Model configuration error in {config_file}") + logger.error("Model configuration error") elif "current task name is not initialized" in str(e).lower(): logger.error(f"Task context initialization failed for '{task_name}'") raise ExecutionError(f"Failed to execute task '{task_name}': {e}") @@ -812,15 +885,18 @@ def _execute_programmatic_workflow( try: task_name = self.name utils.current_task = self.name - os.makedirs(task_name, exist_ok=True) - self._temp_files.append(task_name) + + # Determine output directory + if output_dir is None: + output_dir = tempfile.mkdtemp(prefix=f"sygra_output_{task_name}_") + self._temp_files.append(output_dir) + else: + os.makedirs(output_dir, exist_ok=True) if not kwargs.get("enable_default_transforms", False): self.disable_default_transforms() - config_file = f"{task_name}/graph_config.yaml" - with open(config_file, "w") as f: - yaml.dump(self._config, f, default_flow_style=False) + config_dict = self._config.to_dict() utils.current_task = self.name @@ -852,7 +928,7 @@ def _execute_programmatic_workflow( if kwargs.get("quality_only", False): executor = JudgeQualityTaskExecutor(args, kwargs.get("quality_config")) else: - executor = DefaultTaskExecutor(args) + executor = DefaultTaskExecutor(args, 
config_dict) result = executor.execute() @@ -884,21 +960,6 @@ def _execute_programmatic_workflow( finally: self._cleanup() - def _apply_overrides(self, config: dict[str, Any]) -> dict[str, Any]: - """Apply all overrides to the loaded configuration.""" - if not hasattr(self, "_overrides") or not self._overrides: - return config - - import copy - - modified_config = copy.deepcopy(config) - - for path, value in self._overrides.items(): - self._set_nested_value(modified_config, path, value) - - logger.info(f"Applied {len(self._overrides)} configuration overrides") - return modified_config - def _set_nested_value(self, config: dict[str, Any], path: str, value: Any) -> None: """Set a nested value using dot notation path.""" keys = path.split(".") @@ -989,6 +1050,10 @@ def _add_node_with_edge(self, node_name: str, node_config: dict[str, Any]) -> No def _cleanup(self): """Clean up temporary files.""" + if not self._temp_files: + return + + logger.info(f"Cleaning up {len(self._temp_files)} temporary files") for temp_file in self._temp_files: try: if os.path.isfile(temp_file): diff --git a/tests/workflow/test_workflow_init.py b/tests/workflow/test_workflow_init.py new file mode 100644 index 00000000..03255073 --- /dev/null +++ b/tests/workflow/test_workflow_init.py @@ -0,0 +1,478 @@ +import sys +from pathlib import Path + +sys.path.append(str(Path(__file__).parent.parent.parent.parent)) + +from sygra.workflow import AutoNestedDict, Workflow, create_graph + + +class TestAutoNestedDict: + """Test suite for AutoNestedDict class.""" + + def test_auto_creation_of_nested_keys(self): + """Test that accessing non-existent keys creates nested AutoNestedDict.""" + d = AutoNestedDict() + + d["level1"]["level2"]["level3"] = "value" + + assert d["level1"]["level2"]["level3"] == "value" + assert isinstance(d["level1"], AutoNestedDict) + assert isinstance(d["level1"]["level2"], AutoNestedDict) + + def test_getitem_creates_autodict_on_missing_key(self): + """Test that __getitem__ creates AutoNestedDict for missing keys.""" + d = AutoNestedDict() + + nested = d["new_key"] + + assert isinstance(nested, AutoNestedDict) + assert "new_key" in d + + def test_setitem_converts_regular_dict_to_autodict(self): + """Test that __setitem__ converts regular dicts to AutoNestedDict.""" + d = AutoNestedDict() + regular_dict = {"inner": {"deep": "value"}} + + d["key"] = regular_dict + + assert isinstance(d["key"], AutoNestedDict) + assert isinstance(d["key"]["inner"], AutoNestedDict) + assert d["key"]["inner"]["deep"] == "value" + + def test_setitem_preserves_autodict(self): + """Test that setting an AutoNestedDict value preserves its type.""" + d = AutoNestedDict() + auto_dict = AutoNestedDict({"a": 1}) + + d["key"] = auto_dict + + assert isinstance(d["key"], AutoNestedDict) + assert d["key"]["a"] == 1 + + def test_setitem_with_primitive_values(self): + """Test that primitive values are stored without conversion.""" + d = AutoNestedDict() + + d["string"] = "hello" + d["int"] = 42 + d["float"] = 3.14 + d["bool"] = True + d["none"] = None + + assert d["string"] == "hello" + assert d["int"] == 42 + assert d["float"] == 3.14 + assert d["bool"] is True + assert d["none"] is None + + def test_convert_dict_simple_dict(self): + """Test convert_dict with a simple dictionary.""" + regular_dict = {"a": 1, "b": 2, "c": 3} + + result = AutoNestedDict.convert_dict(regular_dict) + + assert isinstance(result, AutoNestedDict) + assert result["a"] == 1 + assert result["b"] == 2 + assert result["c"] == 3 + + def test_convert_dict_nested_dict(self): 
+ """Test convert_dict with nested dictionaries.""" + nested_dict = {"level1": {"level2": {"level3": "deep_value"}}} + + result = AutoNestedDict.convert_dict(nested_dict) + + assert isinstance(result, AutoNestedDict) + assert isinstance(result["level1"], AutoNestedDict) + assert isinstance(result["level1"]["level2"], AutoNestedDict) + assert result["level1"]["level2"]["level3"] == "deep_value" + + def test_convert_dict_with_lists(self): + """Test convert_dict handles lists correctly.""" + dict_with_lists = { + "simple_list": [1, 2, 3], + "dict_list": [{"a": 1}, {"b": 2}], + "mixed_list": [1, {"nested": "dict"}, "string"], + } + + result = AutoNestedDict.convert_dict(dict_with_lists) + + assert result["simple_list"] == [1, 2, 3] + assert isinstance(result["dict_list"][0], AutoNestedDict) + assert isinstance(result["dict_list"][1], AutoNestedDict) + assert result["dict_list"][0]["a"] == 1 + assert isinstance(result["mixed_list"][1], AutoNestedDict) + assert result["mixed_list"][1]["nested"] == "dict" + + def test_convert_dict_empty_dict(self): + """Test convert_dict with empty dictionary.""" + result = AutoNestedDict.convert_dict({}) + + assert isinstance(result, AutoNestedDict) + assert len(result) == 0 + + def test_to_dict_simple(self): + """Test to_dict converts AutoNestedDict to regular dict.""" + d = AutoNestedDict({"a": 1, "b": 2}) + + result = d.to_dict() + + assert isinstance(result, dict) + assert not isinstance(result, AutoNestedDict) + assert result == {"a": 1, "b": 2} + + def test_to_dict_nested(self): + """Test to_dict recursively converts nested AutoNestedDict.""" + d = AutoNestedDict() + d["level1"]["level2"]["level3"] = "value" + d["level1"]["other"] = 42 + + result = d.to_dict() + + assert isinstance(result, dict) + assert not isinstance(result, AutoNestedDict) + assert isinstance(result["level1"], dict) + assert not isinstance(result["level1"], AutoNestedDict) + assert result["level1"]["level2"]["level3"] == "value" + assert result["level1"]["other"] == 42 + + def test_to_dict_with_lists(self): + """Test to_dict handles lists with nested dicts correctly.""" + d = AutoNestedDict() + d["list"] = [AutoNestedDict({"a": 1}), {"b": 2}, "string", 42] + + result = d.to_dict() + + assert isinstance(result["list"], list) + assert isinstance(result["list"][0], dict) + assert not isinstance(result["list"][0], AutoNestedDict) + assert result["list"][0] == {"a": 1} + assert result["list"][1] == {"b": 2} + assert result["list"][2] == "string" + assert result["list"][3] == 42 + + def test_to_dict_deeply_nested_with_lists(self): + """Test to_dict with complex nested structure including lists.""" + d = AutoNestedDict() + d["data"]["items"] = [{"nested": {"deep": "value1"}}, {"nested": {"deep": "value2"}}] + + result = d.to_dict() + + assert isinstance(result, dict) + assert isinstance(result["data"]["items"], list) + assert isinstance(result["data"]["items"][0], dict) + assert result["data"]["items"][0]["nested"]["deep"] == "value1" + + def test_convert_dict_to_regular_static_method(self): + """Test _convert_dict_to_regular static method.""" + auto_dict = AutoNestedDict({"a": 1}) + result = AutoNestedDict._convert_dict_to_regular(auto_dict) + assert isinstance(result, dict) + assert not isinstance(result, AutoNestedDict) + + regular_dict = {"b": 2} + result = AutoNestedDict._convert_dict_to_regular(regular_dict) + assert result == {"b": 2} + + nested = {"outer": {"inner": "value"}} + result = AutoNestedDict._convert_dict_to_regular(nested) + assert result == nested + + list_data = [{"a": 1}, 
{"b": 2}] + result = AutoNestedDict._convert_dict_to_regular(list_data) + assert result == list_data + + assert AutoNestedDict._convert_dict_to_regular(42) == 42 + assert AutoNestedDict._convert_dict_to_regular("string") == "string" + + def test_mixed_operations(self): + """Test combination of auto-creation, conversion, and serialization.""" + d = AutoNestedDict() + + # Auto-create nested structure + d["config"]["database"]["host"] = "localhost" + d["config"]["database"]["port"] = 5432 + + d["settings"] = {"debug": True, "nested": {"value": 123}} + + assert isinstance(d["config"]["database"], AutoNestedDict) + assert isinstance(d["settings"], AutoNestedDict) + assert isinstance(d["settings"]["nested"], AutoNestedDict) + + result = d.to_dict() + + assert not isinstance(result, AutoNestedDict) + assert result["config"]["database"]["host"] == "localhost" + assert result["settings"]["nested"]["value"] == 123 + + def test_roundtrip_conversion(self): + """Test converting regular dict to AutoNestedDict and back.""" + original = {"level1": {"level2": {"data": [1, 2, {"nested": "value"}]}}, "simple": "value"} + + auto_dict = AutoNestedDict.convert_dict(original) + + result = auto_dict.to_dict() + + assert result == original + + def test_empty_autodict_to_dict(self): + """Test to_dict on empty AutoNestedDict.""" + d = AutoNestedDict() + result = d.to_dict() + + assert result == {} + assert isinstance(result, dict) + assert not isinstance(result, AutoNestedDict) + + +class TestWorkflowInitialization: + """Test suite for Workflow class initialization.""" + + def test_workflow_default_initialization(self): + """Test Workflow initialization with default parameters.""" + workflow = Workflow() + + assert workflow.name is not None + assert workflow.name.startswith("workflow_") + assert isinstance(workflow._config, AutoNestedDict) + assert workflow._node_counter == 0 + assert workflow._last_node is None + assert workflow._temp_files == [] + assert workflow._node_builders == {} + assert workflow._messages == [] + assert workflow._is_existing_task is False + + def test_workflow_with_custom_name(self): + """Test Workflow initialization with custom name.""" + workflow = Workflow("my_custom_workflow") + + assert workflow.name == "my_custom_workflow" + + def test_workflow_default_config_structure(self): + """Test default configuration structure.""" + workflow = Workflow() + + assert "graph_config" in workflow._config + assert "data_config" in workflow._config + assert "output_config" in workflow._config + + assert "nodes" in workflow._config["graph_config"] + assert "edges" in workflow._config["graph_config"] + assert "graph_properties" in workflow._config["graph_config"] + + assert isinstance(workflow._config, AutoNestedDict) + + assert isinstance(workflow._config["graph_config"], dict) + assert not isinstance(workflow._config["graph_config"], AutoNestedDict) + assert isinstance(workflow._config["graph_config"]["nodes"], dict) + assert isinstance(workflow._config["graph_config"]["edges"], list) + + def test_workflow_feature_flags(self): + """Test that feature support flags are initialized correctly.""" + workflow = Workflow() + + assert workflow._supports_subgraphs is True + assert workflow._supports_multimodal is True + assert workflow._supports_resumable is True + assert workflow._supports_quality is True + assert workflow._supports_oasst is True + + def test_workflow_config_is_autodict(self): + """Test that workflow config is AutoNestedDict.""" + workflow = Workflow() + + assert isinstance(workflow._config, 
+    def test_workflow_name_generation_format(self):
+        """Test that generated names follow expected format."""
+        workflow = Workflow()
+
+        assert workflow.name.startswith("workflow_")
+        suffix = workflow.name.replace("workflow_", "")
+        assert len(suffix) == 8
+        assert all(c in "0123456789abcdef" for c in suffix)
+
+
+class TestCreateGraph:
+    """Test suite for create_graph factory function."""
+
+    def test_create_graph_returns_workflow(self):
+        """Test that create_graph returns a Workflow instance."""
+        graph = create_graph("test_graph")
+
+        assert isinstance(graph, Workflow)
+
+    def test_create_graph_with_name(self):
+        """Test create_graph sets the workflow name correctly."""
+        graph = create_graph("my_graph")
+
+        assert graph.name == "my_graph"
+
+    def test_create_graph_initializes_properly(self):
+        """Test that create_graph creates properly initialized Workflow."""
+        graph = create_graph("test")
+
+        assert isinstance(graph._config, AutoNestedDict)
+        assert graph._node_counter == 0
+        assert graph._last_node is None
+        assert graph._is_existing_task is False
+
+
+class TestWorkflowIntegration:
+    """Integration tests combining multiple features."""
+
+    def test_workflow_config_manipulation(self):
+        """Test manipulating workflow configuration through AutoNestedDict."""
+        workflow = Workflow("integration_test")
+
+        workflow._config["graph_config"]["nodes"]["node1"] = {"type": "llm", "model": "gpt-4"}
+
+        workflow._config["graph_config"]["edges"].append({"from": "START", "to": "node1"})
+
+        assert isinstance(workflow._config["graph_config"]["nodes"]["node1"], dict)
+        assert not isinstance(workflow._config["graph_config"]["nodes"]["node1"], AutoNestedDict)
+        assert workflow._config["graph_config"]["nodes"]["node1"]["type"] == "llm"
+        assert len(workflow._config["graph_config"]["edges"]) == 1
+
+        workflow._config["new_section"] = {"nested": {"deep": "value"}}
+        assert isinstance(workflow._config["new_section"], AutoNestedDict)
+        assert isinstance(workflow._config["new_section"]["nested"], AutoNestedDict)
+
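+    # The asymmetry exercised below follows from the assertions above:
+    # containers assigned as plain dict literals during __init__ (such as
+    # graph_config and its "nodes" dict) are regular dicts, so writes through
+    # them bypass AutoNestedDict.__setitem__ and are not converted, while keys
+    # set directly on an AutoNestedDict are converted on assignment.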
+    def test_workflow_nodes_dict_behavior(self):
+        """Test the nuanced behavior of nodes dict vs AutoNestedDict."""
+        workflow = Workflow("test")
+
+        # nodes starts as a regular dict (set during __init__)
+        nodes_dict = workflow._config["graph_config"]["nodes"]
+        assert isinstance(nodes_dict, dict)
+        assert not isinstance(nodes_dict, AutoNestedDict)
+
+        workflow._config["graph_config"]["nodes"]["node1"] = {"type": "test"}
+
+        assert isinstance(workflow._config["graph_config"]["nodes"]["node1"], dict)
+        assert not isinstance(workflow._config["graph_config"]["nodes"]["node1"], AutoNestedDict)
+
+        workflow._config["custom_nodes"]["node2"]["nested"]["deep"]["value"] = 42
+        assert isinstance(workflow._config["custom_nodes"], AutoNestedDict)
+        assert isinstance(workflow._config["custom_nodes"]["node2"], AutoNestedDict)
+        assert workflow._config["custom_nodes"]["node2"]["nested"]["deep"]["value"] == 42
+
+    def test_workflow_config_to_dict_conversion(self):
+        """Test converting workflow config to regular dict."""
+        workflow = Workflow("test")
+
+        # Add some configuration
+        workflow._config["graph_config"]["nodes"]["test_node"] = {
+            "type": "processor",
+            "settings": {"param": "value"},
+        }
+
+        # Convert to dict
+        config_dict = workflow._config.to_dict()
+
+        assert isinstance(config_dict, dict)
+        assert not isinstance(config_dict, AutoNestedDict)
+        assert config_dict["graph_config"]["nodes"]["test_node"]["type"] == "processor"
+
+    def test_workflow_with_complex_nested_config(self):
+        """Test workflow with deeply nested configuration."""
+        workflow = Workflow("complex")
+
+        workflow._config["custom_data"]["source"]["type"] = "disk"
+        workflow._config["custom_data"]["source"]["params"]["path"] = "/data/input.json"
+
+        workflow._config["graph_config"]["nodes"]["node1"] = {
+            "model": {"config": {"temperature": 0.7, "max_tokens": 100}}
+        }
+
+        assert isinstance(workflow._config["custom_data"], AutoNestedDict)
+        assert isinstance(workflow._config["custom_data"]["source"]["params"], AutoNestedDict)
+
+        assert isinstance(workflow._config["graph_config"], dict)
+        assert isinstance(workflow._config["data_config"], dict)
+        assert isinstance(workflow._config["graph_config"]["nodes"], dict)
+        assert isinstance(workflow._config["graph_config"]["nodes"]["node1"], dict)
+
+        assert (
+            workflow._config["graph_config"]["nodes"]["node1"]["model"]["config"]["temperature"]
+            == 0.7
+        )
+        assert workflow._config["custom_data"]["source"]["params"]["path"] == "/data/input.json"
+
+
+class TestEdgeCases:
+    """Test edge cases and boundary conditions."""
+
+    def test_autodict_with_none_values(self):
+        """Test AutoNestedDict handles None values correctly."""
+        d = AutoNestedDict()
+        d["key"] = None
+
+        assert d["key"] is None
+        assert d.to_dict() == {"key": None}
+
+    def test_autodict_with_empty_list(self):
+        """Test AutoNestedDict handles empty lists."""
+        d = AutoNestedDict({"empty": []})
+
+        assert d["empty"] == []
+        assert d.to_dict() == {"empty": []}
+
+    def test_autodict_with_empty_nested_dict(self):
+        """Test AutoNestedDict with empty nested dictionaries."""
+        d = AutoNestedDict({"outer": {"inner": {}}})
+
+        result = d.to_dict()
+        assert result == {"outer": {"inner": {}}}
+
+    def test_workflow_with_empty_name(self):
+        """Test Workflow with empty string name generates default name."""
+        workflow = Workflow("")
+
+        assert workflow.name.startswith("workflow_")
+
+    def test_autodict_overwrite_existing_key(self):
+        """Test overwriting existing keys in AutoNestedDict."""
+        d = AutoNestedDict({"key": "old_value"})
+        d["key"] = "new_value"
+
+        assert d["key"] == "new_value"
+
+    def test_autodict_list_with_autodicts(self):
+        """Test list containing AutoNestedDict instances."""
+        d1 = AutoNestedDict({"a": 1})
+        d2 = AutoNestedDict({"b": 2})
+
+        container = AutoNestedDict()
+        container["list"] = [d1, d2]
+
+        result = container.to_dict()
+        assert result["list"] == [{"a": 1}, {"b": 2}]
+        assert not isinstance(result["list"][0], AutoNestedDict)
+
+    def test_autodict_init_vs_setitem_conversion(self):
+        """Test the difference between __init__ and __setitem__ for dict conversion."""
+        d1 = AutoNestedDict({"level1": {"level2": "value"}})
+        assert isinstance(d1, AutoNestedDict)
+        assert isinstance(d1["level1"], dict)
+        assert not isinstance(d1["level1"], AutoNestedDict)
+
+        d2 = AutoNestedDict()
+        d2["level1"] = {"level2": "value"}
+        assert isinstance(d2, AutoNestedDict)
+        assert isinstance(d2["level1"], AutoNestedDict)
+        assert isinstance(d2["level1"]["level2"], str)
+
+        d3 = AutoNestedDict.convert_dict({"level1": {"level2": "value"}})
+        assert isinstance(d3, AutoNestedDict)
+        assert isinstance(d3["level1"], AutoNestedDict)
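+
+    # Taken together, these assertions pin down three construction paths:
+    # __init__ stores nested dicts untouched, __setitem__ converts assigned
+    # dicts (including their nested dicts, per test_mixed_operations), and
+    # convert_dict() converts the whole structure recursively.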