Skip to content

Commit a1faa03

Browse files
committed
add utilities to dvc manager
1 parent 9612dc4 commit a1faa03

File tree

1 file changed

+105
-6
lines changed

1 file changed

+105
-6
lines changed

api/managers/dvc_manager.py

Lines changed: 105 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,38 @@
22
from pathlib import Path
33
from typing import List, Union
44

5+
import structlog
6+
7+
# Module-level structlog logger; DVCManager._run_command uses it to report failed commands.
logger = structlog.getLogger(__name__)
8+
59

610
class DVCManager:
711
def __init__(self, repo_path: Union[str, Path]) -> None:
812
self.repo_path = Path(repo_path)
913

1014
def _run_command(self, command: List[str]) -> subprocess.CompletedProcess:
11-
return subprocess.run(
12-
command, cwd=str(self.repo_path), capture_output=True, text=True, check=True
13-
)
15+
try:
16+
return subprocess.run(
17+
command,
18+
cwd=str(self.repo_path),
19+
capture_output=True,
20+
text=True,
21+
check=True,
22+
)
23+
except subprocess.CalledProcessError as e:
24+
# Log the error with details
25+
logger.error(f"Command failed: {' '.join(command)}, Error: {e.stderr}")
26+
# You can choose to re-raise or handle differently
27+
raise
1428

15-
def track_resource(self, file_path: Union[str, Path], chunked: bool = False) -> str:
    """Add a resource file to DVC tracking.

    Args:
        file_path: Path of the file to track; it must be located under
            ``repo_path``.
        chunked: Accepted for backward compatibility only. ``dvc add`` does
            not provide a ``--chunked`` option, so forwarding it made every
            ``chunked=True`` call fail. The flag is now ignored and a
            warning is emitted instead.

    Returns:
        The path of the resulting ``.dvc`` file, relative to the repository.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
        subprocess.CalledProcessError: If ``dvc add`` exits with a non-zero
            status.
    """
    rel_path = Path(file_path).relative_to(self.repo_path)
    if chunked:
        import warnings

        warnings.warn(
            "track_resource(chunked=True) is ignored: 'dvc add' has no "
            "--chunked option",
            RuntimeWarning,
            stacklevel=2,
        )
    self._run_command(["dvc", "add", str(rel_path)])
    return str(rel_path) + ".dvc"
2038

2139
def commit_version(self, dvc_file: str, message: str) -> None:
@@ -31,3 +49,84 @@ def push_to_remote(self) -> None:
3149
"""Push data to DVC remote and metadata to git"""
3250
self._run_command(["dvc", "push"])
3351
self._run_command(["git", "push", "--follow-tags"])
52+
53+
def get_version(self) -> str:
    """Return the output of ``dvc version`` with surrounding whitespace removed.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    completed = self._run_command(["dvc", "version"])
    trimmed = completed.stdout.strip()
    return str(trimmed)
57+
58+
def get_remote(self) -> str:
    """Return the output of ``dvc remote list`` with surrounding whitespace removed.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    completed = self._run_command(["dvc", "remote", "list"])
    trimmed = completed.stdout.strip()
    return str(trimmed)
62+
63+
def setup_remote(self, remote_name: str, remote_url: str) -> None:
    """Register a DVC remote and then select it as the default remote.

    Args:
        remote_name: Name under which the remote is added.
        remote_url: URL passed to ``dvc remote add``.

    Raises:
        subprocess.CalledProcessError: If either command exits with a
            non-zero status; the second command is not run when the first
            one fails.
    """
    steps = (
        ["dvc", "remote", "add", remote_name, remote_url],
        ["dvc", "remote", "default", remote_name],
    )
    for step in steps:
        self._run_command(step)
67+
68+
def verify_file(self, file_path: Union[str, Path]) -> bool:
    """Report whether ``dvc status`` describes the file as up to date.

    Args:
        file_path: Path of the file; it must be located under ``repo_path``.

    Returns:
        ``True`` when the status output contains ``"up to date"``;
        ``False`` otherwise, including when the status command fails.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
    """
    target = str(Path(file_path).relative_to(self.repo_path))
    try:
        status = self._run_command(["dvc", "status", target])
    except subprocess.CalledProcessError:
        # A failing status command is treated as "not verified".
        return False
    return "up to date" in str(status.stdout)
76+
77+
def rollback_to_version(
    self, file_path: Union[str, Path], version_tag: str
) -> None:
    """Restore a tracked file to the state recorded at ``version_tag``.

    Args:
        file_path: Path of the data file; it must be located under
            ``repo_path``.
        version_tag: Git revision from which the ``.dvc`` file is taken.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
        subprocess.CalledProcessError: If the git or dvc command exits with
            a non-zero status.
    """
    target = str(Path(file_path).relative_to(self.repo_path))
    pointer = target + ".dvc"

    # Bring back the .dvc file as it was at the requested revision.
    self._run_command(["git", "checkout", version_tag, "--", pointer])

    # Have DVC check out the data file that the restored .dvc file refers to.
    self._run_command(["dvc", "checkout", target])
89+
90+
def lock_resource(self, file_path: Union[str, Path]) -> None:
    """Run ``dvc lock`` on a resource inside the repository.

    Args:
        file_path: Path of the resource; it must be located under
            ``repo_path``.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # NOTE(review): confirm the installed DVC release still provides
    # "dvc lock" and accepts a data path as its target.
    target = str(Path(file_path).relative_to(self.repo_path))
    self._run_command(["dvc", "lock", target])
94+
95+
def unlock_resource(self, file_path: Union[str, Path]) -> None:
    """Run ``dvc unlock`` on a resource inside the repository.

    Args:
        file_path: Path of the resource; it must be located under
            ``repo_path``.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # NOTE(review): confirm the installed DVC release still provides
    # "dvc unlock" and accepts a data path as its target.
    target = str(Path(file_path).relative_to(self.repo_path))
    self._run_command(["dvc", "unlock", target])
99+
100+
def add_metric(self, metric_file: Union[str, Path], metric_name: str) -> None:
    """Run ``dvc metrics add`` for a file inside the repository.

    Args:
        metric_file: Path of the metric file; it must be located under
            ``repo_path``.
        metric_name: Value passed through the ``--name`` option.

    Raises:
        ValueError: If ``metric_file`` is not located under ``repo_path``.
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # NOTE(review): verify that "dvc metrics add" and its "--name" option
    # exist in the DVC release in use.
    target = str(Path(metric_file).relative_to(self.repo_path))
    arguments = ["dvc", "metrics", "add", target, "--name", metric_name]
    self._run_command(arguments)
106+
107+
def show_metrics(self) -> str:
    """Return the output of ``dvc metrics show`` with surrounding whitespace removed.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    completed = self._run_command(["dvc", "metrics", "show"])
    trimmed = completed.stdout.strip()
    return str(trimmed)
111+
112+
def gc_cache(self, force: bool = False) -> None:
    """Run ``dvc gc`` to clean up the cache.

    Args:
        force: When true, ``-f`` is appended to the command.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # NOTE(review): confirm whether the DVC release in use requires a scope
    # flag (e.g. --workspace) or prompts for confirmation when -f is absent.
    arguments = ["dvc", "gc", "-f"] if force else ["dvc", "gc"]
    self._run_command(arguments)
118+
119+
def configure(self, section: str, option: str, value: str) -> None:
    """Set a DVC configuration option through ``dvc config``.

    Args:
        section: Configuration section name.
        option: Option name inside the section.
        value: Value to assign to ``section.option``.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    setting = f"{section}.{option}"
    self._run_command(["dvc", "config", setting, value])
122+
123+
def has_changes(self, file_path: Union[str, Path]) -> bool:
    """Report whether ``dvc status`` mentions the file.

    Args:
        file_path: Path of the file; it must be located under ``repo_path``.

    Returns:
        ``True`` when the repository-relative path appears in the status
        output, or when the status command fails; ``False`` otherwise.

    Raises:
        ValueError: If ``file_path`` is not located under ``repo_path``.
    """
    target = str(Path(file_path).relative_to(self.repo_path))
    try:
        status = self._run_command(["dvc", "status", target])
    except subprocess.CalledProcessError:
        # A failing status command is treated as "changed".
        return True
    # The file counts as changed when its path shows up in the output.
    return target in str(status.stdout)

0 commit comments

Comments
 (0)