
store_inputs limitations in dynamic flows #842

@gpetretto

Description


I want to open a discussion about the potential limitations of the store_inputs Job that is added at the end of replacement Flows. I think there are currently two main problems. I have a partial solution for a subset of the cases, but I thought it would still be worth opening a discussion to keep track of the remaining issues (if the PR is acceptable) or to find alternative solutions.

Problem 1: Multiple replace

If a Job in a dynamic Flow is replaced by a single Job, the replacing Job inherits the uuid of the original Job with its index increased by 1. This works nicely even when the replacement happens multiple times.
If, instead, the Job is replaced by a Flow, a store_inputs Job is added whenever the Flow has an output. But if many replacements take place, the Flow ends up with a concatenation of many store_inputs Jobs. For example, consider the code below:

from jobflow import Flow, job, run_locally, Response

@job
def add(a, b):
    return a + b

@job
def check_add(a):
    if a < 10:
        j1 = add(a, 2)
        j2 = check_add(j1.output)
        return Response(replace=Flow([j1,j2], output=j2.output))
    return a

job1 = add(1, 2)
job2 = check_add(job1.output)
job3 = add(job2.output, 2)
flow = Flow([job1, job2, job3])

responses = run_locally(flow)
This is the output:
2026-01-21 12:29:39,952 INFO Started executing jobs locally
2026-01-21 12:29:40,133 INFO Starting job - add (56871079-08e2-4a25-9278-597d6c18ffa8)
2026-01-21 12:29:40,140 INFO Finished job - add (56871079-08e2-4a25-9278-597d6c18ffa8)
2026-01-21 12:29:40,140 INFO Starting job - check_add (4336c567-df22-4025-9df2-1582fc552b83)
2026-01-21 12:29:40,144 INFO Finished job - check_add (4336c567-df22-4025-9df2-1582fc552b83)
2026-01-21 12:29:40,145 INFO Starting job - add (afbda760-d914-49a5-b2ac-0807d56b20e9)
2026-01-21 12:29:40,146 INFO Finished job - add (afbda760-d914-49a5-b2ac-0807d56b20e9)
2026-01-21 12:29:40,146 INFO Starting job - check_add (7d0d41e9-3bf9-4918-9fde-5f57c4921e99)
2026-01-21 12:29:40,149 INFO Finished job - check_add (7d0d41e9-3bf9-4918-9fde-5f57c4921e99)
2026-01-21 12:29:40,149 INFO Starting job - add (388a680a-d913-4b9b-9ae9-2da6a9bf7299)
2026-01-21 12:29:40,151 INFO Finished job - add (388a680a-d913-4b9b-9ae9-2da6a9bf7299)
2026-01-21 12:29:40,151 INFO Starting job - check_add (9add2560-c84c-46ca-a325-8cac6d78dc5e)
2026-01-21 12:29:40,156 INFO Finished job - check_add (9add2560-c84c-46ca-a325-8cac6d78dc5e)
2026-01-21 12:29:40,156 INFO Starting job - add (61bcf30a-29bf-4498-a357-e1fb23d91d80)
2026-01-21 12:29:40,157 INFO Finished job - add (61bcf30a-29bf-4498-a357-e1fb23d91d80)
2026-01-21 12:29:40,157 INFO Starting job - check_add (5c9e2808-1f0a-4932-96b7-a9fc2508f2ce)
2026-01-21 12:29:40,172 INFO Finished job - check_add (5c9e2808-1f0a-4932-96b7-a9fc2508f2ce)
2026-01-21 12:29:40,172 INFO Starting job - add (81284a6a-68e6-41fe-bb20-384d34ed4ad0)
2026-01-21 12:29:40,174 INFO Finished job - add (81284a6a-68e6-41fe-bb20-384d34ed4ad0)
2026-01-21 12:29:40,174 INFO Starting job - check_add (5890f5bf-5f10-40c6-8f0e-45d34f5348bb)
2026-01-21 12:29:40,177 INFO Finished job - check_add (5890f5bf-5f10-40c6-8f0e-45d34f5348bb)
2026-01-21 12:29:40,177 INFO Starting job - store_inputs (5c9e2808-1f0a-4932-96b7-a9fc2508f2ce, 2)
2026-01-21 12:29:40,178 INFO Finished job - store_inputs (5c9e2808-1f0a-4932-96b7-a9fc2508f2ce, 2)
2026-01-21 12:29:40,178 INFO Starting job - store_inputs (9add2560-c84c-46ca-a325-8cac6d78dc5e, 2)
2026-01-21 12:29:40,180 INFO Finished job - store_inputs (9add2560-c84c-46ca-a325-8cac6d78dc5e, 2)
2026-01-21 12:29:40,180 INFO Starting job - store_inputs (7d0d41e9-3bf9-4918-9fde-5f57c4921e99, 2)
2026-01-21 12:29:40,182 INFO Finished job - store_inputs (7d0d41e9-3bf9-4918-9fde-5f57c4921e99, 2)
2026-01-21 12:29:40,182 INFO Starting job - store_inputs (4336c567-df22-4025-9df2-1582fc552b83, 2)
2026-01-21 12:29:40,183 INFO Finished job - store_inputs (4336c567-df22-4025-9df2-1582fc552b83, 2)
2026-01-21 12:29:40,183 INFO Starting job - add (8e9a3b7c-d2f8-44b9-87bd-3a28b876949a)
2026-01-21 12:29:40,193 INFO Finished job - add (8e9a3b7c-d2f8-44b9-87bd-3a28b876949a)
2026-01-21 12:29:40,193 INFO Finished executing jobs locally
And this is the Flow structure (from jobflow-remote):
flowchart TD
    classDef WAITING fill:#aaaaaa
    classDef READY fill:#DAF7A6
    classDef CHECKED_OUT fill:#5E6BFF
    classDef UPLOADED fill:#5E6BFF
    classDef SUBMITTED fill:#5E6BFF
    classDef RUNNING fill:#5E6BFF
    classDef RUN_FINISHED fill:#5E6BFF
    classDef DOWNLOADED fill:#5E6BFF
    classDef REMOTE_ERROR fill:#fC3737
    classDef COMPLETED fill:#47bf00
    classDef FAILED fill:#fC3737
    classDef PAUSED fill:#EAE200
    classDef STOPPED fill:#fC3737
    classDef USER_STOPPED fill:#fC3737
    classDef BATCH_SUBMITTED fill:#5E6BFF
    classDef BATCH_RUNNING fill:#5E6BFF
    849(add) --> 850(check_add)
    846(add) --> 849(add)
    846(add) --> 847(check_add)
    850(check_add) --> 851(store_inputs)
    840(add) --> 841(check_add)
    844(check_add) --> 845(store_inputs)
    848(store_inputs) --> 845(store_inputs)
    832(add) --> 840(add)
    840(add) --> 843(add)
    843(add) --> 846(add)
    832(add) --> 833(check_add)
    841(check_add) --> 842(store_inputs)
    845(store_inputs) --> 842(store_inputs)
    843(add) --> 844(check_add)
    847(check_add) --> 848(store_inputs)
    851(store_inputs) --> 848(store_inputs)
    833(check_add) --> 834(add)
    842(store_inputs) --> 834(add)
    847(check_add) -.-> aba8cfa6-a10b-47e0-8ce8-6d0da72a891f
    841(check_add) -.-> ebdb4040-8654-4a43-ad53-69f96189d274
    833(check_add) -.-> b34eb9b5-dfe2-4175-81eb-acc65892d048
    844(check_add) -.-> eafacaa5-e18e-41da-82f8-7a344ed146d6
    832:::COMPLETED
    833:::COMPLETED
    834:::COMPLETED
    subgraph b34eb9b5-dfe2-4175-81eb-acc65892d048[ ]
        840:::COMPLETED
        841:::COMPLETED
        842:::COMPLETED
        subgraph ebdb4040-8654-4a43-ad53-69f96189d274[ ]
            843:::COMPLETED
            844:::COMPLETED
            845:::COMPLETED
            subgraph eafacaa5-e18e-41da-82f8-7a344ed146d6[ ]
                846:::COMPLETED
                847:::COMPLETED
                848:::COMPLETED
                subgraph aba8cfa6-a10b-47e0-8ce8-6d0da72a891f[ ]
                    849:::COMPLETED
                    850:::COMPLETED
                    851:::COMPLETED
                end
            end
        end
    end
    style b34eb9b5-dfe2-4175-81eb-acc65892d048 fill:#2B65EC,opacity:0.2
    style ebdb4040-8654-4a43-ad53-69f96189d274 fill:#2B65EC,opacity:0.2
    style eafacaa5-e18e-41da-82f8-7a344ed146d6 fill:#2B65EC,opacity:0.2
    style aba8cfa6-a10b-47e0-8ce8-6d0da72a891f fill:#2B65EC,opacity:0.2
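The difference between the two replacement modes can be reproduced with a toy model that does not use jobflow at all. ToyJob, run_check_add and the short u0, u1, ... ids are hypothetical stand-ins; the model only assumes what is described above (single-Job replacement reuses the uuid with index + 1, a Flow replacement adds a store_inputs Job carrying the replaced uuid with index + 1) plus the innermost-first order of the store_inputs Jobs seen in the log. In the single-Job mode the addition is folded into the replacing check_add, because the replacement must be a single Job.

```python
import itertools
from dataclasses import dataclass


@dataclass
class ToyJob:
    """Hypothetical stand-in for a jobflow Job: only a name, a uuid and an index."""

    name: str
    uuid: str
    index: int = 1


_ids = itertools.count()


def new_uuid() -> str:
    # Short readable ids instead of real UUID4 strings.
    return f"u{next(_ids)}"


def run_check_add(a: int, replace_with_flow: bool) -> list[ToyJob]:
    """Return the jobs executed by the recursive check_add chain, in execution order."""
    executed: list[ToyJob] = []
    pending_store_inputs: list[ToyJob] = []
    current = ToyJob("check_add", new_uuid())
    while True:
        executed.append(current)
        if a >= 10:
            break  # check_add returns a: no more replacements
        a += 2
        if replace_with_flow:
            # Flow replacement: add and check_add get fresh uuids, and a
            # store_inputs Job takes over the replaced uuid at index + 1.
            executed.append(ToyJob("add", new_uuid()))
            pending_store_inputs.append(
                ToyJob("store_inputs", current.uuid, current.index + 1)
            )
            current = ToyJob("check_add", new_uuid())
        else:
            # Single-Job replacement: same uuid, index increased by 1.
            current = ToyJob("check_add", current.uuid, current.index + 1)
    # Each store_inputs waits for the one nested inside it, so they run
    # innermost first, after the last check_add has finished.
    executed.extend(reversed(pending_store_inputs))
    return executed


for mode in (False, True):
    jobs = run_check_add(3, replace_with_flow=mode)
    print(f"replace_with_flow={mode}:")
    for j in jobs:
        print(f"  {j.name:<13} ({j.uuid}, {j.index})")
```

Starting from a = 3 as in the example, the Flow mode yields five check_add, four add and four store_inputs Jobs with no index above 2, while the single-Job mode keeps a single uuid and reaches index 5.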

This has several negative consequences:

  1. the useful property of single-Job replacement, where the last Job of the chain has the highest index, is lost;
  2. a series of pointless Jobs are executed and their outputs stored (this is even more annoying with jobflow-remote/fireworks, where every Job needs to be submitted);
  3. when the last add Job (job3) runs, jobflow needs to recursively fetch the outputs of all the store_inputs Jobs to resolve its references before reaching the actual value. This results in many pointless (although small) queries to the JobStore.
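The cost described in point 3 can be sketched with a toy store that counts lookups. Ref, CountingStore, nested_flow_store and single_job_store are hypothetical names, not jobflow's JobStore or OutputReference, and the model assumes that a store_inputs output holds the reference to the inner Flow output rather than its resolved value, so every nesting level adds one lookup.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Ref:
    """Toy reference to the output of the *latest* run (highest index) of a uuid."""

    uuid: str


class CountingStore:
    """Dict-backed toy store that counts output lookups; not jobflow's JobStore."""

    def __init__(self) -> None:
        self.outputs: dict[tuple[str, int], Any] = {}
        self.queries = 0

    def put(self, uuid: str, index: int, output: Any) -> None:
        self.outputs[(uuid, index)] = output

    def resolve(self, ref: Ref) -> Any:
        """Fetch the latest output for ref.uuid, following nested references."""
        self.queries += 1
        index = max(i for (u, i) in self.outputs if u == ref.uuid)
        value = self.outputs[(ref.uuid, index)]
        if isinstance(value, Ref):
            # A store_inputs output is just another reference: recurse.
            return self.resolve(value)
        return value


def nested_flow_store(depth: int, final_value: Any) -> CountingStore:
    """Flow replacements: store_inputs (k, 2) points at check_add_{k+1}."""
    store = CountingStore()
    for k in range(depth):
        store.put(f"check_add_{k}", 1, None)  # replaced Job, no real output
        store.put(f"check_add_{k}", 2, Ref(f"check_add_{k + 1}"))  # store_inputs
    store.put(f"check_add_{depth}", 1, final_value)
    return store


def single_job_store(depth: int, final_value: Any) -> CountingStore:
    """Single-Job replacements: one uuid whose index grows with each replacement."""
    store = CountingStore()
    for index in range(1, depth + 1):
        store.put("check_add_0", index, None)
    store.put("check_add_0", depth + 1, final_value)
    return store


for build in (nested_flow_store, single_job_store):
    store = build(depth=4, final_value=11)
    value = store.resolve(Ref("check_add_0"))
    print(f"{build.__name__}: value={value}, queries={store.queries}")
```

With depth=4, matching the four replacements of the example, resolving the input of job3 takes five lookups through the nested store_inputs chain and a single lookup with single-Job replacement.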

Problem 2: Failed jobs before store_inputs

Consider a case where a simple two-step Flow replaces a single Job, with no recursive replacement. If the last Job of the replacing Flow fails, the store_inputs Job is still executed by all managers, since it has on_missing_references=OnMissing.NONE. However, if a subsequent Job with on_missing_references=OnMissing.ERROR depends on the output of the Flow, it has no way of knowing that the real reference is missing.
Here is an example:

from jobflow import Flow, job, run_locally, Response

@job
def add(a, b):
    return a + b

@job
def fail(x):
    raise RuntimeError("Expected failure!")

@job
def generate():
    jg1 = add(1, 2)
    jg2 = fail(jg1.output)
    f = Flow([jg1, jg2], output=jg2.output)
    return Response(replace=f)


j1 = generate()
j2 = add(j1.output, 2)
flow = Flow([j1, j2])

run_locally(flow)
This is the output:
2026-01-21 16:10:15,994 INFO Started executing jobs locally
2026-01-21 16:10:16,253 INFO Starting job - generate (27068f05-5a17-49a5-bdd5-2cca8867e2da)
2026-01-21 16:10:16,309 INFO Finished job - generate (27068f05-5a17-49a5-bdd5-2cca8867e2da)
2026-01-21 16:10:16,310 INFO Starting job - add (246af488-3376-421f-a414-e685eca3f5f6)
2026-01-21 16:10:16,311 INFO Finished job - add (246af488-3376-421f-a414-e685eca3f5f6)
2026-01-21 16:10:16,311 INFO Starting job - fail (11d0d634-6569-4678-a30c-1e7dc18a84b7)
2026-01-21 16:10:16,316 INFO fail failed with exception:
Traceback (most recent call last):
  File "./jobflow/src/jobflow/managers/local.py", line 117, in _run_job
    response = job.run(store=store)
               ^^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/job.py", line 604, in run
    response = function(*self.function_args, **self.function_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/store_inputs_failure/store_inputs_failure.py", line 13, in fail
    raise RuntimeError("Expected failure!")
RuntimeError: Expected failure!

2026-01-21 16:10:16,317 INFO Starting job - store_inputs (27068f05-5a17-49a5-bdd5-2cca8867e2da, 2)
2026-01-21 16:10:16,317 INFO Finished job - store_inputs (27068f05-5a17-49a5-bdd5-2cca8867e2da, 2)
2026-01-21 16:10:16,317 INFO Starting job - add (db902382-9455-4151-bbad-73f7380b7034)
2026-01-21 16:10:16,320 INFO add failed with exception:
Traceback (most recent call last):
  File ".../jobflow/src/jobflow/managers/local.py", line 117, in _run_job
    response = job.run(store=store)
               ^^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/job.py", line 593, in run
    self.resolve_args(store=store)
  File ".../jobflow/src/jobflow/core/job.py", line 703, in resolve_args
    resolved_args = find_and_resolve_references(
                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/reference.py", line 473, in find_and_resolve_references
    resolved_references = resolve_references(
                          ^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/reference.py", line 356, in resolve_references
    cache[uuid][index] = store.get_output(
                         ^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/store.py", line 523, in get_output
    return find_and_resolve_references(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/reference.py", line 452, in find_and_resolve_references
    return arg.resolve(
           ^^^^^^^^^^^^
  File ".../jobflow/src/jobflow/core/reference.py", line 166, in resolve
    raise ValueError(
ValueError: Could not resolve reference - 11d0d634-6569-4678-a30c-1e7dc18a84b7 not in store or index=None, cache={'11d0d634-6569-4678-a30c-1e7dc18a84b7': {}}

2026-01-21 16:10:16,320 INFO Finished executing jobs locally
And this is the final Flow structure (again from jobflow-remote):
flowchart TD
    classDef WAITING fill:#aaaaaa
    classDef READY fill:#DAF7A6
    classDef CHECKED_OUT fill:#5E6BFF
    classDef UPLOADED fill:#5E6BFF
    classDef SUBMITTED fill:#5E6BFF
    classDef RUNNING fill:#5E6BFF
    classDef RUN_FINISHED fill:#5E6BFF
    classDef DOWNLOADED fill:#5E6BFF
    classDef REMOTE_ERROR fill:#fC3737
    classDef COMPLETED fill:#47bf00
    classDef FAILED fill:#fC3737
    classDef PAUSED fill:#EAE200
    classDef STOPPED fill:#fC3737
    classDef USER_STOPPED fill:#fC3737
    classDef BATCH_SUBMITTED fill:#5E6BFF
    classDef BATCH_RUNNING fill:#5E6BFF
    837(add) --> 838(fail)
    835(generate) --> 836(add)
    839(store_inputs) --> 836(add)
    838(fail) --> 839(store_inputs)
    835(generate) -.-> 0eca4ab8-4193-481e-9616-925436e8aacc
    subgraph 0eca4ab8-4193-481e-9616-925436e8aacc[ ]
        837:::COMPLETED
        838:::FAILED
        839:::COMPLETED
    end
    835:::COMPLETED
    836:::REMOTE_ERROR
    style 0eca4ab8-4193-481e-9616-925436e8aacc fill:#2B65EC,opacity:0.2

The problem is that the last Job fails with an unexpected and unclear error, while it should not have been executed in the first place. Note that this happens both with run_locally and with jobflow-remote (I did not test fireworks, but I expect the same behavior).
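The sequence of events can be reproduced with a toy scheduler. PARENTS, TOLERANT, is_ready, resolve and run_body are hypothetical names, and the readiness rule (only direct parents are inspected, and only store_inputs accepts FAILED parents) is an assumption modeled on the behavior described above, not the actual code of any manager. As in the traceback, the final Job only fails when it follows the reference forwarded by store_inputs.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class State(Enum):
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


@dataclass(frozen=True)
class Ref:
    """Toy reference to another job's output."""

    job: str


# Direct parents of each job, mirroring the example (generate already ran).
PARENTS = {
    "add_inner": [],
    "fail": ["add_inner"],
    "store_inputs": ["fail"],
    "add_final": ["generate", "store_inputs"],
}
# Jobs accepting FAILED parents (store_inputs has OnMissing.NONE).
TOLERANT = {"store_inputs"}


def is_ready(job: str, states: dict[str, State]) -> bool:
    """Toy readiness rule that only looks at the *direct* parents."""
    parent_states = [states.get(p) for p in PARENTS[job]]
    if None in parent_states:
        return False  # a parent has not finished yet
    return job in TOLERANT or all(s is State.COMPLETED for s in parent_states)


def resolve(value: Any, outputs: dict[str, Any]) -> Any:
    """Follow references until a concrete value is reached."""
    while isinstance(value, Ref):
        if value.job not in outputs:
            raise ValueError(f"Could not resolve reference - {value.job} not in store")
        value = outputs[value.job]
    return value


def run_body(job: str, outputs: dict[str, Any]) -> Any:
    if job == "add_inner":
        return 1 + 2
    if job == "fail":
        raise RuntimeError("Expected failure!")
    if job == "store_inputs":
        return Ref("fail")  # forwards the reference, not a value
    # add_final: its input is the Flow output, reached through store_inputs
    return resolve(Ref("store_inputs"), outputs) + 2


def simulate() -> tuple[dict[str, State], dict[str, str]]:
    states: dict[str, State] = {"generate": State.COMPLETED}
    outputs: dict[str, Any] = {}
    errors: dict[str, str] = {}
    for job in ["add_inner", "fail", "store_inputs", "add_final"]:
        if not is_ready(job, states):
            continue
        try:
            outputs[job] = run_body(job, outputs)
            states[job] = State.COMPLETED
        except (RuntimeError, ValueError) as exc:
            states[job] = State.FAILED
            errors[job] = str(exc)
    return states, errors


states, errors = simulate()
for job, state in states.items():
    print(f"{job:<13} {state.value:<10} {errors.get(job, '')}")
```

The final Job passes the readiness check because it only inspects store_inputs, whose COMPLETED state hides the failure of fail.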

I can probably find some hackish workaround in jobflow-remote to prevent the final Job from becoming READY, but I think it would be preferable to solve this uniformly for all managers directly in jobflow, although I am not sure whether this is possible.
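One direction, sketched here purely as a hypothetical, would be a readiness check that looks through store_inputs Jobs to the Jobs whose output they forward. effective_parents, is_ready_strict and the STORE_INPUTS marker are invented names on a toy version of the example graph; this is not an existing jobflow or jobflow-remote API, and it says nothing about how each manager would implement such a rule.

```python
from enum import Enum


class State(Enum):
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Direct parents of each job in the toy graph of the example.
PARENTS = {
    "generate": [],
    "add_inner": [],
    "fail": ["add_inner"],
    "store_inputs": ["fail"],
    "add_final": ["generate", "store_inputs"],
}
# Hypothetical marker for the store_inputs Jobs added by a Flow replacement.
STORE_INPUTS = {"store_inputs"}


def effective_parents(job: str) -> set[str]:
    """Direct parents, looking through (possibly chained) store_inputs Jobs."""
    result: set[str] = set()
    for parent in PARENTS[job]:
        if parent in STORE_INPUTS:
            result |= effective_parents(parent)
        else:
            result.add(parent)
    return result


def is_ready_strict(job: str, states: dict[str, State]) -> bool:
    """Hypothetical rule: every effective parent must have COMPLETED."""
    return all(states.get(p) is State.COMPLETED for p in effective_parents(job))


states = {
    "generate": State.COMPLETED,
    "add_inner": State.COMPLETED,
    "fail": State.FAILED,
    "store_inputs": State.COMPLETED,
}
direct_ok = all(states[p] is State.COMPLETED for p in PARENTS["add_final"])
print("direct parents only:", direct_ok)
print("through store_inputs:", is_ready_strict("add_final", states))
```

Because effective_parents recurses, the same toy rule would also see through chains of store_inputs like those in Problem 1; whether something along these lines can work for all managers remains the open question.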

Conclusion

I will propose a potential solution for a subset of the cases where store_inputs is added, but it does not address the problem in general. A complete solution may require rethinking the store_inputs mechanism more deeply.
In a discussion with @davidwaroquiers, we considered storing the Flow output in the DB, which could partially solve the store_inputs problem. However, I believe it may not be trivial to implement this in a way that differs from what store_inputs already does while also avoiding storing redundant data.
In general, I would like to hear about the experience of other users and whether anyone has ideas on how to improve this.
