Skip to content

Commit 3314ca3

Browse files
authored
Make HybridStorage Connectors for builds storage (#2175)
* Comment out bytes test for now This was taking a long time. I think there's absolutely utility in running a scraper against BYTES every week or so, but this is just too much * Update sample .env Having to re-populate my .env after an *unfortunate* incident (see two commits ahead) prompted this. It's a little out of date. * Add pub root folder to config * DANGER ZONE! remove rmtree from hps This was... maybe the worst two lines of code I have ever written. If you used this connector to attempt to download a file to, say, your data-engineering/ directory, it would wipe the directory * Configure the connectors in lifecycle builds * Add builds conn * Add drafts conn * Add published conn * Add GIS conn * Add new connectors to the registry * Add a CLI in lifecycle * Add e2e test * post-review: remove slop-comments
1 parent 93cbcbd commit 3314ca3

File tree

23 files changed

+2647
-21
lines changed

23 files changed

+2647
-21
lines changed

dcpy/configuration.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
DEFAULT_S3_URL = "https://nyc3.digitaloceanspaces.com"
1717

1818
PUBLISHING_BUCKET = env.get("PUBLISHING_BUCKET")
19+
PUBLISHING_BUCKET_ROOT_FOLDER: str = env.get("PUBLISHING_BUCKET_ROOT_FOLDER", "")
1920

2021
LOGGING_DB = "edm-qaqc"
2122
LOGGING_SCHEMA = "product_data"

dcpy/connectors/edm/builds.py

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
from dataclasses import asdict
2+
from datetime import datetime
3+
from pathlib import Path
4+
5+
import pytz
6+
7+
from dcpy.configuration import (
8+
BUILD_NAME,
9+
CI,
10+
PUBLISHING_BUCKET,
11+
PUBLISHING_BUCKET_ROOT_FOLDER,
12+
)
13+
from dcpy.connectors.hybrid_pathed_storage import PathedStorageConnector, StorageType
14+
from dcpy.connectors.registry import VersionedConnector
15+
from dcpy.models.connectors.edm.publishing import (
16+
BuildKey,
17+
)
18+
from dcpy.utils import git, s3
19+
from dcpy.utils.logging import logger
20+
21+
22+
def _bucket() -> str:
    """Return the configured publishing bucket name.

    Raises:
        AssertionError: if the 'PUBLISHING_BUCKET' env-derived setting is unset.
    """
    # Fix: the original message said "edm.recipes connector" — a copy-paste
    # from the recipes module. This file is the builds connector.
    assert PUBLISHING_BUCKET, (
        "'PUBLISHING_BUCKET' must be defined to use the edm.publishing.builds connector"
    )
    return PUBLISHING_BUCKET
27+
28+
29+
_TEMP_PUBLISHING_FILE_SUFFIXES = {
30+
".zip",
31+
".parquet",
32+
".csv",
33+
".pdf",
34+
".xlsx",
35+
".json",
36+
".text",
37+
}
38+
39+
40+
def get_builds(product: str) -> list[str]:
    """Get all build versions for a product, sorted in descending order."""
    prefix = f"{product}/build/"
    subfolders = s3.get_subfolders(_bucket(), prefix)
    return sorted(subfolders, reverse=True)
43+
44+
45+
class BuildsConnector(VersionedConnector, arbitrary_types_allowed=True):
    """Versioned connector for product builds stored under the publishing bucket.

    A "version" here is a build name/ID (e.g. a CI build label), not a semantic
    version. Storage is a lazily-constructed PathedStorageConnector backed by S3
    (see the ``storage`` property); an explicit storage instance may be injected
    via ``__init__`` for testing or alternate backends.
    """

    # Connector-registry type identifier for this connector.
    conn_type: str = "edm.publishing.builds"
    # Backing storage; None until first access of the `storage` property.
    _storage: PathedStorageConnector | None = None

    def __init__(self, storage: PathedStorageConnector | None = None, **kwargs):
        """Initialize BuildsConnector with optional storage."""
        super().__init__(**kwargs)
        if storage is not None:
            self._storage = storage

    @property
    def storage(self) -> PathedStorageConnector:
        """Lazy-loaded storage connector. Only initializes when first accessed."""
        # Deferring construction avoids requiring PUBLISHING_BUCKET to be set
        # until the connector is actually used.
        if self._storage is None:
            self._storage = PathedStorageConnector.from_storage_kwargs(
                conn_type="edm.publishing.builds",
                storage_backend=StorageType.S3,
                s3_bucket=_bucket(),
                root_folder=PUBLISHING_BUCKET_ROOT_FOLDER,
                _validate_root_path=False,
            )
        return self._storage

    @staticmethod
    def create() -> "BuildsConnector":
        """Create a BuildsConnector with lazy-loaded S3 storage."""
        return BuildsConnector()

    def _generate_metadata(self) -> dict[str, str]:
        """Generates "standard" s3 metadata for our files"""
        # Timestamps are recorded in America/New_York (DCP local time).
        metadata = {
            "date-created": datetime.now(pytz.timezone("America/New_York")).isoformat()
        }
        metadata["commit"] = git.commit_hash()
        # Only CI runs have an associated action URL to record.
        if CI:
            metadata["run-url"] = git.action_url()
        return metadata

    def _upload_build(
        self,
        build_dir: Path,
        product: str,
        *,
        acl: s3.ACL | None = None,
        build_name: str | None = None,
        # max_files: int = s3.MAX_FILE_COUNT, # TODO
    ) -> BuildKey:
        """
        Uploads a product build to an S3 bucket using cloudpathlib.

        This function handles uploading a local output folder to a specified
        location in an S3 bucket. The path, product, and build name must be
        provided, along with an optional ACL (Access Control List) to control
        file access in S3.

        Raises:
            FileNotFoundError: If the provided output_path does not exist.
            ValueError: If the build name is not provided and cannot be found in the environment variables.
        """
        if not build_dir.exists():
            raise FileNotFoundError(f"Path {build_dir} does not exist")
        # Fall back to the BUILD_NAME env-derived setting when no name is given.
        build_name = build_name or BUILD_NAME
        if not build_name:
            raise ValueError(
                f"Build name supplied via CLI or the env var 'BUILD_NAME' cannot be '{build_name}'."
            )
        build_key = BuildKey(product, build_name)

        logger.info(f'Uploading {build_dir} to {build_key.path} with ACL "{acl}"')
        # NOTE(review): str(acl) turns a None ACL into the literal string
        # "None" — confirm storage.push treats that as "no ACL" rather than an
        # invalid ACL value.
        self.storage.push(
            key=build_key.path,
            filepath=str(build_dir),
            acl=str(acl),
            metadata=self._generate_metadata(),
        )

        return build_key

    def push_versioned(self, key: str, version: str, **kwargs) -> dict:
        """Push a build (kwargs['build_path']) for product `key` as build `version`.

        Returns the resulting BuildKey as a dict.
        NOTE(review): requires kwargs['connector_args'] and kwargs['build_path'];
        missing keys raise KeyError — confirm callers always supply both.
        """
        # For builds, the "version" is the build name/ID
        connector_args = kwargs["connector_args"]
        acl = (
            s3.string_as_acl(connector_args["acl"])
            if connector_args.get("acl")
            else None
        )

        logger.info(f"Pushing build for product: {key}, build: {version}")
        result = self._upload_build(
            build_dir=kwargs["build_path"],
            product=key,
            acl=acl,
            build_name=version,
        )
        return asdict(result)

    def _pull(
        self,
        key: str,
        version: str,
        destination_path: Path,
        *,
        filepath: str = "",
        **kwargs,
    ) -> dict:
        """Download one file of build `version` for product `key`.

        `filepath` is the path of the file within the build folder. If
        `destination_path` has a known file suffix it is used verbatim;
        otherwise it is treated as a directory and the file's own name is
        appended. Returns {"path": <local output path>}.

        Raises:
            FileNotFoundError: if the source key does not exist in storage.
        """
        build_key = BuildKey(key, version)

        source_key = f"{build_key.path}/{filepath}"

        if not self.storage.exists(source_key):
            raise FileNotFoundError(f"File {source_key} not found")

        # Determine output path
        is_file_path = destination_path.suffix in _TEMP_PUBLISHING_FILE_SUFFIXES
        output_filepath = (
            destination_path / Path(filepath).name
            if not is_file_path
            else destination_path
        )

        logger.info(
            f"Downloading {build_key}, {filepath}, {source_key} -> {output_filepath}"
        )

        self.storage.pull(key=source_key, destination_path=output_filepath)
        return {"path": output_filepath}

    def pull_versioned(
        self, key: str, version: str, destination_path: Path, **kwargs
    ) -> dict:
        """Public entry point for pulling a build file; delegates to _pull."""
        return self._pull(key, version, destination_path, **kwargs)

    def list_versions(self, key: str, *, sort_desc: bool = True, **kwargs) -> list[str]:
        """List all build versions (build names) for a product."""
        build_folder_key = f"{key}/build"
        if not self.storage.exists(build_folder_key):
            return []

        return sorted(self.storage.get_subfolders(build_folder_key), reverse=sort_desc)

    def get_latest_version(self, key: str, **kwargs) -> str:
        """Builds don't have a meaningful 'latest' version concept."""
        raise NotImplementedError(
            "Builds don't have a meaningful 'latest' version. Use list_versions() to see available builds."
        )

    def version_exists(self, key: str, version: str, **kwargs) -> bool:
        """Check if a specific build exists."""
        return version in self.list_versions(key)

    def data_local_sub_path(self, key: str, *, version: str, **kwargs) -> Path:
        """Relative local cache path for a given product build."""
        return Path("edm") / "builds" / "datasets" / key / version

0 commit comments

Comments
 (0)