Skip to content

Commit 09384f8

Browse files
committed
refactor: lift files away from directory.py
1 parent 52e4ea9 commit 09384f8

File tree

11 files changed

+247
-252
lines changed

11 files changed

+247
-252
lines changed

cache/Snakefile

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,33 @@
1-
from cache import link
1+
from cache import FetchConfig, link
2+
from cache.directory import CacheItem # for exposing to Snakefiles that import this Snakefile.
23
from cache.util import uncompress
34
import urllib.parse
4-
from dataclasses import dataclass
55
from typing import Union
66
from pathlib import Path
77

8-
@dataclass
9-
class FetchConfig:
10-
directive: list[str]
11-
uncompress: bool = False
8+
def stringify_directive(directive: Union[CacheItem, tuple[str, ...]]) -> str:
9+
return urllib.parse.quote_plus(directive.name if isinstance(directive, CacheItem) else '/'.join(directive))
1210

1311
def produce_fetch_rules(input_dict: dict[str, Union[FetchConfig, list[str]]]):
1412
"""
1513
Produces fetch rules based on a dictionary mapping
1614
output files to their directory.py-based directive.
1715
"""
18-
# Map inputs to be wrapped with FetchConfig if list[str]
19-
input_dict = {k: FetchConfig(v) if isinstance(v, list) else v for k, v in input_dict.items()}
16+
# Map inputs to be wrapped with FetchConfig if list[str] or CacheItem
17+
input_dict = {k: FetchConfig(v) if isinstance(v, tuple) or isinstance(v, CacheItem) else v for k, v in input_dict.items()}
2018

21-
directives = [urllib.parse.quote_plus("/".join(directive.directive)) for directive in input_dict.values()]
19+
directives = list(input_dict.values())
2220
assert len(directives) == len(set(directives)), "Directives aren't unique!"
2321

2422
for output_file, config in input_dict.items():
2523
# Since placeholders are evaluated when the job is actually run,
2624
# we pass data using params and output.
2725
rule:
28-
name: f"fetch_{urllib.parse.quote_plus('/'.join(config.directive))}_to_{urllib.parse.quote_plus(output_file)}"
26+
name:
27+
f"fetch_{stringify_directive(config.directive)}_to_{urllib.parse.quote_plus(output_file)}"
2928
output: file=output_file
3029
params:
3130
config=config
3231
run:
3332
Path(output.file).parent.mkdir(exist_ok=True)
34-
link(Path(output.file), params.config.directive, uncompress=params.config.uncompress)
33+
link(Path(output.file), params.config)

cache/__init__.py

Lines changed: 73 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,34 +2,41 @@
22
This is how spras-benchmarking handles artifact caching. `cache` should be used specifically inside `Snakefile`
33
"""
44

5+
from dataclasses import dataclass
6+
from typing import Union
57
from cache.util import uncompress as uncompress_file
6-
from cache.directory import get_cache_item
8+
from cache.directory import CacheItem, get_cache_item
79
from pathlib import Path
810
import os
911
from urllib.parse import quote_plus
1012
import pickle
1113

12-
__all__ = ["link"]
14+
__all__ = ["FetchConfig", "link"]
1315

1416
dir_path = Path(os.path.dirname(os.path.realpath(__file__)))
1517
artifacts_dir = dir_path / "artifacts"
1618

19+
@dataclass(frozen=True)
20+
class FetchConfig:
21+
directive: Union[CacheItem, tuple[str, ...]]
22+
uncompress: bool = False
1723

18-
def get_artifact_name(directive: list[str]) -> str:
24+
def get_artifact_name(directive: tuple[str, ...]) -> str:
1925
return quote_plus("/".join(directive))
2026

27+
def add_suffix(path: Path, suffix: str):
28+
return path.with_suffix(path.suffix + suffix)
2129

22-
def has_expired(directive: list[str]) -> bool:
30+
def has_expired(
31+
cache_item: CacheItem,
32+
output: Path
33+
) -> bool:
2334
"""
2435
Check if the artifact metadata associated with a directive has expired.
2536
Avoids re-downloading the artifact if nothing has changed.
2637
"""
27-
artifact_name = get_artifact_name(directive)
28-
cache_item = get_cache_item(directive)
2938

30-
metadata_dir = artifacts_dir / "metadata"
31-
metadata_dir.mkdir(exist_ok=True)
32-
metadata_file = (artifacts_dir / "metadata" / artifact_name).with_suffix((artifacts_dir / artifact_name).suffix + ".metadata")
39+
metadata_file = add_suffix(output, ".metadata")
3340

3441
# metadata never existed: we need to retrieve the new file
3542
if not metadata_file.exists():
@@ -50,36 +57,71 @@ def has_expired(directive: list[str]) -> bool:
5057
# metadata hasn't changed and already existed: this hasn't expired
5158
return False
5259

60+
def link_with_cache_item(
61+
output: Path,
62+
cache_item: CacheItem,
63+
uncompress: bool = False
64+
):
65+
"""
66+
Intermediary function for `link`.
67+
This does almost all of what `link` is characterized to do in its documentation,
68+
except for doing symlinking.
69+
"""
70+
# If `uncompress` is `True`, we make
71+
# `output` our 'compressed output.'
72+
uncompressed_output = output
73+
if uncompress:
74+
output = add_suffix(output, ".compressed")
75+
76+
# Re-download if the file doesn't exist or the directive has expired.
77+
# Note that we check for expiration first to trigger metadata creation.
78+
if has_expired(cache_item, output) or not output.exists():
79+
output.unlink(missing_ok=True)
80+
cache_item.download(output)
81+
82+
if uncompress:
83+
uncompressed_artifact_path = add_suffix(output, ".uncompressed")
84+
uncompressed_artifact_path.unlink(missing_ok=True)
85+
uncompress_file(output, uncompressed_output)
5386

54-
def link(output: str, directive: list[str], uncompress=False):
87+
def link(
88+
output: str,
89+
config: FetchConfig
90+
):
5591
"""
5692
Links output files from cache.directory directives.
5793
For example,
5894
5995
```py
60-
link("output/ensg-ensp.tsv", ["BioMart", "ensg-ensp.tsv"])
96+
link("output/ensg-ensp.tsv", FetchConfig(["BioMart", "ensg-ensp.tsv"]))
6197
```
6298
63-
would download and check BioMart's cache for ENSG-ENSP mapping, then symlink the cached output
64-
(lying somewhere in the cache folder) with the desired `output`.
65-
"""
66-
67-
artifacts_dir.mkdir(exist_ok=True)
68-
69-
artifact_name = get_artifact_name(directive)
70-
71-
Path(output).unlink(missing_ok=True)
99+
would download and check BioMart's cache for ENSG-ENSP mapping, then:
100+
- If `config.directive` is a `CacheItem`, we write the file directly to `output`.
101+
- Otherwise, we symlink the cached output (lying somewhere in the cache folder) with the desired `output`
102+
to avoid file duplication.
72103
73-
# Re-download if the file doesn't exist or the directive has expired.
74-
cache_item = get_cache_item(directive)
75-
if not (artifacts_dir / artifact_name).exists() or has_expired(directive):
76-
(artifacts_dir / artifact_name).unlink(missing_ok=True)
77-
cache_item.download(artifacts_dir / artifact_name)
104+
This function wraps around link_with_cache_item and handles symlinking
105+
depending on the type of config.directive.
106+
TODO: most likely a nicer way to design this.
107+
"""
78108

79-
if uncompress:
80-
uncompressed_artifact_path = Path(str(artifacts_dir / artifact_name) + ".uncompressed")
81-
uncompressed_artifact_path.unlink(missing_ok=True)
82-
uncompress_file(artifacts_dir / artifact_name, uncompressed_artifact_path)
83-
Path(output).symlink_to(uncompressed_artifact_path)
109+
if isinstance(config.directive, CacheItem):
110+
link_with_cache_item(
111+
Path(output),
112+
config.directive,
113+
config.uncompress
114+
)
84115
else:
85-
Path(output).symlink_to(artifacts_dir / artifact_name)
116+
artifacts_dir.mkdir(exist_ok=True)
117+
artifact_name = get_artifact_name(config.directive)
118+
artifact_output = artifacts_dir / artifact_name
119+
120+
link_with_cache_item(
121+
artifact_output,
122+
get_cache_item(config.directive),
123+
config.uncompress
124+
)
125+
126+
Path(output).symlink_to(artifact_output)
127+

0 commit comments

Comments
 (0)