Skip to content

Commit cb584c6

Browse files
committed
dvcfs: detach from pipeline outputs
Introducing a simple repo tree that combines all workspaces into one structure, and allows us to detach data representation from pipelines. Prerequisite for dvc-data extraction and using relpaths in dvcfs/repofs.
1 parent 9b5ccb7 commit cb584c6

File tree

12 files changed

+189
-178
lines changed

12 files changed

+189
-178
lines changed

dvc/data/meta.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
from collections import OrderedDict
22
from dataclasses import dataclass, field
3-
from typing import Optional
3+
from typing import TYPE_CHECKING, Optional
4+
5+
if TYPE_CHECKING:
6+
from dvc.objects.db import ObjectDB
7+
from dvc.objects.file import HashFile
48

59

610
@dataclass
@@ -13,6 +17,10 @@ class Meta:
1317
nfiles: Optional[int] = field(default=None)
1418
isexec: Optional[bool] = field(default=False)
1519

20+
obj: Optional["HashFile"] = field(default=None)
21+
odb: Optional["ObjectDB"] = field(default=None)
22+
remote: Optional[str] = field(default=None)
23+
1624
@classmethod
1725
def from_dict(cls, d):
1826
if not d:

dvc/data/stage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def _build_tree(fs_path, fs, name, **kwargs):
168168
assert key
169169
tree.add(key, meta, obj.hash_info)
170170

171-
tree_meta.size += meta.size
171+
tree_meta.size += meta.size or 0
172172
tree_meta.nfiles += 1
173173

174174
return tree_meta, tree

dvc/data/tree.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import json
22
import logging
33
import posixpath
4-
from typing import TYPE_CHECKING, Dict, Optional, Tuple
4+
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Tuple
55

66
from funcy import cached_property
77

@@ -12,12 +12,29 @@
1212

1313
if TYPE_CHECKING:
1414
from dvc.hash_info import HashInfo
15+
from dvc.objects.db import ObjectDB
1516

1617
from .meta import Meta
1718

1819
logger = logging.getLogger(__name__)
1920

2021

22+
def _try_load(
23+
odbs: Iterable["ObjectDB"],
24+
hash_info: "HashInfo",
25+
) -> Optional["HashFile"]:
26+
for odb in odbs:
27+
if not odb:
28+
continue
29+
30+
try:
31+
return Tree.load(odb, hash_info)
32+
except (FileNotFoundError, ObjectFormatError):
33+
pass
34+
35+
return None
36+
37+
2138
class Tree(HashFile):
2239
PARAM_RELPATH = "relpath"
2340

@@ -26,7 +43,7 @@ def __init__(self, *args, **kwargs):
2643
self._dict: Dict[Tuple[str], Tuple["Meta", "HashInfo"]] = {}
2744

2845
@cached_property
29-
def trie(self):
46+
def _trie(self):
3047
from pygtrie import Trie
3148

3249
return Trie(self._dict)
@@ -52,6 +69,30 @@ def digest(self, hash_info: Optional["HashInfo"] = None):
5269
assert self.hash_info.value
5370
self.hash_info.value += ".dir"
5471

72+
def _load(self, key, meta, hash_info):
73+
if hash_info and hash_info.isdir and not meta.obj:
74+
meta.obj = _try_load([meta.odb, meta.remote], hash_info)
75+
if meta.obj:
76+
for ikey, value in meta.obj.iteritems():
77+
self._trie[key + ikey] = value
78+
self._dict[key + ikey] = value
79+
80+
def iteritems(self, prefix=None):
81+
kwargs = {}
82+
if prefix:
83+
kwargs = {"prefix": prefix}
84+
item = self._trie.longest_prefix(prefix)
85+
if item:
86+
key, (meta, hash_info) = item
87+
self._load(key, meta, hash_info)
88+
89+
for key, (meta, hash_info) in self._trie.iteritems(**kwargs):
90+
self._load(key, meta, hash_info)
91+
yield key, (meta, hash_info)
92+
93+
def shortest_prefix(self, *args, **kwargs):
94+
return self._trie.shortest_prefix(*args, **kwargs)
95+
5596
def __len__(self):
5697
return len(self._dict)
5798

@@ -131,7 +172,7 @@ def filter(self, prefix: Tuple[str]) -> Optional["Tree"]:
131172
"""
132173
tree = Tree(self.fs_path, self.fs, self.hash_info)
133174
try:
134-
for key, (meta, oid) in self.trie.items(prefix):
175+
for key, (meta, oid) in self._trie.items(prefix):
135176
tree.add(key, meta, oid)
136177
except KeyError:
137178
pass
@@ -149,7 +190,7 @@ def get(self, odb, prefix: Tuple[str]) -> Optional[HashFile]:
149190
tree = Tree(None, None, None)
150191
depth = len(prefix)
151192
try:
152-
for key, (meta, entry_oid) in self.trie.items(prefix):
193+
for key, (meta, entry_oid) in self._trie.items(prefix):
153194
tree.add(key[depth:], meta, entry_oid)
154195
except KeyError:
155196
return None

dvc/fs/dvc.py

Lines changed: 65 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,10 @@
22
import os
33
import typing
44

5-
from dvc.exceptions import OutputNotFoundError
6-
from dvc.utils import relpath
7-
85
from ._callback import DEFAULT_CALLBACK
96
from .base import FileSystem
107

118
if typing.TYPE_CHECKING:
12-
from dvc.output import Output
139
from dvc.types import AnyPath
1410

1511
logger = logging.getLogger(__name__)
@@ -35,67 +31,44 @@ def __init__(self, **kwargs):
3531
def config(self):
3632
raise NotImplementedError
3733

38-
def _find_outs(self, path, *args, **kwargs):
39-
outs = self.repo.find_outs_by_path(path, *args, **kwargs)
34+
def _get_key(self, path):
35+
from . import get_cloud_fs
4036

41-
def _is_cached(out):
42-
return out.use_cache
37+
cls, kwargs, fs_path = get_cloud_fs(None, url=path)
4338

44-
outs = list(filter(_is_cached, outs))
45-
if not outs:
46-
raise OutputNotFoundError(path, self.repo)
39+
if not path.startswith(self.repo.root_dir):
40+
fs = cls(**kwargs)
41+
return (cls.scheme, *fs.path.parts(fs_path))
4742

48-
return outs
43+
fs_key = "repo"
44+
key = self.path.relparts(path, self.repo.root_dir)
45+
if key == (".",):
46+
key = ()
4947

50-
def _get_granular_hash(self, path: "AnyPath", out: "Output", remote=None):
51-
# NOTE: use string paths here for performance reasons
52-
key = tuple(relpath(path, out.fs_path).split(os.sep))
53-
out.get_dir_cache(remote=remote)
54-
if out.obj is None:
55-
raise FileNotFoundError
56-
(_, oid) = out.obj.trie.get(key) or (None, None)
57-
if oid:
58-
return oid
59-
raise FileNotFoundError
48+
return (fs_key, *key)
6049

6150
def _get_fs_path(self, path: "AnyPath", remote=None):
62-
try:
63-
outs = self._find_outs(path, strict=False)
64-
except OutputNotFoundError as exc:
65-
raise FileNotFoundError from exc
51+
from dvc.config import NoRemoteError
6652

67-
if len(outs) != 1 or (
68-
outs[0].is_dir_checksum and path == outs[0].fs_path
69-
):
53+
info = self.info(path)
54+
if info["type"] == "directory":
7055
raise IsADirectoryError
7156

72-
out = outs[0]
73-
74-
if not out.hash_info:
57+
value = info.get("md5")
58+
if not value:
7559
raise FileNotFoundError
7660

77-
if out.changed_cache(filter_info=path):
78-
from dvc.config import NoRemoteError
79-
80-
try:
81-
remote_odb = self.repo.cloud.get_remote_odb(remote)
82-
except NoRemoteError as exc:
83-
raise FileNotFoundError from exc
84-
if out.is_dir_checksum:
85-
checksum = self._get_granular_hash(path, out).value
86-
else:
87-
checksum = out.hash_info.value
88-
remote_fs_path = remote_odb.hash_to_path(checksum)
89-
return remote_odb.fs, remote_fs_path
90-
91-
if out.is_dir_checksum:
92-
checksum = self._get_granular_hash(path, out).value
93-
cache_path = out.odb.fs.unstrip_protocol(
94-
out.odb.hash_to_path(checksum)
95-
)
96-
else:
97-
cache_path = out.cache_path
98-
return out.odb.fs, cache_path
61+
cache_path = self.repo.odb.local.hash_to_path(value)
62+
63+
if self.repo.odb.local.fs.exists(cache_path):
64+
return self.repo.odb.local.fs, cache_path
65+
66+
try:
67+
remote_odb = self.repo.cloud.get_remote_odb(remote)
68+
except NoRemoteError as exc:
69+
raise FileNotFoundError from exc
70+
remote_fs_path = remote_odb.hash_to_path(value)
71+
return remote_odb.fs, remote_fs_path
9972

10073
def open( # type: ignore
10174
self, path: str, mode="r", encoding=None, **kwargs
@@ -122,37 +95,27 @@ def isfile(self, path): # pylint: disable=arguments-renamed
12295
except FileNotFoundError:
12396
return False
12497

125-
def _fetch_dir(self, out, **kwargs):
126-
# pull dir cache if needed
127-
out.get_dir_cache(**kwargs)
128-
129-
if not out.obj:
130-
raise FileNotFoundError
131-
132-
def _add_dir(self, trie, out, **kwargs):
133-
self._fetch_dir(out, **kwargs)
134-
135-
base = out.fs.path.parts(out.fs_path)
136-
for key, _, _ in out.obj: # noqa: B301
137-
trie[base + key] = None
138-
139-
def _walk(self, root, trie, topdown=True, **kwargs):
98+
def _walk(self, root, topdown=True, **kwargs):
14099
dirs = set()
141100
files = []
142101

143-
root_parts = self.path.parts(root)
144-
out = trie.get(root_parts)
145-
if out and out.is_dir_checksum:
146-
self._add_dir(trie, out, **kwargs)
147-
102+
root_parts = self._get_key(root)
148103
root_len = len(root_parts)
149104
try:
150-
for key, out in trie.iteritems(prefix=root_parts): # noqa: B301
105+
for key, (meta, hash_info) in self.repo.index.tree.iteritems(
106+
prefix=root_parts
107+
): # noqa: B301
108+
if hash_info and hash_info.isdir and meta and not meta.obj:
109+
raise FileNotFoundError
110+
151111
if key == root_parts:
152112
continue
153113

114+
if hash_info.isdir:
115+
continue
116+
154117
name = key[root_len]
155-
if len(key) > root_len + 1 or (out and out.is_dir_checksum):
118+
if len(key) > root_len + 1:
156119
dirs.add(name)
157120
continue
158121

@@ -165,11 +128,9 @@ def _walk(self, root, trie, topdown=True, **kwargs):
165128
yield root, dirs, files
166129

167130
for dname in dirs:
168-
yield from self._walk(self.path.join(root, dname), trie)
131+
yield from self._walk(self.path.join(root, dname))
169132

170133
def walk(self, top, topdown=True, onerror=None, **kwargs):
171-
from pygtrie import Trie
172-
173134
assert topdown
174135
root = os.path.abspath(top)
175136
try:
@@ -184,14 +145,7 @@ def walk(self, top, topdown=True, onerror=None, **kwargs):
184145
onerror(NotADirectoryError(top))
185146
return
186147

187-
trie = Trie()
188-
for out in info["outs"]:
189-
trie[out.fs.path.parts(out.fs_path)] = out
190-
191-
if out.is_dir_checksum and self.path.isin_or_eq(root, out.fs_path):
192-
self._add_dir(trie, out, **kwargs)
193-
194-
yield from self._walk(root, trie, topdown=topdown, **kwargs)
148+
yield from self._walk(root, topdown=topdown, **kwargs)
195149

196150
def find(self, path, prefix=None):
197151
for root, _, files in self.walk(path):
@@ -209,56 +163,52 @@ def isdvc(self, path, recursive=False, strict=True):
209163
return bool(info.get("outs") if recurse else info.get("isout"))
210164

211165
def info(self, path):
166+
from dvc.data.meta import Meta
167+
212168
abspath = os.path.abspath(path)
213169

170+
key = self._get_key(abspath)
171+
214172
try:
215-
outs = self._find_outs(abspath, strict=False, recursive=True)
216-
except OutputNotFoundError as exc:
173+
outs = list(self.repo.index.tree.iteritems(key))
174+
except KeyError as exc:
217175
raise FileNotFoundError from exc
218176

219177
ret = {
220178
"type": "file",
221-
"outs": outs,
222179
"size": 0,
223180
"isexec": False,
224181
"isdvc": False,
182+
"outs": outs,
225183
}
226184

227-
if len(outs) > 1:
185+
if len(outs) > 1 and outs[0][0] != key:
186+
shortest = self.repo.index.tree.shortest_prefix(key)
187+
if shortest:
188+
assert shortest[1][1].isdir
189+
if len(shortest[0]) <= len(key):
190+
ret["isdvc"] = True
191+
228192
ret["type"] = "directory"
229193
return ret
230194

231-
out = outs[0]
195+
item_key, (meta, hash_info) = outs[0]
232196

233-
if not out.hash_info:
234-
ret["isexec"] = out.meta.isexec
235-
return ret
236-
237-
if abspath == out.fs_path:
238-
if out.hash_info.isdir:
239-
ret["type"] = "directory"
240-
ret["size"] = out.meta.size
241-
ret["isexec"] = out.meta.isexec
242-
ret[out.hash_info.name] = out.hash_info.value
243-
ret["isdvc"] = True
244-
ret["isout"] = True
245-
return ret
197+
meta = meta or Meta()
246198

247-
if out.fs_path.startswith(abspath + self.sep):
199+
if key != item_key:
200+
assert item_key[: len(key)] == key
248201
ret["type"] = "directory"
249202
return ret
250203

204+
ret["size"] = meta.size
205+
ret["isexec"] = meta.isexec
206+
ret[hash_info.name] = hash_info.value
251207
ret["isdvc"] = True
252-
253-
try:
254-
self._get_granular_hash(abspath, out)
255-
except FileNotFoundError:
208+
ret["isout"] = True
209+
ret["meta"] = meta
210+
if hash_info and hash_info.isdir:
256211
ret["type"] = "directory"
257-
return ret
258-
259-
key = self.repo.fs.path.relparts(abspath, out.fs_path)
260-
(_, oid) = out.obj.trie.get(key) or (None, None)
261-
ret[oid.name] = oid.value
262212
return ret
263213

264214
def get_file(

0 commit comments

Comments
 (0)