
Commit 5566adf

Merge pull request #21021 from mvdbeek/fix_implicit_conversion_breaking_job_cache
[25.0] Use job cache also for implicit conversions
2 parents (2fc9903 + b6ff36e), commit 5566adf

7 files changed: +136, -10 lines
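This merge threads a use_cached_job flag from tool and workflow execution down into implicit datatype conversions, so converter jobs (for example fastqsanger.gz to fastqsanger) can be copied from the job cache instead of being rerun. For context, the sketch below shows roughly how a client could request cached-job reuse when invoking a workflow over the Galaxy API. The server URL, API key, and identifiers are placeholders, and the payload shape follows the request built in the new test_workflows.py test rather than a documented contract.

import json
import requests

# Placeholders -- substitute a real Galaxy server, API key, workflow id, and dataset id.
GALAXY_URL = "https://galaxy.example.org"
API_KEY = "YOUR_API_KEY"
WORKFLOW_ID = "WORKFLOW_ID"
DATASET_ID = "DATASET_ID"

# Mirrors the request built in the new test: inputs mapped by step label, plus
# use_cached_job so equivalent jobs (including the implicit conversion) are
# copied from the cache, and new_history_name to send results to a fresh history.
payload = {
    "inputs": json.dumps({"fastq_input": {"src": "hda", "id": DATASET_ID}}),
    "inputs_by": "name",
    "use_cached_job": True,
    "new_history_name": "cached rerun",
}

response = requests.post(
    f"{GALAXY_URL}/api/workflows/{WORKFLOW_ID}/invocations",
    headers={"x-api-key": API_KEY},
    json=payload,
)
response.raise_for_status()
print(response.json())  # invocation details, including the new history_id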

lib/galaxy/datatypes/data.py

Lines changed: 10 additions & 1 deletion
@@ -849,6 +849,7 @@ def convert_dataset(
     deps: Optional[Dict] = None,
     target_context: Optional[Dict] = None,
     history=None,
+    use_cached_job: bool = False,
 ):
     """This function adds a job to the queue to convert a dataset to another type. Returns a message about success/failure."""
     converter = trans.app.datatypes_registry.get_converter_by_target_type(original_dataset.ext, target_type)
@@ -864,8 +865,16 @@ def convert_dataset(
     # Make the target datatype available to the converter
     params["__target_datatype__"] = target_type
     # Run converter, job is dispatched through Queue
+    # Always use cached job if it exists
+    completed_jobs = converter.completed_jobs(trans, all_params=[params], use_cached_job=use_cached_job)
+    completed_job = completed_jobs[0] if completed_jobs else None
     job, converted_datasets, *_ = converter.execute(
-        trans, incoming=params, set_output_hid=visible, history=history, flush_job=False
+        trans,
+        incoming=params,
+        set_output_hid=visible,
+        history=history,
+        flush_job=False,
+        completed_job=completed_job,
     )
     # We should only have a single converted output, but let's be defensive here
     n_converted_datasets = len(converted_datasets)
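The converter is an ordinary Galaxy tool, so the cache lookup reuses the same machinery as regular tool execution: completed_jobs() asks the job cache for an equivalent finished job per parameter set, and passing that job to execute() makes Galaxy copy its outputs instead of scheduling a new converter run (completed_job=None falls back to normal execution). A condensed sketch of the pattern, with run_converter being a hypothetical wrapper rather than Galaxy code:

# Hypothetical wrapper illustrating the cache-then-execute pattern from the diff above.
def run_converter(trans, converter, params, history, use_cached_job=False):
    # Ask the job cache for an already-completed, equivalent converter job.
    completed_jobs = converter.completed_jobs(trans, all_params=[params], use_cached_job=use_cached_job)
    completed_job = completed_jobs[0] if completed_jobs else None

    # With completed_job set, execute() copies the cached outputs; with None it
    # dispatches a new converter job through the queue as before.
    job, converted_datasets, *_ = converter.execute(
        trans,
        incoming=params,
        history=history,
        flush_job=False,
        completed_job=completed_job,
    )
    return job, converted_datasets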

lib/galaxy/managers/jobs.py

Lines changed: 4 additions & 2 deletions
@@ -368,6 +368,8 @@ def stop(self, job, message=None):
 class JobSearch:
     """Search for jobs using tool inputs or other jobs"""
 
+    IGNORED_NON_JOB_PARAMETERS = ("__use_cached_job__", "__workflow_invocation_uuid__", "__when_value__", "__input_ext")
+
     def __init__(
         self,
         sa_session: galaxy_scoped_session,
@@ -559,7 +561,7 @@ def replace_dataset_ids(path, key, value):
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 continue
             a = aliased(model.JobParameter)
             job_parameter_conditions.append(
@@ -646,7 +648,7 @@ def _filter_jobs(
                 continue
             elif k == "chromInfo" and "?.len" in v:
                 continue
-            elif k == "__when_value__":
+            elif k in self.IGNORED_NON_JOB_PARAMETERS:
                 # TODO: really need to separate this.
                 continue
             value_dump = None if v is None else json.dumps(v, sort_keys=True)
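IGNORED_NON_JOB_PARAMETERS generalizes the previous special case for __when_value__: these keys record how a job was requested (cache preference, workflow invocation UUID, conditional when-value, declared input extension) but do not change its outputs, so two otherwise identical runs should still match in the cache even when they differ there. A small self-contained illustration of that idea (not Galaxy code):

# Illustration only: parameters that steer scheduling are dropped before comparing runs.
IGNORED_NON_JOB_PARAMETERS = ("__use_cached_job__", "__workflow_invocation_uuid__", "__when_value__", "__input_ext")

def comparable_parameters(job_parameters: dict) -> dict:
    """Return only the parameters that actually influence a job's outputs."""
    return {k: v for k, v in job_parameters.items() if k not in IGNORED_NON_JOB_PARAMETERS}

first_run = {"input": 42, "__workflow_invocation_uuid__": "aaa", "__use_cached_job__": False}
second_run = {"input": 42, "__workflow_invocation_uuid__": "bbb", "__use_cached_job__": True}

# Differing control parameters no longer prevent a cache hit.
assert comparable_parameters(first_run) == comparable_parameters(second_run)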

lib/galaxy/model/__init__.py

Lines changed: 7 additions & 4 deletions
@@ -5165,7 +5165,7 @@ def get_converted_files_by_type(self, file_type, include_errored=False):
                 return item
         return None
 
-    def get_converted_dataset_deps(self, trans, target_ext):
+    def get_converted_dataset_deps(self, trans, target_ext, use_cached_job=False):
         """
         Returns dict of { "dependency" => HDA }
         """
@@ -5174,9 +5174,11 @@ def get_converted_dataset_deps(self, trans, target_ext):
             depends_list = trans.app.datatypes_registry.converter_deps[self.extension][target_ext]
         except KeyError:
             depends_list = []
-        return {dep: self.get_converted_dataset(trans, dep) for dep in depends_list}
+        return {dep: self.get_converted_dataset(trans, dep, use_cached_job=use_cached_job) for dep in depends_list}
 
-    def get_converted_dataset(self, trans, target_ext, target_context=None, history=None, include_errored=False):
+    def get_converted_dataset(
+        self, trans, target_ext, target_context=None, history=None, include_errored=False, use_cached_job=False
+    ):
         """
         Return converted dataset(s) if they exist, along with a dict of dependencies.
         If not converted yet, do so and return None (the first time). If unconvertible, raise exception.
@@ -5201,7 +5203,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
         # Check if we have dependencies
         try:
             for dependency in depends_list:
-                dep_dataset = self.get_converted_dataset(trans, dependency)
+                dep_dataset = self.get_converted_dataset(trans, dependency, use_cached_job=use_cached_job)
                 if dep_dataset is None:
                     # None means converter is running first time
                     return None
@@ -5226,6 +5228,7 @@ def get_converted_dataset(self, trans, target_ext, target_context=None, history=
                     deps=deps,
                     target_context=target_context,
                     history=history,
+                    use_cached_job=use_cached_job,
                 ).values()
             )
         )

lib/galaxy/tools/__init__.py

Lines changed: 2 additions & 0 deletions
@@ -2402,6 +2402,7 @@ def execute(
         history: Optional[model.History] = None,
         set_output_hid: bool = DEFAULT_SET_OUTPUT_HID,
         flush_job: bool = True,
+        completed_job: Optional[Job] = None,
     ):
         """
         Execute the tool using parameter values in `incoming`. This just
@@ -2419,6 +2420,7 @@ def execute(
             history=history,
             set_output_hid=set_output_hid,
             flush_job=flush_job,
+            completed_job=completed_job,
         )
 
     def _execute(

lib/galaxy/tools/actions/__init__.py

Lines changed: 7 additions & 1 deletion
@@ -173,7 +173,13 @@ def process_dataset(data, formats=None):
             if converted_dataset:
                 data = converted_dataset
             else:
-                data = data.get_converted_dataset(trans, target_ext, target_context=parent, history=history)
+                data = data.get_converted_dataset(
+                    trans,
+                    target_ext,
+                    target_context=parent,
+                    history=history,
+                    use_cached_job=param_values.get("__use_cached_job__", False),
+                )
 
         input_name = prefixed_name
         # Checked security of whole collection all at once if mapping over this input, else

lib/galaxy_test/api/test_tools.py

Lines changed: 13 additions & 2 deletions
@@ -2613,7 +2613,7 @@ def test_multi_param_column_nested_list_fails_on_invalid_column(self):
             exception_raised = e
         assert exception_raised, "Expected invalid column selection to fail job"
 
-    @skip_without_tool("implicit_conversion_format_input")
+    @skip_without_tool("Grep1")
     def test_implicit_conversion_input_dataset_tracking(self):
         with self.dataset_populator.test_history() as history_id:
             compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
@@ -2622,7 +2622,7 @@ def test_implicit_conversion_input_dataset_tracking(self):
                 history_id, content=fh, file_type="fastqsanger.gz", wait=True
             )
             outputs = self._run(
-                "Grep1", history_id=history_id, inputs={"data": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
+                "Grep1", history_id=history_id, inputs={"input": {"src": "hda", "id": dataset["id"]}}, assert_ok=True
             )
             job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
             assert job_details["inputs"]["input"]["id"] != dataset["id"]
@@ -2631,6 +2631,17 @@ def test_implicit_conversion_input_dataset_tracking(self):
             )
             assert converted_input["extension"] == "fastqsanger"
 
+            outputs = self._run(
+                "Grep1",
+                history_id=history_id,
+                inputs={"input": {"src": "hda", "id": dataset["id"]}},
+                use_cached_job=True,
+                wait_for_job=True,
+                assert_ok=True,
+            )
+            job_details = self.dataset_populator.get_job_details(outputs["jobs"][0]["id"], full=True).json()
+            assert job_details["copied_from_job_id"]
+
     @skip_without_tool("column_multi_param")
     def test_implicit_conversion_and_reduce(self):
         with self.dataset_populator.test_history() as history_id:
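The updated API test reruns Grep1 with use_cached_job=True through the populator helper; outside the test framework the same behavior is exposed by the tool execution API. Below is a rough sketch of such a rerun using requests; the payload keys mirror what the populator sends and should be treated as an assumption rather than a documented contract, and all identifiers are placeholders.

import json
import requests

GALAXY_URL = "https://galaxy.example.org"   # placeholder
API_KEY = "YOUR_API_KEY"                    # placeholder

payload = {
    "tool_id": "Grep1",
    "history_id": "HISTORY_ID",             # placeholder
    "inputs": json.dumps({"input": {"src": "hda", "id": "DATASET_ID"}}),
    "use_cached_job": True,
}

response = requests.post(
    f"{GALAXY_URL}/api/tools",
    headers={"x-api-key": API_KEY},
    json=payload,
)
response.raise_for_status()
job_id = response.json()["jobs"][0]["id"]
# Once the job completes, its details should report a copied_from_job_id when
# the cached conversion and tool run were reused.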

lib/galaxy_test/api/test_workflows.py

Lines changed: 93 additions & 0 deletions
@@ -8487,3 +8487,96 @@ def test_run_workflow_use_cached_job_format_source_pick_param(self):
             ).strip()
             == "2"
         )
+
+    def test_run_workflow_use_cached_job_implicit_conversion_send_to_new_history(self):
+        wf = """class: GalaxyWorkflow
+inputs:
+  fastq_input:
+    type: data
+steps:
+  grep:
+    # Grep1 requires fastqsanger, so fastqsanger.gz will be implicitly converted
+    tool_id: Grep1
+    in:
+      input: fastq_input
+"""
+        with self.dataset_populator.test_history() as history_id:
+            # Create a fastqsanger.gz dataset
+            compressed_path = self.test_data_resolver.get_filename("1.fastqsanger.gz")
+            with open(compressed_path, "rb") as fh:
+                dataset = self.dataset_populator.new_dataset(
+                    history_id, content=fh, file_type="fastqsanger.gz", wait=True
+                )
+
+            # Upload workflow
+            workflow_id = self.workflow_populator.upload_yaml_workflow(wf)
+
+            # Run workflow first time
+            workflow_request: Dict[str, Any] = {
+                "inputs": json.dumps({"fastq_input": self._ds_entry(dataset)}),
+                "history": f"hist_id={history_id}",
+                "inputs_by": "name",
+            }
+            first_invocation_summary = self.workflow_populator.invoke_workflow_and_wait(
+                workflow_id, request=workflow_request
+            ).json()
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=first_invocation_summary["history_id"],
+                workflow_id=workflow_id,
+                invocation_id=first_invocation_summary["id"],
+                assert_ok=True,
+            )
+            first_invocation = self.workflow_populator.get_invocation(first_invocation_summary["id"], step_details=True)
+            first_job_id = first_invocation["steps"][1]["jobs"][0]["id"]
+            first_job_details = self.dataset_populator.get_job_details(first_job_id, full=True).json()
+            assert first_job_details["state"] == "ok"
+            assert not first_job_details["copied_from_job_id"]
+
+            # Verify implicit conversion happened (input to Grep1 should be fastqsanger, not fastqsanger.gz)
+            grep_input_id = first_job_details["inputs"]["input"]["id"]
+            grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=first_job_details["history_id"], content_id=grep_input_id
+            )
+            assert grep_input["extension"] == "fastqsanger", "Expected implicit conversion to fastqsanger"
+            assert grep_input_id != dataset["id"], "Input should be implicitly converted dataset"
+
+            # Run workflow second time with use_cached_job and new_history_name
+            # Remove history parameter since we're specifying new_history_name
+            workflow_request.pop("history", None)
+            workflow_request["use_cached_job"] = True
+            workflow_request["new_history_name"] = self.dataset_populator.get_random_name()
+            second_invocation_response = self.workflow_populator.invoke_workflow(workflow_id, request=workflow_request)
+            second_invocation_summary = second_invocation_response.json()
+            second_history_id = second_invocation_summary["history_id"]
+            # Wait for the workflow to complete
+            self.workflow_populator.wait_for_invocation_and_jobs(
+                history_id=second_history_id,
+                workflow_id=workflow_id,
+                invocation_id=second_invocation_summary["id"],
+                assert_ok=True,
+            )
+            second_invocation = self.workflow_populator.get_invocation(
+                second_invocation_summary["id"], step_details=True
+            )
+            second_job_id = second_invocation["steps"][1]["jobs"][0]["id"]
+            second_job_details = self.dataset_populator.get_job_details(second_job_id, full=True).json()
+
+            # Verify job was cached
+            assert second_job_details["state"] == "ok"
+            assert second_job_details["copied_from_job_id"] == first_job_id, "Second job should be cached from first"
+
+            # Verify the second invocation is in a different history
+            assert (
+                second_job_details["history_id"] != first_job_details["history_id"]
+            ), "Second invocation should be in a new history"
+
+            # Verify implicit conversion dataset was copied to the new history
+            cached_grep_input_id = second_job_details["inputs"]["input"]["id"]
+            cached_grep_input = self.dataset_populator.get_history_dataset_details(
+                history_id=second_job_details["history_id"], content_id=cached_grep_input_id
+            )
+            assert cached_grep_input["extension"] == "fastqsanger"
+            # The implicitly converted dataset should have a different HDA ID but same underlying dataset
+            assert (
+                cached_grep_input_id != grep_input_id
+            ), "Cached run should have copied the implicitly converted dataset to the new history"