Skip to content

Commit 50e652c

Browse files
committed
Fix using multiple data inputs with deferred datasets & Pulsar.
We really have to eliminate that input_datasets dictionary - this is very hacky. Maybe we can take a pass at rewriting Pulsar staging after we have stronger typing in the tool state branch for this stuff. Now we're putting multiple copies of an HDA into the deferred object structure - so I am now caching them during materialization so we don't materialize them more than once. This was probably needed regardless, since an input can be supplied to a tool more than once.
1 parent da575fe commit 50e652c

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

lib/galaxy/model/deferred.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import shutil
55
from typing import (
6+
Dict,
67
NamedTuple,
78
Optional,
89
Union,
@@ -88,6 +89,7 @@ def __init__(
8889
self._object_store_populator = object_store_populator
8990
self._file_sources = file_sources
9091
self._sa_session = sa_session
92+
self._previously_materialized: Dict[int, HistoryDatasetAssociation] = {}
9193

9294
def ensure_materialized(
9395
self,
@@ -105,6 +107,12 @@ def ensure_materialized(
105107
if dataset.state != Dataset.states.DEFERRED and isinstance(dataset_instance, HistoryDatasetAssociation):
106108
return dataset_instance
107109

110+
if dataset_instance.id in self._previously_materialized and isinstance(
111+
dataset_instance, HistoryDatasetAssociation
112+
):
113+
# If we have already materialized this dataset, return the previously materialized instance.
114+
return self._previously_materialized[dataset_instance.id]
115+
108116
materialized_dataset_hashes = [h.copy() for h in dataset.hashes]
109117
if in_place:
110118
materialized_dataset = dataset_instance.dataset
@@ -195,6 +203,7 @@ def ensure_materialized(
195203
metadata_tmp_files_dir = None
196204
materialized_dataset_instance.set_meta(metadata_tmp_files_dir=metadata_tmp_files_dir)
197205
materialized_dataset_instance.metadata_deferred = False
206+
self._previously_materialized[dataset_instance.id] = materialized_dataset_instance
198207
return materialized_dataset_instance
199208

200209
def _stream_source(self, target_source: DatasetSource, datatype, dataset: Dataset) -> str:

lib/galaxy/tools/evaluation.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,14 @@ def find_deferred_datasets(input, value, context, prefixed_name=None, **kwargs):
405405
visit_input_values(self.tool.inputs, incoming, find_deferred_datasets)
406406
visit_input_values(self.tool.inputs, incoming, find_deferred_collections)
407407

408+
# now place the inputX datasets hacked in for multiple inputs into the deferred
409+
# object array also. This is so messy. I think in this case we only need these for
410+
# Pulsar stage-up, which uses the hackier input_datasets flat dict.
411+
for key, value in input_datasets.items():
412+
if key not in deferred_objects and value is not None and value.state == model.Dataset.states.DEFERRED:
413+
if self._should_materialize_deferred_input(key, value):
414+
deferred_objects[key] = value
415+
408416
return deferred_objects
409417

410418
def _should_materialize_deferred_input(self, input_name: str, input_value: DeferrableObjectsT) -> bool:

0 commit comments

Comments
 (0)