Description
Bug Description
Prefect incorrectly creates task dependencies when Python reuses memory addresses (object id()) for new objects after previous objects are garbage collected.
Reproduction
When using a submit-with-backpressure pattern (submitting a new task each time a previous one completes), the newly submitted tasks show dependencies on already completed tasks in the Prefect UI, even though there is no actual data dependency between them.
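Without Prefect, the submit-with-backpressure idea amounts to "keep at most N futures in flight and submit the next item only when one finishes". Here is a minimal sketch using the standard library's `concurrent.futures` in place of Prefect. The helper name `submit_with_backpressure` is hypothetical and not a Prefect API.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, List, Set, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def submit_with_backpressure(
    fn: Callable[[T], R], items: Iterable[T], max_inflight: int = 10
) -> List[R]:
    """Run fn over items with at most max_inflight calls in flight at once."""
    results: List[R] = []
    inflight: Set[Future] = set()
    with ThreadPoolExecutor(max_workers=max_inflight) as pool:
        for item in items:
            # Block until at least one in-flight call finishes before submitting more.
            if len(inflight) >= max_inflight:
                done, inflight = wait(inflight, return_when=FIRST_COMPLETED)
                results.extend(f.result() for f in done)
            inflight.add(pool.submit(fn, item))
        # Drain whatever is still running.
        done, _ = wait(inflight)
        results.extend(f.result() for f in done)
    return results  # completion order, not input order
```

Results come back in completion order rather than input order. Each submission depends only on its own item, which is the same independence the Prefect reproduction relies on.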
Code Pattern
```python
from prefect import flow, task
from prefect.futures import wait
import queue


@task
def my_task(payload: dict) -> dict:
    # do work
    return {"status": "success", "data": payload}


@flow
def my_flow(items: list[dict]):
    # Submit with backpressure pattern
    max_inflight = 10
    done_queue = queue.Queue()
    futures = []
    active = 0

    def submit(item):
        nonlocal active
        future = my_task.submit(item)
        futures.append(future)
        active += 1
        future.add_done_callback(lambda f: done_queue.put(f))

    pending = iter(items)

    # Submit initial batch
    for _ in range(max_inflight):
        try:
            submit(next(pending))
        except StopIteration:
            break

    # Submit remaining as slots free up
    for item in pending:
        while active >= max_inflight:
            done_queue.get()
            active -= 1
        submit(item)

    wait(futures)
```

Database Evidence
Querying the Prefect database shows the false dependencies:
```sql
SELECT name, json_extract(task_inputs, '$.payload') as payload_dep
FROM task_run
WHERE flow_run_id = '6f7c9553-5ec5-4bb0-90d5-25dd31b8e6c7'
ORDER BY created;
```

Results:
- First batch (19 tasks): `payload_dep = []` (no dependency) ✓
- Subsequent tasks: `payload_dep = [{"input_type":"task_run","id":"..."}]` (false dependency) ✗
The dependent task IDs point to completed tasks from the first batch, even though there's no actual data flow between them.
Root Cause Analysis
The issue is in prefect/tasks.py around line 254:
```python
# parent dependency tracking
if flow_run_context:
    for v in parameters.values():
        # ...
        res = flow_run_context.run_results.get(id(v))  # Uses memory address
        if res:
            upstream_state, _ = res
```

And in `prefect/utilities/engine.py` line 584:

```python
flow_run_context.run_results[id(obj)] = (linked_state, run_type)
```

The problem: Prefect uses `id(obj)` (in CPython, the object's memory address) to track relationships between task results and states. When:
- Task A completes and returns `result_a`; Prefect records `run_results[id(result_a)] = state_a`.
- `result_a` gets garbage collected, freeing memory address `0x1234`.
- A new `payload` dict is created for Task B, and Python allocates it at the same address `0x1234`.
- When Task B is submitted, Prefect checks `run_results.get(id(payload))` and incorrectly finds `state_a`.
- Task B is marked as depending on Task A.
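The sequence above can be simulated without Prefect. This sketch uses a plain dict named `run_results` as a stand-in for `flow_run_context.run_results`. Its shape (`id` mapped to a state) is an assumption based only on the two snippets quoted above. The sketch counts how often a fresh `payload` dict's `id()` matches a key recorded for an earlier, already freed result. That count depends on the interpreter and its allocator, so no particular value is promised. Keeping every result alive, however, makes a false hit impossible, because two live objects never share an `id()`.

```python
from typing import Dict, List


def count_false_hits(trials: int, keep_alive: bool) -> int:
    """Count lookups where a fresh payload's id() matches an earlier result's id()."""
    run_results: Dict[int, str] = {}  # stand-in: id(result) -> state label
    retained: List[dict] = []         # strong references, used only if keep_alive
    false_hits = 0
    for i in range(trials):
        result = {"status": "success", "data": i}  # "Task A" return value
        run_results[id(result)] = f"state_{i}"
        if keep_alive:
            retained.append(result)
        del result  # without keep_alive, the dict is freed and its id may be reused

        payload = {"item": i}  # "Task B" input, unrelated to any result
        if id(payload) in run_results:
            false_hits += 1  # stale entry matched: a spurious dependency
        del payload
    return false_hits


print("false hits (results freed):", count_false_hits(1_000, keep_alive=False))
print("false hits (results kept): ", count_false_hits(1_000, keep_alive=True))
```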
This is a fundamental problem with using id() for tracking: Python's documentation states that two objects with non-overlapping lifetimes may have the same id() value, so an ID can be reused once its object is garbage collected.
Expected Behavior
Tasks submitted independently should not have dependency relationships, regardless of memory address reuse.
Actual Behavior
Later tasks incorrectly show as depending on earlier completed tasks in:
- Prefect UI (visible connection lines in the task graph)
- Database `task_inputs` field
Impact
- UI Confusion: Task graph shows misleading dependency lines
- Potential scheduling issues: if Prefect uses these dependencies for scheduling decisions, unrelated tasks could be affected by them
Environment
- Prefect Version: 3.5.0
- Python Version: 3.11
Possible Solutions
- Use a different tracking mechanism that doesn't rely on `id()` (e.g., explicit task run IDs)
- Clear entries from `run_results` more aggressively after tasks complete
- Add a flag to disable automatic dependency inference
- Use weak references to detect when objects have been garbage collected
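The following sketches the weak-reference and eviction suggestions. `ResultRegistry` is a hypothetical `id()`-keyed registry that never returns a stale match. It is not Prefect's API. One caveat applies: built-in `dict` and `list` instances cannot be weakly referenced (`weakref.ref({})` raises `TypeError`), and neither can `tuple` or `int` in CPython. For those types, the sketch falls back to holding a strong reference. This prevents the ID from being reused, but it keeps the object in memory until the entry is cleared.

```python
import weakref
from typing import Any, Dict, Optional, Tuple


class ResultRegistry:
    """Hypothetical id()-keyed registry that refuses stale matches.

    Each entry remembers which object it was recorded for, so a lookup only
    succeeds when the queried object *is* that object.
    """

    def __init__(self) -> None:
        # id(obj) -> (is_weak, holder, state); holder is a weakref or the object itself
        self._entries: Dict[int, Tuple[bool, Any, Any]] = {}

    def record(self, obj: Any, state: Any) -> None:
        key = id(obj)
        try:
            # Evict the entry when obj is collected, before its id can be reused.
            holder = weakref.ref(obj, lambda _ref, k=key: self._entries.pop(k, None))
            self._entries[key] = (True, holder, state)
        except TypeError:
            # dict, list, int, ... cannot be weakly referenced: keep obj alive instead,
            # which keeps its id unique for as long as the entry exists.
            self._entries[key] = (False, obj, state)

    def lookup(self, obj: Any) -> Optional[Any]:
        entry = self._entries.get(id(obj))
        if entry is None:
            return None
        is_weak, holder, state = entry
        target = holder() if is_weak else holder
        # Identity check guards against any entry that outlived its object.
        return state if target is obj else None

    def clear(self) -> None:
        """Drop all entries, e.g. once the flow run no longer needs them."""
        self._entries.clear()
```

The strong-reference fallback trades memory for correctness. That trade-off is where clearing entries from `run_results` once results are no longer needed would matter.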