
Commit 67a4689

Enable hash-based job matching for composite datasets
by computing and storing hashes for all of a dataset's files (the primary file and every extra file). ``has_same_hash()`` now matches two datasets only when all of their hashes match, which prevents partial matches on composite datasets.
1 parent 5f2790b
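
In rough terms (a conceptual sketch only, not the actual SQL added to ``has_same_hash()`` below), a dataset's stored hashes can be viewed as a set of (hash_function, extra_files_path, hash_value) records, where an extra_files_path of None marks the primary file; two datasets count as equivalent for job caching only when those sets are identical. The hash values below are placeholders.

# Conceptual illustration only; the real check is the SQL in has_same_hash() below.
def datasets_match(a_hashes, b_hashes):
    # All of A's hashes must be present in B and B must carry no additional hashes,
    # so a partial match on a composite dataset no longer counts.
    return set(a_hashes) == set(b_hashes)


velvet_a = [("SHA-256", None, "aa11"), ("SHA-256", "Roadmaps", "bb22"), ("SHA-256", "Log", "cc33")]
velvet_b = [("SHA-256", None, "aa11"), ("SHA-256", "Roadmaps", "bb22"), ("SHA-256", "Log", "cc33")]
velvet_c = [("SHA-256", None, "aa11")]  # only the primary file hash

assert datasets_match(velvet_a, velvet_b)
assert not datasets_match(velvet_a, velvet_c)  # previously a single matching hash was enough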

File tree (4 files changed, +101 -5 lines):

lib/galaxy/jobs/__init__.py
lib/galaxy/managers/jobs.py
lib/galaxy_test/api/test_datasets.py
lib/galaxy_test/api/test_tool_execute.py

lib/galaxy/jobs/__init__.py

Lines changed: 16 additions & 2 deletions
@@ -2242,14 +2242,28 @@ def fail(message=job.info, exception=None):
            if self.app.config.enable_celery_tasks:
                from galaxy.celery.tasks import compute_dataset_hash

-                extra_files_path = dataset.extra_files_path if dataset.extra_files_path_exists() else None
                request = ComputeDatasetHashTaskRequest(
                    dataset_id=dataset.id,
-                    extra_files_path=extra_files_path,
+                    extra_files_path=None,
                    hash_function=self.app.config.hash_function,
                )
                compute_dataset_hash.delay(request=request)

+                # For composite datasets with extra files, hash each extra file individually
+                if dataset.extra_files_path_exists():
+                    for root, _, files in os.walk(dataset.extra_files_path):
+                        for file in files:
+                            file_path = os.path.join(root, file)
+                            if os.path.exists(file_path):
+                                # Calculate relative path from extra_files_path
+                                relative_path = os.path.relpath(file_path, dataset.extra_files_path)
+                                request = ComputeDatasetHashTaskRequest(
+                                    dataset_id=dataset.id,
+                                    extra_files_path=relative_path,
+                                    hash_function=self.app.config.hash_function,
+                                )
+                                compute_dataset_hash.delay(request=request)
+
        user = job.user
        if user and collected_bytes > 0 and quota_source_info is not None and quota_source_info.use:
            user.adjust_total_disk_usage(collected_bytes, quota_source_info.label)
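
For illustration, here is a hypothetical standalone sketch of what the queued tasks end up computing for a composite dataset: one digest for the primary file (stored with extra_files_path=None) and one digest per extra file, keyed by its path relative to the extra-files directory. The helper name and the direct use of hashlib are assumptions made for the sketch; the actual hashing happens inside Galaxy's compute_dataset_hash Celery task.

import hashlib
import os
from typing import Optional


def hash_composite_dataset(primary_path: str, extra_files_dir: Optional[str], hash_function: str = "sha256"):
    """Hypothetical helper mirroring the hashing scheduled in the diff above."""

    def file_digest(path: str) -> str:
        h = hashlib.new(hash_function)
        with open(path, "rb") as fh:
            for chunk in iter(lambda: fh.read(1024 * 1024), b""):
                h.update(chunk)
        return h.hexdigest()

    # (extra_files_path, hash_value); None marks the primary file, as in the task requests above
    hashes = [(None, file_digest(primary_path))]
    if extra_files_dir and os.path.isdir(extra_files_dir):
        for root, _, files in os.walk(extra_files_dir):
            for name in files:
                file_path = os.path.join(root, name)
                hashes.append((os.path.relpath(file_path, extra_files_dir), file_digest(file_path)))
    return hashes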

lib/galaxy/managers/jobs.py

Lines changed: 33 additions & 1 deletion
@@ -177,14 +177,18 @@ def has_same_hash(
) -> "Select[tuple[int]]":
    a_hash = aliased(model.DatasetHash)
    b_hash = aliased(model.DatasetHash)
+    b_hash_total = aliased(model.DatasetHash)
+
    # Join b directly, checking for either direct dataset match or hash match
    # The hash match uses a correlated subquery to avoid the expensive cartesian product
    stmt = stmt.join(
        b,
        or_(
            # Direct dataset match
            b.dataset_id == a.dataset_id,
-            # Hash match: b's dataset has a hash that matches any of a's hashes
+            # Hash match: b's dataset has hashes that match all of a's hashes
+            # For composite datasets, this means matching the primary file hash AND all extra file hashes
+            # For regular datasets, this means matching the single primary file hash
            b.dataset_id.in_(
                select(b_hash.dataset_id)
                .select_from(a_hash)
@@ -193,9 +197,37 @@ def has_same_hash(
                    and_(
                        a_hash.hash_function == b_hash.hash_function,
                        a_hash.hash_value == b_hash.hash_value,
+                        # Match extra_files_path: both NULL or both the same path
+                        or_(
+                            and_(
+                                a_hash.extra_files_path.is_(None),
+                                b_hash.extra_files_path.is_(None),
+                            ),
+                            a_hash.extra_files_path == b_hash.extra_files_path,
+                        ),
                    ),
                )
                .where(a_hash.dataset_id == a.dataset_id)
+                # Group by b's dataset_id and ensure all of a's hashes are matched
+                .group_by(b_hash.dataset_id)
+                .having(
+                    and_(
+                        # Number of matched hashes equals total hashes in A
+                        func.count(b_hash.id)
+                        == select(func.count(model.DatasetHash.id))
+                        .where(model.DatasetHash.dataset_id == a.dataset_id)
+                        .correlate(a)
+                        .scalar_subquery(),
+                        # Total hashes in B equals total hashes in A (ensures no extra hashes in B)
+                        select(func.count(b_hash_total.id))
+                        .where(b_hash_total.dataset_id == b_hash.dataset_id)
+                        .scalar_subquery()
+                        == select(func.count(model.DatasetHash.id))
+                        .where(model.DatasetHash.dataset_id == a.dataset_id)
+                        .correlate(a)
+                        .scalar_subquery(),
+                    )
+                )
            ),
        ),
    )
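
The group_by/having block above implements a relational-division check: B's hash rows must cover every one of A's hash rows, and B must not carry any rows beyond A's. The following is a self-contained sketch of the same pattern, written against a simplified stand-in table rather than Galaxy's real model, with the NULL handling collapsed into coalesce for brevity; it only builds and prints the statement.

from sqlalchemy import Column, Integer, MetaData, String, Table, and_, func, select

metadata = MetaData()
dataset_hash = Table(
    "dataset_hash",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("dataset_id", Integer),
    Column("hash_function", String),
    Column("hash_value", String),
    Column("extra_files_path", String, nullable=True),
)

a_hash = dataset_hash.alias("a_hash")
b_hash = dataset_hash.alias("b_hash")
b_total = dataset_hash.alias("b_total")

A_DATASET_ID = 1  # the dataset whose hashes must be matched exactly

# Total number of hashes recorded for A (primary file plus any extra files)
a_count = select(func.count(a_hash.c.id)).where(a_hash.c.dataset_id == A_DATASET_ID).scalar_subquery()

matching_dataset_ids = (
    select(b_hash.c.dataset_id)
    .join(
        a_hash,
        and_(
            a_hash.c.dataset_id == A_DATASET_ID,
            a_hash.c.hash_function == b_hash.c.hash_function,
            a_hash.c.hash_value == b_hash.c.hash_value,
            # simplified NULL handling: both NULL or both the same relative path
            func.coalesce(a_hash.c.extra_files_path, "") == func.coalesce(b_hash.c.extra_files_path, ""),
        ),
    )
    .group_by(b_hash.c.dataset_id)
    .having(
        and_(
            # every one of A's hashes was matched ...
            func.count(b_hash.c.id) == a_count,
            # ... and B carries no hashes beyond A's
            select(func.count(b_total.c.id)).where(b_total.c.dataset_id == b_hash.c.dataset_id).scalar_subquery()
            == a_count,
        )
    )
)

print(matching_dataset_ids)  # renders the generated SQL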

lib/galaxy_test/api/test_datasets.py

Lines changed: 4 additions & 2 deletions
@@ -789,10 +789,12 @@ def test_compute_md5_on_primary_dataset(self, history_id):
        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=hda)
        self.assert_hash_value(hda_details, "940cbe15c94d7e339dc15550f6bdcf4d", "MD5")

-    def test_compute_sha1_on_composite_dataset(self, history_id):
+    def test_compute_sha256_on_composite_dataset_by_default(self, history_id):
        output = self.dataset_populator.fetch_hda(history_id, COMPOSITE_DATA_FETCH_REQUEST_1, wait=True)
-        self.dataset_populator.compute_hash(output["id"], hash_function="SHA-256", extra_files_path="Roadmaps")
        hda_details = self.dataset_populator.get_history_dataset_details(history_id, dataset=output)
+        self.assert_hash_value(
+            hda_details, "94e09ae129f1ec32d1736af833160e8bdaa3a75cef2982712076c7bcd7d155d3", "SHA-256"
+        )
        self.assert_hash_value(
            hda_details,
            "3cbd311889963528954fe03b28b68a09685ea7a75660bd2268d5b44cafbe0d22",

lib/galaxy_test/api/test_tool_execute.py

Lines changed: 48 additions & 0 deletions
@@ -7,7 +7,9 @@
files, etc..).
"""

+import copy
from dataclasses import dataclass
+from typing import Any

import pytest

@@ -610,6 +612,52 @@ def test_job_cache_with_dataset_hash(target_history: TargetHistory, required_too
    assert execution.final_details["copied_from_job_id"]


+@requires_tool_id("gx_data")
+def test_job_cache_with_extra_files(target_history: TargetHistory, required_tool: RequiredTool) -> None:
+    # Upload a composite dataset (velvet) which creates extra files
+    velvet_upload_request: dict[str, Any] = {
+        "src": "composite",
+        "ext": "velvet",
+        "composite": {
+            "items": [
+                {"src": "pasted", "paste_content": "sequences content"},
+                {"src": "pasted", "paste_content": "roadmaps content"},
+                {"src": "pasted", "paste_content": "log content"},
+            ]
+        },
+    }
+
+    # Upload first velvet dataset - access the private _dataset_populator
+    velvet1_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
+    velvet1 = {"src": "hda", "id": velvet1_hda["id"]}
+
+    # Run gx_data tool on the first velvet dataset
+    _ = required_tool.execute().with_inputs({"parameter": velvet1}).assert_has_single_job
+
+    # Upload the same velvet dataset a second time
+    velvet2_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_upload_request, wait=True)
+    velvet2 = {"src": "hda", "id": velvet2_hda["id"]}
+
+    # Run gx_data on the second velvet dataset with job cache enabled
+    job = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet2}).assert_has_single_job
+
+    # Job cache should be used when all hashes match
+    assert job.final_details["copied_from_job_id"]
+
+    # Upload a third velvet dataset with modified content in one of the extra files
+    velvet_modified_request = copy.deepcopy(velvet_upload_request)
+    velvet_modified_request["composite"]["items"][1]["paste_content"] = "roadmaps content MODIFIED"
+
+    velvet3_hda = target_history._dataset_populator.fetch_hda(target_history.id, velvet_modified_request, wait=True)
+    velvet3 = {"src": "hda", "id": velvet3_hda["id"]}
+
+    # Run gx_data on the third velvet dataset with job cache enabled
+    job3 = required_tool.execute(use_cached_job=True).with_inputs({"parameter": velvet3}).assert_has_single_job
+
+    # Job cache should NOT be used when hashes don't match completely
+    assert not job3.final_details["copied_from_job_id"]
+
+
@requires_tool_id("gx_repeat_boolean_min")
def test_optional_repeats_with_mins_filled_id(target_history: TargetHistory, required_tool: RequiredTool):
    # we have a tool test for this but I wanted to verify it wasn't just the
