
Commit 1e2737a

Merge pull request #20650 from jmchilton/deferred_multiple_25_0
[25.0] Fix deferred datasets in multiple dataset parameters.
2 parents 21d827f + 50e652c commit 1e2737a

6 files changed: 133 additions & 38 deletions

lib/galaxy/job_execution/datasets.py

Lines changed: 12 additions & 2 deletions
@@ -7,15 +7,25 @@
     ABCMeta,
     abstractmethod,
 )
-from typing import Union
+from typing import (
+    List,
+    Union,
+)

 from galaxy.model import (
     DatasetCollectionElement,
     DatasetInstance,
     HistoryDatasetCollectionAssociation,
 )

-DeferrableObjectsT = Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]
+DeferrableObjectsT = Union[
+    DatasetInstance,
+    HistoryDatasetCollectionAssociation,
+    DatasetCollectionElement,
+    List[DatasetInstance],
+    List[Union[HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
+    List[Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
+]


 def dataset_path_rewrites(dataset_paths):

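Note on the change above: `DeferrableObjectsT` previously covered only single datasets and collections; it now also admits the flat lists produced by multiple-dataset parameters. A minimal, self-contained sketch of values that type-check against the widened alias, using stand-in classes rather than Galaxy's real models so it runs on its own:

from typing import List, Union


class DatasetInstance:  # stand-in for galaxy.model.DatasetInstance
    pass


class HistoryDatasetCollectionAssociation:  # stand-in
    pass


class DatasetCollectionElement:  # stand-in
    pass


# Same shape as the widened alias in lib/galaxy/job_execution/datasets.py.
DeferrableObjectsT = Union[
    DatasetInstance,
    HistoryDatasetCollectionAssociation,
    DatasetCollectionElement,
    List[DatasetInstance],
    List[Union[HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
    List[Union[DatasetInstance, HistoryDatasetCollectionAssociation, DatasetCollectionElement]],
]

# A single dataset and a multi-input list are now both valid values.
single: DeferrableObjectsT = DatasetInstance()
multi: DeferrableObjectsT = [DatasetInstance(), DatasetInstance()]
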
lib/galaxy/model/deferred.py

Lines changed: 9 additions & 0 deletions
@@ -3,6 +3,7 @@
 import os
 import shutil
 from typing import (
+    Dict,
     NamedTuple,
     Optional,
     Union,
@@ -88,6 +89,7 @@ def __init__(
         self._object_store_populator = object_store_populator
         self._file_sources = file_sources
         self._sa_session = sa_session
+        self._previously_materialized: Dict[int, HistoryDatasetAssociation] = {}

     def ensure_materialized(
         self,
@@ -105,6 +107,12 @@ def ensure_materialized(
         if dataset.state != Dataset.states.DEFERRED and isinstance(dataset_instance, HistoryDatasetAssociation):
             return dataset_instance

+        if dataset_instance.id in self._previously_materialized and isinstance(
+            dataset_instance, HistoryDatasetAssociation
+        ):
+            # If we have already materialized this dataset, return the previously materialized instance.
+            return self._previously_materialized[dataset_instance.id]
+
         materialized_dataset_hashes = [h.copy() for h in dataset.hashes]
         if in_place:
             materialized_dataset = dataset_instance.dataset
@@ -195,6 +203,7 @@ def ensure_materialized(
                 metadata_tmp_files_dir = None
             materialized_dataset_instance.set_meta(metadata_tmp_files_dir=metadata_tmp_files_dir)
         materialized_dataset_instance.metadata_deferred = False
+        self._previously_materialized[dataset_instance.id] = materialized_dataset_instance
         return materialized_dataset_instance

     def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str:

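Note on the change above: the new `_previously_materialized` dict caches materialization results by HDA id, so a deferred dataset that appears more than once among a job's inputs (as it can with a multiple-dataset parameter) is only fetched and undeferred once. A minimal sketch of that memoization pattern, using a hypothetical materializer rather than Galaxy's real one:

from typing import Dict


class ExampleMaterializer:
    """Illustrative only: memoize an expensive per-object operation by integer id."""

    def __init__(self) -> None:
        self._previously_materialized: Dict[int, str] = {}

    def ensure_materialized(self, object_id: int) -> str:
        # If we have already materialized this object, return the cached result.
        if object_id in self._previously_materialized:
            return self._previously_materialized[object_id]
        result = f"materialized-{object_id}"  # stands in for the real fetch/copy work
        self._previously_materialized[object_id] = result
        return result


materializer = ExampleMaterializer()
assert materializer.ensure_materialized(7) is materializer.ensure_materialized(7)
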
lib/galaxy/tools/evaluation.py

Lines changed: 55 additions & 4 deletions
@@ -13,6 +13,7 @@
     List,
     Optional,
     TYPE_CHECKING,
+    Union,
 )

 from packaging.version import Version
@@ -287,6 +288,30 @@ def _materialize_objects(
                 assert isinstance(value, (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation))
                 undeferred = dataset_materializer.ensure_materialized(value)
                 undeferred_objects[key] = undeferred
+            elif isinstance(value, list):
+                undeferred_list: List[
+                    Union[
+                        model.DatasetInstance, model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement
+                    ]
+                ] = []
+                for potentially_deferred in value:
+                    if isinstance(potentially_deferred, model.DatasetInstance):
+                        if potentially_deferred.state != model.Dataset.states.DEFERRED:
+                            undeferred_list.append(potentially_deferred)
+                        else:
+                            assert isinstance(
+                                potentially_deferred,
+                                (model.HistoryDatasetAssociation, model.LibraryDatasetDatasetAssociation),
+                            )
+                            undeferred = dataset_materializer.ensure_materialized(potentially_deferred)
+                            undeferred_list.append(undeferred)
+                    elif isinstance(
+                        potentially_deferred,
+                        (model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement),
+                    ):
+                        undeferred_collection = materialize_collection_input(potentially_deferred, dataset_materializer)
+                        undeferred_list.append(undeferred_collection)
+                undeferred_objects[key] = undeferred_list
             else:
                 undeferred_collection = materialize_collection_input(value, dataset_materializer)
                 undeferred_objects[key] = undeferred_collection
@@ -348,10 +373,6 @@ def _deferred_objects(
         Walk input datasets and collections and find inputs that need to be materialized.
         """
         deferred_objects: Dict[str, DeferrableObjectsT] = {}
-        for key, value in input_datasets.items():
-            if value is not None and value.state == model.Dataset.states.DEFERRED:
-                if self._should_materialize_deferred_input(key, value):
-                    deferred_objects[key] = value

         def find_deferred_collections(input, value, context, prefixed_name=None, **kwargs):
             if (
@@ -360,8 +381,38 @@ def find_deferred_collections(input, value, context, prefixed_name=None, **kwargs):
             ):
                 deferred_objects[prefixed_name] = value

+        def find_deferred_datasets(input, value, context, prefixed_name=None, **kwargs):
+            if isinstance(input, DataToolParameter):
+                if isinstance(value, model.DatasetInstance) and value.state == model.Dataset.states.DEFERRED:
+                    deferred_objects[prefixed_name] = value
+                elif isinstance(value, list):
+                    # handle single list reduction as a collection input
+                    if (
+                        value
+                        and len(value) == 1
+                        and isinstance(
+                            value[0], (model.HistoryDatasetCollectionAssociation, model.DatasetCollectionElement)
+                        )
+                    ):
+                        deferred_objects[prefixed_name] = value
+                        return
+
+                    for v in value:
+                        if self._should_materialize_deferred_input(prefixed_name, v):
+                            deferred_objects[prefixed_name] = value
+                            break
+
+        visit_input_values(self.tool.inputs, incoming, find_deferred_datasets)
         visit_input_values(self.tool.inputs, incoming, find_deferred_collections)

+        # now place the inputX datasets hacked in for multiple inputs into the deferred
+        # object array also. This is so messy. I think in this case - we only need these for
+        # Pulsar staging up which uses the hackier input_datasets flat dict.
+        for key, value in input_datasets.items():
+            if key not in deferred_objects and value is not None and value.state == model.Dataset.states.DEFERRED:
+                if self._should_materialize_deferred_input(key, value):
+                    deferred_objects[key] = value
+
         return deferred_objects

     def _should_materialize_deferred_input(self, input_name: str, input_value: DeferrableObjectsT) -> bool:

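Note on the change above: `_deferred_objects` now visits data parameters explicitly via the new `find_deferred_datasets` visitor, so list-valued parameters are inspected entry by entry, and `_materialize_objects` then undefers each list entry in turn. A simplified, self-contained sketch of the collection step with stand-in types (Galaxy's real code walks parameters with `visit_input_values` and its model classes):

from typing import Dict, List, Union

DEFERRED = "deferred"


class FakeDataset:
    """Stand-in for a dataset instance carrying a state attribute."""

    def __init__(self, state: str) -> None:
        self.state = state


def collect_deferred(incoming: Dict[str, Union[FakeDataset, List[FakeDataset]]]) -> Dict[str, object]:
    """Record parameters whose value (single dataset or list) still needs materialization."""
    deferred_objects: Dict[str, object] = {}
    for name, value in incoming.items():
        if isinstance(value, FakeDataset) and value.state == DEFERRED:
            deferred_objects[name] = value
        elif isinstance(value, list) and any(v.state == DEFERRED for v in value):
            # Keep the whole list so every entry can be materialized together later.
            deferred_objects[name] = value
    return deferred_objects


print(collect_deferred({"input1": [FakeDataset("ok"), FakeDataset(DEFERRED)]}))
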
lib/galaxy_test/api/test_tool_execute.py

Lines changed: 32 additions & 0 deletions
@@ -613,3 +613,35 @@ def test_null_to_text_tool_with_validation(required_tool: RequiredTool, tool_inp
     required_tool.execute.with_inputs(tool_input_format.when.any({})).assert_fails()
     required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": None})).assert_fails()
     required_tool.execute.with_inputs(tool_input_format.when.any({"parameter": ""})).assert_fails()
+
+
+@requires_tool_id("cat|cat1")
+def test_deferred_basic(required_tool: RequiredTool, target_history: TargetHistory):
+    has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
+    inputs = {
+        "input1": has_src_dict.src_dict,
+    }
+    output = required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output
+    output.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
+
+
+@requires_tool_id("metadata_bam")
+def test_deferred_with_metadata_options_filter(required_tool: RequiredTool, target_history: TargetHistory):
+    has_src_dict = target_history.with_deferred_dataset_for_test_file("1.bam", ext="bam")
+    inputs = {
+        "input_bam": has_src_dict.src_dict,
+        "ref_names": "chrM",
+    }
+    required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output.with_contents_stripped("chrM")
+
+
+@requires_tool_id("cat_list")
+def test_deferred_multi_input(required_tool: RequiredTool, target_history: TargetHistory):
+    has_src_dict_bed = target_history.with_deferred_dataset_for_test_file("1.bed", ext="bed")
+    has_src_dict_txt = target_history.with_deferred_dataset_for_test_file("1.txt", ext="txt")
+    inputs = {
+        "input1": [has_src_dict_bed.src_dict, has_src_dict_txt.src_dict],
+    }
+    output = required_tool.execute.with_inputs(inputs).assert_has_single_job.with_single_output
+    output.assert_contains("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
+    output.assert_contains("chr1 4225 19670")

lib/galaxy_test/api/test_tools.py

Lines changed: 0 additions & 32 deletions
@@ -2862,38 +2862,6 @@ def test_group_tag_selection_multiple(self, history_id):
         output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
         assert output_content.strip() == "123\n456\n456\n0ab"

-    @skip_without_tool("cat1")
-    def test_run_deferred_dataset(self, history_id):
-        details = self.dataset_populator.create_deferred_hda(
-            history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bed", ext="bed"
-        )
-        inputs = {
-            "input1": dataset_to_param(details),
-        }
-        outputs = self._cat1_outputs(history_id, inputs=inputs)
-        output = outputs[0]
-        details = self.dataset_populator.get_history_dataset_details(
-            history_id, dataset=output, wait=True, assert_ok=True
-        )
-        assert details["state"] == "ok"
-        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
-        assert output_content.startswith("chr1 147962192 147962580 CCDS989.1_cds_0_0_chr1_147962193_r 0 -")
-
-    @skip_without_tool("metadata_bam")
-    def test_run_deferred_dataset_with_metadata_options_filter(self, history_id):
-        details = self.dataset_populator.create_deferred_hda(
-            history_id, "https://raw.githubusercontent.com/galaxyproject/galaxy/dev/test-data/1.bam", ext="bam"
-        )
-        inputs = {"input_bam": dataset_to_param(details), "ref_names": "chrM"}
-        run_response = self.dataset_populator.run_tool(tool_id="metadata_bam", inputs=inputs, history_id=history_id)
-        output = run_response["outputs"][0]
-        output_details = self.dataset_populator.get_history_dataset_details(
-            history_id, dataset=output, wait=True, assert_ok=True
-        )
-        assert output_details["state"] == "ok"
-        output_content = self.dataset_populator.get_history_dataset_content(history_id, dataset=output)
-        assert output_content.startswith("chrM")
-
     @skip_without_tool("pileup")
     def test_metadata_validator_on_deferred_input(self, history_id):
         deferred_bam_details = self.dataset_populator.create_deferred_hda(

lib/galaxy_test/base/populators.py

Lines changed: 25 additions & 0 deletions
@@ -4137,6 +4137,31 @@ def with_dataset(
         )
         return HasSrcDict("hda", new_dataset)

+    def with_deferred_dataset(
+        self,
+        uri: str,
+        named: Optional[str] = None,
+        ext: Optional[str] = None,
+    ) -> "HasSrcDict":
+        kwd = {}
+        if named is not None:
+            kwd["name"] = named
+        new_dataset = self._dataset_populator.create_deferred_hda(
+            history_id=self._history_id,
+            uri=uri,
+            ext=ext,
+        )
+        return HasSrcDict("hda", new_dataset)
+
+    def with_deferred_dataset_for_test_file(
+        self,
+        filename: str,
+        named: Optional[str] = None,
+        ext: Optional[str] = None,
+    ) -> "HasSrcDict":
+        base64_url = self._dataset_populator.base64_url_for_test_file(filename)
+        return self.with_deferred_dataset(base64_url, named=named, ext=ext)
+
     def with_unpaired(self) -> "HasSrcDict":
         return self._fetch_response(
             self._dataset_collection_populator.create_unpaired_in_history(self._history_id, wait=True)

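Note on the helpers above: `with_deferred_dataset_for_test_file` encodes a bundled test file as a URI via `base64_url_for_test_file` and then delegates to `with_deferred_dataset`, so the new tests can create deferred HDAs without an external URL. A rough, self-contained sketch of the presumed idea behind the base64 URI step; the exact URI scheme and the helper's internals are assumptions, not taken from this commit:

import base64

# Hypothetical illustration only: turn local test-file bytes into a data URI that a
# deferred dataset could reference later, instead of fetching over the network.
content = b"chr1\t4225\t19670\n"
base64_url = "base64://" + base64.b64encode(content).decode("utf-8")
print(base64_url)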