-
Notifications
You must be signed in to change notification settings - Fork 50
Open
Description
I want to implement a fanout/gather pattern similar to Inngest's fan-out jobs(https://www.inngest.com/docs/guides/fan-out-jobs) where I can:
- Trigger multiple async operations in parallel from a single workflow
- Collect all results when they complete
something like this the 2 examples beow
Example 1: Same operation for N items
@DBOS.workflow()
async def process_batch_workflow(items):
# Fanout - process each item in parallel
handles = []
for item in items:
handle = process_item.start_async(item)
handles.append(handle)
# Gather - collect all results
results = await asyncio.gather(*[
handle.get_result_async() for handle in handles
])
return results
@DBOS.step()
async def process_item(item):
# Process single item asynchronously
async with aiohttp.ClientSession() as session:
response = await session.post("https://api.example.com/process", json=item)
return await response.json()however each step can only run once right for example 1?
Example 2
@DBOS.workflow()
async def user_signup_workflow(user_data):
# Fanout - trigger multiple async operations
email_handle = send_welcome_email.start_async(user_data)
stripe_handle = create_stripe_trial.start_async(user_data)
crm_handle = add_to_crm.start_async(user_data)
# Gather - wait for all to complete
results = await asyncio.gather(
email_handle.get_result_async(),
stripe_handle.get_result_async(),
crm_handle.get_result_async()
)
// do something with results in a step
return ....
@DBOS.step()
async def send_welcome_email(user_data):
# async email logic
pass
@DBOS.step()
async def create_stripe_trial(user_data):
# async stripe logic
pass
@DBOS.step()
async def add_to_crm(user_data):
# async CRM logic
passIs there a cleaner async pattern for this?
Metadata
Metadata
Assignees
Labels
No labels