Commit 22ed9fe

Merge branch 'release_25.1' into dev

2 parents 6294074 + 685fd7d

File tree (7 files changed: +136 −10)

lib/galaxy/datatypes/data.py
lib/galaxy/managers/jobs.py
lib/galaxy/model/__init__.py
lib/galaxy/tools/__init__.py
lib/galaxy/tools/actions/__init__.py
lib/galaxy_test/api/test_tools.py
lib/galaxy_test/api/test_workflows.py


lib/galaxy/datatypes/data.py

Lines changed: 10 additions & 1 deletion

@@ -850,6 +850,7 @@ def convert_dataset(
     deps: Optional[dict] = None,
     target_context: Optional[dict] = None,
     history=None,
+    use_cached_job: bool = False,
 ):
     """This function adds a job to the queue to convert a dataset to another type. Returns a message about success/failure."""
     converter = trans.app.datatypes_registry.get_converter_by_target_type(original_dataset.ext, target_type)
@@ -865,8 +866,16 @@ def convert_dataset(
     # Make the target datatype available to the converter
     params["__target_datatype__"] = target_type
     # Run converter, job is dispatched through Queue
+    # Always use cached job if it exists
+    completed_jobs = converter.completed_jobs(trans, all_params=[params], use_cached_job=use_cached_job)
+    completed_job = completed_jobs[0] if completed_jobs else None
     job, converted_datasets, *_ = converter.execute(
-        trans, incoming=params, set_output_hid=visible, history=history, flush_job=False
+        trans,
+        incoming=params,
+        set_output_hid=visible,
+        history=history,
+        flush_job=False,
+        completed_job=completed_job,
     )
     # We should only have a single converted output, but let's be defensive here
     n_converted_datasets = len(converted_datasets)
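Note: the pattern this hunk wires in — look up an equivalent completed job before dispatching, and hand it to execute() so outputs are copied rather than recomputed — is easiest to see in isolation. Below is a minimal, self-contained sketch of that idea in plain Python; JobCache and run_converter are hypothetical stand-ins, not Galaxy's real classes.

import json
from typing import Any, Dict, Optional


class JobCache:
    """Maps a canonical dump of job parameters to a finished job's outputs. (Hypothetical.)"""

    def __init__(self) -> None:
        self._completed: Dict[str, str] = {}

    def _key(self, params: Dict[str, Any]) -> str:
        # Sorted keys so logically equal parameter dicts compare equal,
        # mirroring json.dumps(v, sort_keys=True) in JobSearch below.
        return json.dumps(params, sort_keys=True)

    def lookup(self, params: Dict[str, Any], use_cached_job: bool) -> Optional[str]:
        # Only consult the cache when the caller opted in, like the new
        # use_cached_job flag on convert_dataset.
        return self._completed.get(self._key(params)) if use_cached_job else None

    def record(self, params: Dict[str, Any], outputs: str) -> None:
        self._completed[self._key(params)] = outputs


def run_converter(cache: JobCache, params: Dict[str, Any], use_cached_job: bool = False) -> str:
    completed = cache.lookup(params, use_cached_job)
    if completed is not None:
        return completed  # reuse: nothing new is dispatched
    outputs = f"converted({params['__target_datatype__']})"  # pretend to run a job
    cache.record(params, outputs)
    return outputs


cache = JobCache()
first = run_converter(cache, {"__target_datatype__": "fastqsanger"})
second = run_converter(cache, {"__target_datatype__": "fastqsanger"}, use_cached_job=True)
assert first == second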

lib/galaxy/managers/jobs.py

Lines changed: 4 additions & 2 deletions

@@ -369,6 +369,8 @@ def stop(self, job, message=None):
 class JobSearch:
     """Search for jobs using tool inputs or other jobs"""
 
+    IGNORED_NON_JOB_PARAMETERS = ("__use_cached_job__", "__workflow_invocation_uuid__", "__when_value__", "__input_ext")
+
     def __init__(
         self,
         sa_session: galaxy_scoped_session,
@@ -560,7 +562,7 @@ def replace_dataset_ids(path, key, value):
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 continue
             a = aliased(model.JobParameter)
             job_parameter_conditions.append(
@@ -647,7 +649,7 @@ def _filter_jobs(
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 # TODO: really need to separate this.
                 continue
             value_dump = None if v is None else json.dumps(v, sort_keys=True)
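Note: the tuple generalizes the earlier special case for __when_value__. These dunder parameters vary per invocation (or, like __use_cached_job__, merely toggle caching) without changing what the job computes, so they must be excluded when testing whether two jobs are equivalent. A small plain-Python illustration of the idea (not JobSearch itself):

IGNORED_NON_JOB_PARAMETERS = (
    "__use_cached_job__",
    "__workflow_invocation_uuid__",
    "__when_value__",
    "__input_ext",
)


def comparable_parameters(params: dict) -> dict:
    """Strip bookkeeping parameters that do not affect what a job computes."""
    return {k: v for k, v in params.items() if k not in IGNORED_NON_JOB_PARAMETERS}


run_a = {"input": 42, "__workflow_invocation_uuid__": "aaa", "__use_cached_job__": False}
run_b = {"input": 42, "__workflow_invocation_uuid__": "bbb", "__use_cached_job__": True}
# Different invocations, same effective job:
assert comparable_parameters(run_a) == comparable_parameters(run_b)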

lib/galaxy/model/__init__.py

Lines changed: 7 additions & 4 deletions

@@ -5370,7 +5370,7 @@ def get_converted_files_by_type(self, file_type, include_errored=False):
                     return item
         return None
 
-    def get_converted_dataset_deps(self, trans, target_ext):
+    def get_converted_dataset_deps(self, trans, target_ext, use_cached_job=False):
         """
         Returns dict of { "dependency" => HDA }
         """
@@ -5379,9 +5379,11 @@ def get_converted_dataset_deps(self, trans, target_ext):
             depends_list = trans.app.datatypes_registry.converter_deps[self.extension][target_ext]
         except KeyError:
             depends_list = []
-        return {dep: self.get_converted_dataset(trans, dep) for dep in depends_list}
+        return {dep: self.get_converted_dataset(trans, dep, use_cached_job=use_cached_job) for dep in depends_list}
 
-    def get_converted_dataset(self, trans, target_ext, target_context=None, history=None, include_errored=False):
+    def get_converted_dataset(
+        self, trans, target_ext, target_context=None, history=None, include_errored=False, use_cached_job=False
+    ):
         """
         Return converted dataset(s) if they exist, along with a dict of dependencies.
         If not converted yet, do so and return None (the first time). If unconvertible, raise exception.
@@ -5406,7 +5408,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
         # Check if we have dependencies
         try:
             for dependency in depends_list:
-                dep_dataset = self.get_converted_dataset(trans, dependency)
+                dep_dataset = self.get_converted_dataset(trans, dependency, use_cached_job=use_cached_job)
                 if dep_dataset is None:
                     # None means converter is running first time
                     return None
@@ -5431,6 +5433,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
                         deps=deps,
                         target_context=target_context,
                         history=history,
+                        use_cached_job=use_cached_job,
                     ).values()
                 )
             )
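Note: use_cached_job has to be forwarded through the recursive dependency walk; otherwise only the final conversion would be cache-eligible while its dependency conversions reran. A toy sketch of that flag propagation (CONVERTER_DEPS here is a hypothetical table, not Galaxy's registry):

from typing import Dict, List

# Hypothetical: converting to "bam" first requires a "bai" conversion.
CONVERTER_DEPS: Dict[str, List[str]] = {"bam": ["bai"]}


def get_converted(target_ext: str, use_cached_job: bool, trace: List[str]) -> str:
    # Convert dependencies first, forwarding use_cached_job so the whole
    # chain is cache-eligible, not just the final conversion.
    for dep in CONVERTER_DEPS.get(target_ext, []):
        get_converted(dep, use_cached_job, trace)
    trace.append(f"{target_ext} (use_cached_job={use_cached_job})")
    return target_ext


trace: List[str] = []
get_converted("bam", True, trace)
assert trace == ["bai (use_cached_job=True)", "bam (use_cached_job=True)"]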

lib/galaxy/tools/__init__.py

Lines changed: 2 additions & 0 deletions

@@ -2387,6 +2387,7 @@ def execute(
         history: Optional[History] = None,
         set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
         flush_job: bool = True,
+        completed_job: Optional[Job] = None,
     ):
         """
         Execute the tool using parameter values in `incoming`. This just
@@ -2404,6 +2405,7 @@ def execute(
             history=history,
             set_output_hid=set_output_hid,
             flush_job=flush_job,
+            completed_job=completed_job,
         )
 
     def _execute(

lib/galaxy/tools/actions/__init__.py

Lines changed: 7 additions & 1 deletion

@@ -175,7 +175,13 @@ def process_dataset(data, formats=None):
             if converted_dataset:
                 data = converted_dataset
             else:
-                data = data.get_converted_dataset(trans, target_ext, target_context=parent, history=history)
+                data = data.get_converted_dataset(
+                    trans,
+                    target_ext,
+                    target_context=parent,
+                    history=history,
+                    use_cached_job=param_values.get("__use_cached_job__", False),
+                )
 
             input_name = prefixed_name
             # Checked security of whole collection all at once if mapping over this input, else
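Note: this is where the flag enters the conversion path: __use_cached_job__ rides along as an ordinary dunder-prefixed tool parameter, and its absence means the user did not opt in. A minimal sketch of the lookup:

def should_use_cached_job(param_values: dict) -> bool:
    # Missing key defaults to False, i.e. caching stays opt-in.
    return bool(param_values.get("__use_cached_job__", False))


assert should_use_cached_job({"__use_cached_job__": True})
assert not should_use_cached_job({"input": 1})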

lib/galaxy_test/api/test_tools.py

Lines changed: 13 additions & 2 deletions

@@ -2674,7 +2674,7 @@ def test_multi_param_column_nested_list_fails_on_invalid_column(self):
             exception_raised = e
         assert exception_raised, "Expected invalid column selection to fail job"
 
-    @skip_without_tool("implicit_conversion_format_input")
+    @skip_without_tool("Grep1")
     def test_implicit_conversion_input_dataset_tracking(self):
         with self.dataset_populator.test_history() as history_id:
             compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
@@ -2683,7 +2683,7 @@ def test_implicit_conversion_input_dataset_tracking(self):
                 history_id, content=fh, file_type="fastqsanger.gz", wait=True
             )
             outputs = self._run(
-                "Grep1", history_id=history_id, inputs={"data": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
+                "Grep1", history_id=history_id, inputs={"input": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
             )
             job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
             assert job_details["inputs"]["input"]["id"] != dataset["id"]
@@ -2692,6 +2692,17 @@ def test_implicit_conversion_input_dataset_tracking(self):
             )
             assert converted_input["extension"] == "fastqsanger"
 
+            outputs = self._run(
+                "Grep1",
+                history_id=history_id,
+                inputs={"input": {"src": "hda", "id": dataset["id"]}},
+                use_cached_job=True,
+                wait_for_job=True,
+                assert_ok=True,
+            )
+            job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
+            assert job_details["copied_from_job_id"]
+
     @skip_without_tool("column_multi_param")
     def test_implicit_conversion_and_reduce(self):
         with self.dataset_populator.test_history() as history_id:

lib/galaxy_test/api/test_workflows.py

Lines changed: 93 additions & 0 deletions

@@ -8713,3 +8713,96 @@ def test_run_workflow_use_cached_job_format_source_pick_param(self):
             ).strip()
             == "2"
         )
+
+    def test_run_workflow_use_cached_job_implicit_conversion_send_to_new_history(self):
+        wf = """class: GalaxyWorkflow
+inputs:
+  fastq_input:
+    type: data
+steps:
+  grep:
+    # Grep1 requires fastqsanger, so fastqsanger.gz will be implicitly converted
+    tool_id: Grep1
+    in:
+      input: fastq_input
+"""
+        with self.dataset_populator.test_history() as history_id:
+            # Create a fastqsanger.gz dataset
+            compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
+            with open(compressed_path, "rb") as fh:
+                dataset = self.dataset_populator.new_dataset(
+                    history_id, content=fh, file_type="fastqsanger.gz", wait=True
+                )
+
+            # Upload workflow
+            workflow_id = self.workflow_populator.upload_yaml_workflow(wf)
+
+            # Run workflow first time
+            workflow_request: Dict[str, Any] = {
+                "inputs": json.dumps({"fastq_input": self._ds_entry(dataset)}),
+                "history": f"hist_id={history_id}",
+                "inputs_by": "name",
+            }
+            first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
+                workflow_id, request=workflow_request
+            ).json()
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=first_invocation_summary["history_id"],
+                workflow_id=workflow_id,
+                invocation_id=first_invocation_summary["id"],
+                assert_ok=True,
+            )
+            first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
+            first_job_id = first_invocation["steps"][1]["jobs"][0]["id"]
+            first_job_details = self.dataset_populator.get_job_details(first_job_id, full=True).json()
+            assert first_job_details["state"] == "ok"
+            assert not first_job_details["copied_from_job_id"]
+
+            # Verify implicit conversion happened (input to Grep1 should be fastqsanger, not fastqsanger.gz)
+            grep_input_id = first_job_details["inputs"]["input"]["id"]
+            grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=first_job_details["history_id"], content_id=grep_input_id
+            )
+            assert grep_input["extension"] == "fastqsanger", "Expected implicit conversion to fastqsanger"
+            assert grep_input_id != dataset["id"], "Input should be implicitly converted dataset"
+
+            # Run workflow second time with use_cached_job and new_history_name
+            # Remove history parameter since we're specifying new_history_name
+            workflow_request.pop("history", None)
+            workflow_request["use_cached_job"] = True
+            workflow_request["new_history_name"] = self.dataset_populator.get_random_name()
+            second_invocation_response = self.workflow_populator.invoke_workflow(workflow_id, request=workflow_request)
+            second_invocation_summary = second_invocation_response.json()
+            second_history_id = second_invocation_summary["history_id"]
+            # Wait for the workflow to complete
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=second_history_id,
+                workflow_id=workflow_id,
+                invocation_id=second_invocation_summary["id"],
+                assert_ok=True,
+            )
+            second_invocation = self.workflow_populator.get_invocation(
+                second_invocation_summary["id"], step_details=True
+            )
+            second_job_id = second_invocation["steps"][1]["jobs"][0]["id"]
+            second_job_details = self.dataset_populator.get_job_details(second_job_id, full=True).json()
+
+            # Verify job was cached
+            assert second_job_details["state"] == "ok"
+            assert second_job_details["copied_from_job_id"] == first_job_id, "Second job should be cached from first"
+
+            # Verify the second invocation is in a different history
+            assert (
+                second_job_details["history_id"] != first_job_details["history_id"]
+            ), "Second invocation should be in a new history"
+
+            # Verify implicit conversion dataset was copied to the new history
+            cached_grep_input_id = second_job_details["inputs"]["input"]["id"]
+            cached_grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=second_job_details["history_id"], content_id=cached_grep_input_id
+            )
+            assert cached_grep_input["extension"] == "fastqsanger"
+            # The implicitly converted dataset should have a different HDA ID but same underlying dataset
+            assert (
+                cached_grep_input_id != grep_input_id
+            ), "Cached run should have copied the implicitly converted dataset to the new history"
