|
1 | 1 | import os
|
2 | 2 | import shutil
|
3 | 3 | import attr
|
| 4 | +import typing as ty |
4 | 5 | import numpy as np
|
| 6 | +import time |
| 7 | +from unittest import mock |
| 8 | +from pathlib import Path |
5 | 9 | import pytest
|
| 10 | +import time |
| 11 | +from fileformats.generic import File |
| 12 | +import pydra.mark |
6 | 13 |
|
7 | 14 | from .utils import (
|
8 | 15 | fun_addtwo,
|
@@ -306,6 +313,7 @@ def test_task_init_7(tmp_path):
|
306 | 313 | output_dir1 = nn1.output_dir
|
307 | 314 |
|
308 | 315 | # changing the content of the file
|
| 316 | + time.sleep(2) # need the mtime to be different |
309 | 317 | file2 = tmp_path / "file2.txt"
|
310 | 318 | with open(file2, "w") as f:
|
311 | 319 | f.write("from pydra")
|
@@ -1560,3 +1568,98 @@ def test_task_state_cachelocations_updated(plugin, tmp_path):
|
1560 | 1568 | # both workflows should be run
|
1561 | 1569 | assert all([dir.exists() for dir in nn.output_dir])
|
1562 | 1570 | assert all([dir.exists() for dir in nn2.output_dir])
|
| 1571 | + |
| 1572 | + |
| 1573 | +def test_task_files_cachelocations(plugin_dask_opt, tmp_path): |
| 1574 | + """ |
| 1575 | + Two identical tasks with provided cache_dir that use file as an input; |
| 1576 | + the second task has cache_locations and should not recompute the results |
| 1577 | + """ |
| 1578 | + cache_dir = tmp_path / "test_task_nostate" |
| 1579 | + cache_dir.mkdir() |
| 1580 | + cache_dir2 = tmp_path / "test_task_nostate2" |
| 1581 | + cache_dir2.mkdir() |
| 1582 | + input_dir = tmp_path / "input" |
| 1583 | + input_dir.mkdir() |
| 1584 | + |
| 1585 | + input1 = input_dir / "input1.txt" |
| 1586 | + input1.write_text("test") |
| 1587 | + input2 = input_dir / "input2.txt" |
| 1588 | + input2.write_text("test") |
| 1589 | + |
| 1590 | + nn = fun_file(name="NA", filename=input1, cache_dir=cache_dir) |
| 1591 | + with Submitter(plugin=plugin_dask_opt) as sub: |
| 1592 | + sub(nn) |
| 1593 | + |
| 1594 | + nn2 = fun_file( |
| 1595 | + name="NA", filename=input2, cache_dir=cache_dir2, cache_locations=cache_dir |
| 1596 | + ) |
| 1597 | + with Submitter(plugin=plugin_dask_opt) as sub: |
| 1598 | + sub(nn2) |
| 1599 | + |
| 1600 | + # checking the results |
| 1601 | + results2 = nn2.result() |
| 1602 | + assert results2.output.out == "test" |
| 1603 | + |
| 1604 | + # checking if the second task didn't run the interface again |
| 1605 | + assert nn.output_dir.exists() |
| 1606 | + assert not nn2.output_dir.exists() |
| 1607 | + |
| 1608 | + |
| 1609 | +class OverriddenContentsFile(File): |
| 1610 | + """A class for testing purposes, to that enables you to override the contents |
| 1611 | + of the file to allow you to check whether the persistent cache is used.""" |
| 1612 | + |
| 1613 | + def __init__( |
| 1614 | + self, |
| 1615 | + fspaths: ty.Iterator[Path], |
| 1616 | + contents: ty.Optional[bytes] = None, |
| 1617 | + metadata: ty.Dict[str, ty.Any] = None, |
| 1618 | + ): |
| 1619 | + super().__init__(fspaths, metadata=metadata) |
| 1620 | + self._contents = contents |
| 1621 | + |
| 1622 | + def byte_chunks(self, **kwargs) -> ty.Generator[ty.Tuple[str, bytes], None, None]: |
| 1623 | + if self._contents is not None: |
| 1624 | + yield (str(self.fspath), iter([self._contents])) |
| 1625 | + else: |
| 1626 | + yield from super().byte_chunks(**kwargs) |
| 1627 | + |
| 1628 | + @property |
| 1629 | + def contents(self): |
| 1630 | + if self._contents is not None: |
| 1631 | + return self._contents |
| 1632 | + return super().contents |
| 1633 | + |
| 1634 | + |
| 1635 | +def test_task_files_persistentcache(tmp_path): |
| 1636 | + """ |
| 1637 | + Two identical tasks with provided cache_dir that use file as an input; |
| 1638 | + the second task has cache_locations and should not recompute the results |
| 1639 | + """ |
| 1640 | + test_file_path = tmp_path / "test_file.txt" |
| 1641 | + test_file_path.write_bytes(b"foo") |
| 1642 | + cache_dir = tmp_path / "cache-dir" |
| 1643 | + cache_dir.mkdir() |
| 1644 | + test_file = OverriddenContentsFile(test_file_path) |
| 1645 | + |
| 1646 | + @pydra.mark.task |
| 1647 | + def read_contents(x: OverriddenContentsFile) -> bytes: |
| 1648 | + return x.contents |
| 1649 | + |
| 1650 | + assert ( |
| 1651 | + read_contents(x=test_file, cache_dir=cache_dir)(plugin="serial").output.out |
| 1652 | + == b"foo" |
| 1653 | + ) |
| 1654 | + test_file._contents = b"bar" |
| 1655 | + # should return result from the first run using the persistent cache |
| 1656 | + assert ( |
| 1657 | + read_contents(x=test_file, cache_dir=cache_dir)(plugin="serial").output.out |
| 1658 | + == b"foo" |
| 1659 | + ) |
| 1660 | + time.sleep(2) # Windows has a 2-second resolution for mtime |
| 1661 | + test_file_path.touch() # update the mtime to invalidate the persistent cache value |
| 1662 | + assert ( |
| 1663 | + read_contents(x=test_file, cache_dir=cache_dir)(plugin="serial").output.out |
| 1664 | + == b"bar" |
| 1665 | + ) # returns the overridden value |
0 commit comments