Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions examples/prefect_example.py
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is better suited for documentation rather than as a module within the codebase.

Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
"""
Example demonstrating how to use jobflow with Prefect.

This example shows how to:
1. Create jobflow Jobs and Flows
2. Convert them to Prefect workflows
3. Run them locally using Prefect
4. Create deployments for a Prefect cluster
"""

import jobflow
from jobflow import job, Job, Flow
from jobflow.managers.prefect import PrefectManager, run_on_prefect, flow_to_prefect_flow


# Define some simple jobs
@job
def add(a: int, b: int) -> int:
"""Add two numbers."""
print(f"Adding {a} + {b}")
return a + b


@job
def multiply(a: int, b: int) -> int:
"""Multiply two numbers."""
print(f"Multiplying {a} * {b}")
return a * b


@job
def subtract(a: int, b: int) -> int:
"""Subtract two numbers."""
print(f"Subtracting {a} - {b}")
return a - b


def example_single_job():
"""Example of running a single job with Prefect."""
print("\n=== Single Job Example ===")

# Create a simple job
job1 = add(5, 3)

try:
# Run using Prefect
result = run_on_prefect(job1, flow_name="single_add_job")
print(f"Result: {result}")
except ImportError:
print("Prefect is not installed. Please install with: pip install prefect")
except Exception as e:
print(f"Error running job: {e}")


def example_simple_flow():
"""Example of running a simple flow with Prefect."""
print("\n=== Simple Flow Example ===")

# Create jobs
job1 = add(5, 3)
job2 = multiply(job1.output, 2)
job3 = subtract(job2.output, 1)

# Create flow
flow = Flow([job1, job2, job3], output=job3.output, name="arithmetic_flow")

try:
# Run using Prefect
result = run_on_prefect(flow, flow_name="arithmetic_workflow")
print(f"Flow result: {result}")
except ImportError:
print("Prefect is not installed. Please install with: pip install prefect")
except Exception as e:
print(f"Error running flow: {e}")


def example_parallel_flow():
"""Example of running a parallel flow with Prefect."""
print("\n=== Parallel Flow Example ===")

# Create independent jobs that can run in parallel
job1 = add(1, 2)
job2 = add(3, 4)
job3 = add(5, 6)

# These jobs depend on the parallel jobs
job4 = multiply(job1.output, job2.output)
job5 = multiply(job3.output, 2)

# Final job combines everything
job6 = add(job4.output, job5.output)

# Create flow
flow = Flow(
[job1, job2, job3, job4, job5, job6],
output=job6.output,
name="parallel_arithmetic_flow"
)

try:
# Run using Prefect with concurrent task runner
result = run_on_prefect(
flow,
flow_name="parallel_workflow",
task_runner="concurrent"
)
print(f"Parallel flow result: {result}")
except ImportError:
print("Prefect is not installed. Please install with: pip install prefect")
except Exception as e:
print(f"Error running parallel flow: {e}")


def example_prefect_manager():
"""Example using the PrefectManager class."""
print("\n=== PrefectManager Example ===")

try:
# Create manager
manager = PrefectManager(task_runner="sequential")

# Create a flow
job1 = add(10, 5)
job2 = multiply(job1.output, 3)
flow = Flow([job1, job2], output=job2.output, name="manager_test_flow")

# Submit flow (this would be async in real usage)
print("Creating Prefect flow...")
prefect_flow = flow_to_prefect_flow(flow, flow_name="manager_workflow")
print(f"Created Prefect flow: {prefect_flow}")

# You could also create a deployment like this:
# deployment = manager.create_deployment(
# flow,
# deployment_name="my_deployment",
# work_pool_name="my_work_pool"
# )
# print(f"Created deployment: {deployment}")

except ImportError:
print("Prefect is not installed. Please install with: pip install prefect")
except Exception as e:
print(f"Error with PrefectManager: {e}")


def example_error_handling():
"""Example showing error handling in Prefect workflows."""
print("\n=== Error Handling Example ===")

@job
def divide(a: int, b: int) -> float:
"""Divide two numbers."""
if b == 0:
raise ValueError("Cannot divide by zero!")
return a / b

# First, show a successful division
working_job = divide(10, 2)
try:
result = run_on_prefect(working_job, flow_name="working_division")
print(f"Successful division result: {list(result.values())[0]}")
except Exception as e:
print(f"Error in working job: {e}")
return

# Now show error handling (this will generate Prefect error logs, but that's expected)
print("Testing error handling (expect to see error logs below)...")
failing_job = divide(10, 0)

try:
result = run_on_prefect(failing_job, flow_name="failing_job")
print(f"❌ Unexpected success: {result}")
except ImportError:
print("Prefect is not installed. Please install with: pip install prefect")
except Exception as e:
print(f"✅ Expected error caught and handled: {type(e).__name__}: {e}")
print(" (The error traceback above is expected - Prefect logs all failures)")


if __name__ == "__main__":
print("Jobflow + Prefect Integration Examples")
print("=====================================")
print("NOTE: Error tracebacks in logs are expected for the error handling example.")
print(" The examples demonstrate both successful execution and error handling.")
print()

# Check if Prefect is available
try:
import prefect
print(f"Prefect version: {prefect.__version__}")
except ImportError:
print("Prefect is not installed. Some examples will not work.")
print("Install with: pip install prefect")

# Run examples
example_single_job()
example_simple_flow()
example_parallel_flow()
example_prefect_manager()
example_error_handling()

print("\n=== Examples Complete ===")
print("To deploy to a Prefect cluster:")
print("1. Start Prefect server: prefect server start")
print("2. Create work pool: prefect work-pool create --type process my_pool")
print("3. Create deployment using PrefectManager.create_deployment()")
print("4. Run worker: prefect worker start --pool my_pool")
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ classifiers = [
"Topic :: Other/Nonlisted Topic",
"Topic :: Scientific/Engineering",
]
requires-python = ">=3.10"
requires-python = ">=3.11"
dependencies = [
"PyYAML",
"maggma>=0.57.0",
Expand All @@ -34,6 +34,8 @@ dependencies = [
"pydantic-settings>=2.0.3",
"pydantic>=2.0.1",
"pydash",
"prefect",
"pytest",
]

[project.optional-dependencies]
Expand All @@ -51,6 +53,7 @@ dev = ["pre-commit>=2.12.1", "typing_extensions; python_version < '3.11'"]
tests = ["moto==5.1.9", "pytest-cov==6.2.1", "pytest==8.4.1"]
vis = ["matplotlib", "pydot"]
fireworks = ["FireWorks"]
prefect = ["prefect>=3.0.0"]
strict = [
"FireWorks==2.0.4",
"PyYAML==6.0.2",
Expand Down
6 changes: 6 additions & 0 deletions src/jobflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
from jobflow.core.state import CURRENT_JOB
from jobflow.core.store import JobStore
from jobflow.managers.local import run_locally

try:
from jobflow.managers.prefect import PrefectManager, run_on_prefect
except ImportError:
PrefectManager = None
run_on_prefect = None
from jobflow.settings import JobflowSettings
from jobflow.utils.log import initialize_logger

Expand Down
Loading
Loading