Backfill Pattern: Should I use subflows (.submit()) or run_deployment() for daily Spark jobs? #19500
Hi Prefect friends & community! I'm building a backfill workflow for Spark jobs and would love some guidance on the best architectural pattern.

**My setup**

I have a `@flow` called `daily_spark_flow(day: date)` that submits a Spark job for a single day.

**Two approaches I'm considering**

```python
@flow
def backfill_flow(start, end):
    for day in date_range(start, end):
        daily_spark_task.submit(day)
```

Runs as nested tasks under one flow run.
```python
@flow
def trigger_backfill(start, end):
    for day in date_range(start, end):
        run_deployment(
            "daily-spark-flow/prod",
            parameters={"day": day.isoformat()},
        )
```

Each day becomes an independent top-level flow run.

**My questions**

For a production backfill scenario (e.g., reprocessing the last 90 days), which pattern is more aligned with Prefect best practices?
Also, will `Flow` gain a `.submit()` method for launching subflows in the future?
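Both snippets above assume a `date_range` helper, which isn't in the standard library. A minimal stdlib-only sketch (the name and the inclusive-end behavior are my assumptions):

```python
from datetime import date, timedelta


def date_range(start: date, end: date):
    """Yield each date from start through end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)
```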
Replies: 2 comments 1 reply
You don't need `run_deployment` or to configure another deployment up front if you don't want to; check out: https://docs.prefect.io/v3/advanced/submit-flows-directl… This should cover your ask.
Hi @zzstoatzz, thanks so much for the thoughtful reply and the pointer to the docs; really appreciate it! I'm actually already using a pattern very close to what you suggested: I have a parent flow that concurrently awaits multiple instances of a child `@flow` (not wrapped as a task) inside an `asyncio.Semaphore`-controlled loop, like this:

```python
import asyncio

from prefect import flow
from prefect.context import get_run_context

# BIGN and MyCustomError are defined elsewhere in my code


@flow(log_prints=True)
async def my_subflow(name: str = "world"):
    ctx = get_run_context()
    print(f"Subflow {name} parent_task_run_id: {ctx.flow_run.parent_task_run_id}")
    await asyncio.sleep(5)
    print("I'm an awake subflow!")
    if name == "test":
        raise MyCustomError("this is a test error")
    print(f"Hello {name}! I'm a subflow (not a task, not a deployment)!")


@flow(log_prints=True)
async def my_parent_flow(n: int = BIGN):
    semaphore = asyncio.Semaphore(2)

    async def _run_with_limit(name: str):
        async with semaphore:
            return await my_subflow(name=name)

    tasks = [_run_with_limit(str(i)) for i in range(n)]
    tasks.append(_run_with_limit("test"))
    tasks.append(_run_with_limit("final"))
    results = await asyncio.gather(*tasks, return_exceptions=True)
```

This gives me true subflows: each shows up as an independent flow run in the UI with a proper parent-child hierarchy (via `parent_task_run_id`), just like `run_deployment`, but without needing a separate deployment. I've also set concurrency limits on the work pool for resource control.

However, I've run into a subtle but important issue around failure propagation: when a subflow fails (e.g., raises a `ValueError`), the parent flow does not fail, even though the subflow is logically part of its execution graph. This makes backfill monitoring tricky: the parent appears "Completed" while some children are "Failed", and you have to manually inspect each run.
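In the meantime, one workaround is to inspect the results returned by `asyncio.gather(..., return_exceptions=True)` and re-raise in the parent, which marks the parent run as failed. A sketch of that check, using plain async functions as stand-ins for the Prefect flows (the `SubflowFailure` name and the `child`/`parent` helpers are hypothetical; the gathering logic itself is independent of Prefect):

```python
import asyncio


class SubflowFailure(Exception):
    """Hypothetical exception used to fail the parent when any child failed."""


async def child(name: str) -> str:
    # Stand-in for the Prefect subflow above.
    if name == "test":
        raise ValueError(f"child {name} failed")
    return name


async def parent(names: list[str]) -> list[str]:
    results = await asyncio.gather(
        *(child(n) for n in names), return_exceptions=True
    )
    # With return_exceptions=True, failures come back as exception objects
    # instead of propagating, so we must check for them explicitly.
    failures = [r for r in results if isinstance(r, BaseException)]
    if failures:
        # Re-raising here is what would mark the parent flow run as Failed.
        raise SubflowFailure(f"{len(failures)} of {len(results)} child runs failed")
    return results
```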
I've documented this behavior and proposed an enhancement (a `raise_on_failure=True` option, for `run_deployment`-like semantics) in this issue: