Commit 685fd7d

Merge branch 'release_25.0' into release_25.1

2 parents: c443d08 + 5566adf
7 files changed: +136, -10 lines

lib/galaxy/datatypes/data.py

Lines changed: 10 additions & 1 deletion

@@ -850,6 +850,7 @@ def convert_dataset(
         deps: Optional[dict] = None,
         target_context: Optional[dict] = None,
         history=None,
+        use_cached_job: bool = False,
     ):
         """This function adds a job to the queue to convert a dataset to another type. Returns a message about success/failure."""
         converter = trans.app.datatypes_registry.get_converter_by_target_type(original_dataset.ext, target_type)
@@ -865,8 +866,16 @@ def convert_dataset(
         # Make the target datatype available to the converter
         params["__target_datatype__"] = target_type
         # Run converter, job is dispatched through Queue
+        # Always use cached job if it exists
+        completed_jobs = converter.completed_jobs(trans, all_params=[params], use_cached_job=use_cached_job)
+        completed_job = completed_jobs[0] if completed_jobs else None
         job, converted_datasets, *_ = converter.execute(
-            trans, incoming=params, set_output_hid=visible, history=history, flush_job=False
+            trans,
+            incoming=params,
+            set_output_hid=visible,
+            history=history,
+            flush_job=False,
+            completed_job=completed_job,
         )
         # We should only have a single converted output, but let's be defensive here
         n_converted_datasets = len(converted_datasets)
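Taken together, this hunk lets implicit datatype conversion reuse an equivalent, already-completed converter job instead of always dispatching a new one: `completed_jobs` is asked for a match for the single parameter set, and the result (or None) is handed to `converter.execute`. A minimal caller-side sketch, assuming only the keyword arguments visible in the diff (the rest of `convert_dataset`'s signature is unchanged and not shown here):

    # Hypothetical caller: request a conversion and opt in to job caching.
    # Per the docstring, convert_dataset returns a message about success/failure;
    # use_cached_job=False (the default) preserves the previous behaviour.
    data.datatype.convert_dataset(
        trans,
        original_dataset=data,
        target_type="fastqsanger",
        deps=deps,
        history=history,
        use_cached_job=True,
    )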

lib/galaxy/managers/jobs.py

Lines changed: 4 additions & 2 deletions

@@ -369,6 +369,8 @@ def stop(self, job, message=None):
 class JobSearch:
     """Search for jobs using tool inputs or other jobs"""

+    IGNORED_NON_JOB_PARAMETERS = ("__use_cached_job__", "__workflow_invocation_uuid__", "__when_value__", "__input_ext")
+
     def __init__(
         self,
         sa_session: galaxy_scoped_session,
@@ -560,7 +562,7 @@ def replace_dataset_ids(path, key, value):
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 continue
             a = aliased(model.JobParameter)
             job_parameter_conditions.append(
@@ -647,7 +649,7 @@ def _filter_jobs(
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 # TODO: really need to separate this.
                 continue
             value_dump = None if v is None else json.dumps(v, sort_keys=True)
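The new class-level tuple generalizes what was previously a hard-coded check for "__when_value__": all four keys record how a job was scheduled (job caching, workflow invocation, conditional execution, inferred input extension) rather than what the tool computed, so two otherwise identical jobs that differ only in these keys should still count as cache hits. A sketch of the resulting skip, with an assumed name for the parameter dump being iterated:

    # Assumption: param_dump maps recorded parameter names to values for a candidate job.
    for k, v in param_dump.items():
        if k in JobSearch.IGNORED_NON_JOB_PARAMETERS:
            # Scheduling metadata, not a tool input -- never part of cache matching.
            continue
        # ... otherwise compare k/v against stored JobParameter rows ...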

lib/galaxy/model/__init__.py

Lines changed: 7 additions & 4 deletions

@@ -5371,7 +5371,7 @@ def get_converted_files_by_type(self, file_type, include_errored=False):
                 return item
         return None

-    def get_converted_dataset_deps(self, trans, target_ext):
+    def get_converted_dataset_deps(self, trans, target_ext, use_cached_job=False):
         """
         Returns dict of { "dependency" => HDA }
         """
@@ -5380,9 +5380,11 @@ def get_converted_dataset_deps(self, trans, target_ext):
             depends_list = trans.app.datatypes_registry.converter_deps[self.extension][target_ext]
         except KeyError:
             depends_list = []
-        return {dep: self.get_converted_dataset(trans, dep) for dep in depends_list}
+        return {dep: self.get_converted_dataset(trans, dep, use_cached_job=use_cached_job) for dep in depends_list}

-    def get_converted_dataset(self, trans, target_ext, target_context=None, history=None, include_errored=False):
+    def get_converted_dataset(
+        self, trans, target_ext, target_context=None, history=None, include_errored=False, use_cached_job=False
+    ):
         """
         Return converted dataset(s) if they exist, along with a dict of dependencies.
         If not converted yet, do so and return None (the first time). If unconvertible, raise exception.
@@ -5407,7 +5409,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
         # Check if we have dependencies
         try:
             for dependency in depends_list:
-                dep_dataset = self.get_converted_dataset(trans, dependency)
+                dep_dataset = self.get_converted_dataset(trans, dependency, use_cached_job=use_cached_job)
                 if dep_dataset is None:
                     # None means converter is running first time
                     return None
@@ -5432,6 +5434,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
                     deps=deps,
                     target_context=target_context,
                     history=history,
+                    use_cached_job=use_cached_job,
                 ).values()
             )
         )
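These hunks thread the flag through the whole implicit-conversion chain on the HDA, including the recursive conversion of converter dependencies. A hypothetical end-to-end call, using only names that appear above:

    # hda is a HistoryDatasetAssociation holding, e.g., fastqsanger.gz data.
    hda.get_converted_dataset(trans, "fastqsanger", history=history, use_cached_job=True)
    # -> get_converted_dataset_deps(trans, "fastqsanger", use_cached_job=True)
    # -> convert_dataset(..., use_cached_job=True) for the target and for each dependency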

lib/galaxy/tools/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -2400,6 +2400,7 @@ def execute(
         history: Optional[History] = None,
         set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
         flush_job: bool = True,
+        completed_job: Optional[Job] = None,
     ):
         """
         Execute the tool using parameter values in `incoming`. This just
@@ -2417,6 +2418,7 @@
             history=history,
             set_output_hid=set_output_hid,
             flush_job=flush_job,
+            completed_job=completed_job,
         )

     def _execute(

lib/galaxy/tools/actions/__init__.py

Lines changed: 7 additions & 1 deletion

@@ -176,7 +176,13 @@ def process_dataset(data, formats=None):
             if converted_dataset:
                 data = converted_dataset
             else:
-                data = data.get_converted_dataset(trans, target_ext, target_context=parent, history=history)
+                data = data.get_converted_dataset(
+                    trans,
+                    target_ext,
+                    target_context=parent,
+                    history=history,
+                    use_cached_job=param_values.get("__use_cached_job__", False),
+                )

         input_name = prefixed_name
         # Checked security of whole collection all at once if mapping over this input, else
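This is where the tool action layer decides whether an implicit conversion may be served from the cache: the reserved "__use_cached_job__" key is read from the tool's parameter values and forwarded to the HDA. Based on the API tests below, submitting a run with use_cached_job=True appears to surface the flag here; a hedged restatement of the handoff (data, parent, history, and param_values come from the hunk above):

    # Default to False when the key is absent so plain runs behave as before.
    use_cached = param_values.get("__use_cached_job__", False)
    data = data.get_converted_dataset(
        trans, target_ext, target_context=parent, history=history, use_cached_job=use_cached
    )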

lib/galaxy_test/api/test_tools.py

Lines changed: 13 additions & 2 deletions

@@ -2674,7 +2674,7 @@ def test_multi_param_column_nested_list_fails_on_invalid_column(self):
             exception_raised = e
         assert exception_raised, "Expected invalid column selection to fail job"

-    @skip_without_tool("implicit_conversion_format_input")
+    @skip_without_tool("Grep1")
     def test_implicit_conversion_input_dataset_tracking(self):
         with self.dataset_populator.test_history() as history_id:
             compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
@@ -2683,7 +2683,7 @@ def test_implicit_conversion_input_dataset_tracking(self):
                     history_id, content=fh, file_type="fastqsanger.gz", wait=True
                 )
             outputs = self._run(
-                "Grep1", history_id=history_id, inputs={"data": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
+                "Grep1", history_id=history_id, inputs={"input": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
             )
             job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
             assert job_details["inputs"]["input"]["id"] != dataset["id"]
@@ -2692,6 +2692,17 @@
             )
             assert converted_input["extension"] == "fastqsanger"

+            outputs = self._run(
+                "Grep1",
+                history_id=history_id,
+                inputs={"input": {"src": "hda", "id": dataset["id"]}},
+                use_cached_job=True,
+                wait_for_job=True,
+                assert_ok=True,
+            )
+            job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
+            assert job_details["copied_from_job_id"]
+
     @skip_without_tool("column_multi_param")
     def test_implicit_conversion_and_reduce(self):
         with self.dataset_populator.test_history() as history_id:

lib/galaxy_test/api/test_workflows.py

Lines changed: 93 additions & 0 deletions

@@ -8739,3 +8739,96 @@ def test_run_workflow_use_cached_job_format_source_pick_param(self):
             ).strip()
             == "2"
         )
+
+    def test_run_workflow_use_cached_job_implicit_conversion_send_to_new_history(self):
+        wf = """class: GalaxyWorkflow
+inputs:
+  fastq_input:
+    type: data
+steps:
+  grep:
+    # Grep1 requires fastqsanger, so fastqsanger.gz will be implicitly converted
+    tool_id: Grep1
+    in:
+      input: fastq_input
+"""
+        with self.dataset_populator.test_history() as history_id:
+            # Create a fastqsanger.gz dataset
+            compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
+            with open(compressed_path, "rb") as fh:
+                dataset = self.dataset_populator.new_dataset(
+                    history_id, content=fh, file_type="fastqsanger.gz", wait=True
+                )
+
+            # Upload workflow
+            workflow_id = self.workflow_populator.upload_yaml_workflow(wf)
+
+            # Run workflow first time
+            workflow_request: Dict[str, Any] = {
+                "inputs": json.dumps({"fastq_input": self._ds_entry(dataset)}),
+                "history": f"hist_id={history_id}",
+                "inputs_by": "name",
+            }
+            first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
+                workflow_id, request=workflow_request
+            ).json()
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=first_invocation_summary["history_id"],
+                workflow_id=workflow_id,
+                invocation_id=first_invocation_summary["id"],
+                assert_ok=True,
+            )
+            first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
+            first_job_id = first_invocation["steps"][1]["jobs"][0]["id"]
+            first_job_details = self.dataset_populator.get_job_details(first_job_id, full=True).json()
+            assert first_job_details["state"] == "ok"
+            assert not first_job_details["copied_from_job_id"]
+
+            # Verify implicit conversion happened (input to Grep1 should be fastqsanger, not fastqsanger.gz)
+            grep_input_id = first_job_details["inputs"]["input"]["id"]
+            grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=first_job_details["history_id"], content_id=grep_input_id
+            )
+            assert grep_input["extension"] == "fastqsanger", "Expected implicit conversion to fastqsanger"
+            assert grep_input_id != dataset["id"], "Input should be implicitly converted dataset"
+
+            # Run workflow second time with use_cached_job and new_history_name
+            # Remove history parameter since we're specifying new_history_name
+            workflow_request.pop("history", None)
+            workflow_request["use_cached_job"] = True
+            workflow_request["new_history_name"] = self.dataset_populator.get_random_name()
+            second_invocation_response = self.workflow_populator.invoke_workflow(workflow_id, request=workflow_request)
+            second_invocation_summary = second_invocation_response.json()
+            second_history_id = second_invocation_summary["history_id"]
+            # Wait for the workflow to complete
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=second_history_id,
+                workflow_id=workflow_id,
+                invocation_id=second_invocation_summary["id"],
+                assert_ok=True,
+            )
+            second_invocation = self.workflow_populator.get_invocation(
+                second_invocation_summary["id"], step_details=True
+            )
+            second_job_id = second_invocation["steps"][1]["jobs"][0]["id"]
+            second_job_details = self.dataset_populator.get_job_details(second_job_id, full=True).json()
+
+            # Verify job was cached
+            assert second_job_details["state"] == "ok"
+            assert second_job_details["copied_from_job_id"] == first_job_id, "Second job should be cached from first"
+
+            # Verify the second invocation is in a different history
+            assert (
+                second_job_details["history_id"] != first_job_details["history_id"]
+            ), "Second invocation should be in a new history"
+
+            # Verify implicit conversion dataset was copied to the new history
+            cached_grep_input_id = second_job_details["inputs"]["input"]["id"]
+            cached_grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=second_job_details["history_id"], content_id=cached_grep_input_id
+            )
+            assert cached_grep_input["extension"] == "fastqsanger"
+            # The implicitly converted dataset should have a different HDA ID but same underlying dataset
+            assert (
+                cached_grep_input_id != grep_input_id
+            ), "Cached run should have copied the implicitly converted dataset to the new history"
