Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions src/jobflow/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ def prepare_replace(
"""
Prepare a replacement :obj:`Flow` or :obj:`Job`.

If the replacement is a ``Flow``, then an additional ``Job`` will be inserted
If the replacement is a ``Flow``, then an additional ``Job`` may be inserted
that maps the output id of the original job to outputs of the ``Flow``.

If the replacement is a ``Flow`` or a ``Job``, then this function pass on
Expand All @@ -1383,16 +1383,37 @@ def prepare_replace(
replace = Flow(jobs=replace)

if isinstance(replace, Flow) and replace.output is not None:
# add a job with same UUID as the current job to store the outputs of the
# flow; this job will inherit the metadata and output schema of the current
# job
store_output_job = store_inputs(replace.output)
store_output_job.set_uuid(current_job.uuid)
store_output_job.index = current_job.index + 1
store_output_job.metadata = current_job.metadata
store_output_job.output_schema = current_job.output_schema
store_output_job._kwargs = current_job._kwargs
replace.add_jobs(store_output_job)
leaf_nodes = [n for n, d in replace.graph.out_degree() if d == 0]
is_last_output_leaf = len(
leaf_nodes
) == 1 and replace.output == OutputReference(leaf_nodes[0])
if is_last_output_leaf:
# the last job of the replace inherits UUID and metadata from
# the original job
for j in replace.jobs:
if j.uuid == leaf_nodes[0]:
leaf_job = j
break
leaf_job.set_uuid(current_job.uuid)
leaf_job.index = current_job.index + 1

metadata = leaf_job.metadata
metadata.update(current_job.metadata)
leaf_job.metadata = metadata

if not leaf_job.output_schema:
leaf_job.output_schema = current_job.output_schema
else:
# add a job with same UUID as the current job to store the outputs of the
# flow; this job will inherit the metadata and output schema of the current
# job
store_output_job = store_inputs(replace.output)
store_output_job.set_uuid(current_job.uuid)
store_output_job.index = current_job.index + 1
store_output_job.metadata = current_job.metadata
store_output_job.output_schema = current_job.output_schema
store_output_job._kwargs = current_job._kwargs
replace.add_jobs(store_output_job)

elif isinstance(replace, Job):
# replace is a single Job
Expand Down
Loading