Merged
4 changes: 4 additions & 0 deletions changelog_entry.yaml

```diff
@@ -0,0 +1,4 @@
+- bump: patch
+  changes:
+    added:
+      - Versioning to dataset uploads.
```
43 changes: 14 additions & 29 deletions policyengine_us_data/storage/upload_completed_datasets.py

```diff
@@ -4,43 +4,28 @@
     CPS_2023,
 )
 from policyengine_us_data.storage import STORAGE_FOLDER
-from policyengine_us_data.utils.huggingface import upload
-from google.cloud import storage
-import google.auth
+from policyengine_us_data.utils.data_upload import upload_data_files
 
 
 def upload_datasets():
-    credentials, project_id = google.auth.default()
-    storage_client = storage.Client(
-        credentials=credentials, project=project_id
-    )
-    bucket = storage_client.bucket("policyengine-us-data")
-
-    datasets_to_upload = [
-        EnhancedCPS_2024,
-        Pooled_3_Year_CPS_2023,
-        CPS_2023,
+    dataset_files = [
+        EnhancedCPS_2024.file_path,
+        Pooled_3_Year_CPS_2023.file_path,
+        CPS_2023.file_path,
     ]
 
-    for dataset in datasets_to_upload:
-        dataset = dataset()
-        if not dataset.exists:
-            raise ValueError(
-                f"Dataset {dataset.name} does not exist at {dataset.file_path}."
-            )
+    for file_path in dataset_files:
+        if not file_path.exists():
+            raise ValueError(f"File {file_path} does not exist.")
 
-        upload(
-            dataset.file_path,
-            "policyengine/policyengine-us-data",
-            dataset.file_path.name,
-        )
-
-        blob = dataset.file_path.name
-        blob = bucket.blob(blob)
-        blob.upload_from_filename(dataset.file_path)
-        print(
-            f"Uploaded {dataset.file_path.name} to GCS bucket policyengine-us-data."
-        )
+    upload_data_files(
+        files=dataset_files,
+        hf_repo_name="policyengine/policyengine-us-data",
+        hf_repo_type="model",
+        gcs_bucket_name="policyengine-us-data",
+    )
 
 
 if __name__ == "__main__":
```
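Because each upload commit is now tagged with the package version, downstream code can pin a download to an exact release. The sketch below is not part of this PR; the filename and version string are example values for illustration, and it relies on `hf_hub_download`'s `revision` parameter accepting a git tag.

```python
from huggingface_hub import hf_hub_download

# Hypothetical consumer-side usage: fetch the dataset file that was
# uploaded for a specific release, using the version tag created by
# upload_files_to_hf as the git revision.
path = hf_hub_download(
    repo_id="policyengine/policyengine-us-data",
    filename="enhanced_cps_2024.h5",  # assumed file name, for illustration
    revision="1.13.0",  # example version tag created at upload time
    repo_type="model",
)
print(f"Downloaded pinned dataset to {path}")
```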
104 changes: 104 additions & 0 deletions policyengine_us_data/utils/data_upload.py (new file)

```python
from typing import List
from huggingface_hub import HfApi, CommitOperationAdd
from huggingface_hub.errors import RevisionNotFoundError
from google.cloud import storage
from pathlib import Path
from importlib import metadata
import google.auth
import logging


def upload_data_files(
    files: List[str],
    gcs_bucket_name: str = "policyengine-us-data",
    hf_repo_name: str = "policyengine/policyengine-us-data",
    hf_repo_type: str = "model",
    version: str = None,
):
    # Default to the installed package version when none is given.
    if version is None:
        version = metadata.version("policyengine-us-data")

    upload_files_to_hf(
        files=files,
        version=version,
        hf_repo_name=hf_repo_name,
        hf_repo_type=hf_repo_type,
    )

    upload_files_to_gcs(
        files=files,
        version=version,
        gcs_bucket_name=gcs_bucket_name,
    )


def upload_files_to_hf(
    files: List[str],
    version: str,
    hf_repo_name: str = "policyengine/policyengine-us-data",
    hf_repo_type: str = "model",
):
    """
    Upload files to a Hugging Face repository and tag the commit with the version.
    """
    api = HfApi()
    hf_operations = []

    # Stage all files as a single commit so one tag covers every artifact.
    for file_path in files:
        file_path = Path(file_path)
        if not file_path.exists():
            raise ValueError(f"File {file_path} does not exist.")
        hf_operations.append(
            CommitOperationAdd(
                path_in_repo=file_path.name,
                path_or_fileobj=str(file_path),
            )
        )
    commit_info = api.create_commit(
        repo_id=hf_repo_name,
        operations=hf_operations,
        repo_type=hf_repo_type,
        commit_message=f"Upload data files for version {version}",
    )
    logging.info(f"Uploaded files to Hugging Face repository {hf_repo_name}.")

    # Tag the commit with the version
    api.create_tag(
        repo_id=hf_repo_name,
        tag=version,
        revision=commit_info.oid,
        repo_type=hf_repo_type,
    )
    logging.info(
        f"Tagged commit with {version} in Hugging Face repository {hf_repo_name}."
    )


def upload_files_to_gcs(
    files: List[str],
    version: str,
    gcs_bucket_name: str = "policyengine-us-data",
):
    """
    Upload files to Google Cloud Storage and set version metadata on each blob.
    """
    credentials, project_id = google.auth.default()
    storage_client = storage.Client(
        credentials=credentials, project=project_id
    )
    bucket = storage_client.bucket(gcs_bucket_name)

    for file_path in files:
        file_path = Path(file_path)
        blob = bucket.blob(file_path.name)
        blob.upload_from_filename(file_path)
        logging.info(
            f"Uploaded {file_path.name} to GCS bucket {gcs_bucket_name}."
        )

        # Set version metadata on the uploaded blob
        blob.metadata = {"version": version}
        blob.patch()
        logging.info(
            f"Set metadata for {file_path.name} in GCS bucket {gcs_bucket_name}."
        )
```
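To confirm that a run stamped both stores correctly, the version can be read back after upload. This is a hypothetical verification snippet, not part of the PR; it assumes default Google credentials, and the blob name is an example value.

```python
from google.cloud import storage
from huggingface_hub import HfApi

# Hypothetical post-upload check: read back the version recorded
# by upload_data_files in each store.

# GCS: get_blob fetches blob metadata, including the {"version": ...}
# dict applied via blob.patch().
client = storage.Client()
blob = client.bucket("policyengine-us-data").get_blob("enhanced_cps_2024.h5")
print(blob.metadata)  # e.g. {'version': '1.13.0'}

# Hugging Face: list_repo_refs exposes the tags created by create_tag.
api = HfApi()
refs = api.list_repo_refs("policyengine/policyengine-us-data", repo_type="model")
print([tag.name for tag in refs.tags])
```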