4 changes: 4 additions & 0 deletions changelog_entry.yaml
@@ -0,0 +1,4 @@
+- bump: patch
+  changes:
+    added:
+    - Versioning to dataset uploads.
43 changes: 14 additions & 29 deletions policyengine_us_data/storage/upload_completed_datasets.py
@@ -4,43 +4,28 @@
     CPS_2023,
 )
 from policyengine_us_data.storage import STORAGE_FOLDER
-from policyengine_us_data.utils.huggingface import upload
+from policyengine_us_data.utils.data_upload import upload_data_files
 from google.cloud import storage
 import google.auth


 def upload_datasets():
-    credentials, project_id = google.auth.default()
-    storage_client = storage.Client(
-        credentials=credentials, project=project_id
-    )
-    bucket = storage_client.bucket("policyengine-us-data")
-
-    datasets_to_upload = [
-        EnhancedCPS_2024,
-        Pooled_3_Year_CPS_2023,
-        CPS_2023,
+    dataset_files = [
+        EnhancedCPS_2024.file_path,
+        Pooled_3_Year_CPS_2023.file_path,
+        CPS_2023.file_path,
     ]

-    for dataset in datasets_to_upload:
-        dataset = dataset()
-        if not dataset.exists:
-            raise ValueError(
-                f"Dataset {dataset.name} does not exist at {dataset.file_path}."
-            )
+    for file_path in dataset_files:
+        if not file_path.exists():
+            raise ValueError(f"File {file_path} does not exist.")

-        upload(
-            dataset.file_path,
-            "policyengine/policyengine-us-data",
-            dataset.file_path.name,
-        )
-
-        blob = dataset.file_path.name
-        blob = bucket.blob(blob)
-        blob.upload_from_filename(dataset.file_path)
-        print(
-            f"Uploaded {dataset.file_path.name} to GCS bucket policyengine-us-data."
-        )
+    upload_data_files(
+        files=dataset_files,
+        hf_repo_name="policyengine/policyengine-us-data",
+        hf_repo_type="model",
+        gcs_bucket_name="policyengine-us-data",
+    )


 if __name__ == "__main__":
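
For context: the version tag created by upload_files_to_hf (in data_upload.py below) lets downstream code pin a dataset file to the package version that produced it. A minimal sketch of such a pinned download; the tag "1.14.0" and the filename are illustrative placeholders, not values from this PR:

from huggingface_hub import hf_hub_download

# Fetch a dataset file at the revision tagged by upload_files_to_hf.
# The tag and filename below are illustrative placeholders.
path = hf_hub_download(
    repo_id="policyengine/policyengine-us-data",
    filename="enhanced_cps_2024.h5",
    revision="1.14.0",  # version tag created at upload time
    repo_type="model",
)
print(path)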
104 changes: 104 additions & 0 deletions policyengine_us_data/utils/data_upload.py
@@ -0,0 +1,104 @@
from typing import List, Optional
from huggingface_hub import HfApi, CommitOperationAdd
from google.cloud import storage
from pathlib import Path
from importlib import metadata
import google.auth
import logging


def upload_data_files(
    files: List[str],
    gcs_bucket_name: str = "policyengine-us-data",
    hf_repo_name: str = "policyengine/policyengine-us-data",
    hf_repo_type: str = "model",
    version: Optional[str] = None,
):
    """
    Upload files to Hugging Face and Google Cloud Storage, tagging both
    with a version (defaults to the installed policyengine-us-data version).
    """
    if version is None:
        version = metadata.version("policyengine-us-data")

upload_files_to_hf(
files=files,
version=version,
hf_repo_name=hf_repo_name,
hf_repo_type=hf_repo_type,
)

upload_files_to_gcs(
files=files,
version=version,
gcs_bucket_name=gcs_bucket_name,
)


def upload_files_to_hf(
files: List[str],
version: str,
hf_repo_name: str = "policyengine/policyengine-us-data",
hf_repo_type: str = "model",
):
"""
Upload files to Hugging Face repository and tag the commit with the version.
"""
api = HfApi()
hf_operations = []

for file_path in files:
file_path = Path(file_path)
if not file_path.exists():
raise ValueError(f"File {file_path} does not exist.")
hf_operations.append(
CommitOperationAdd(
path_in_repo=file_path.name,
path_or_fileobj=str(file_path),
)
)
commit_info = api.create_commit(
repo_id=hf_repo_name,
operations=hf_operations,
repo_type=hf_repo_type,
commit_message=f"Upload data files for version {version}",
)
logging.info(f"Uploaded files to Hugging Face repository {hf_repo_name}.")

# Tag commit with version
api.create_tag(
repo_id=hf_repo_name,
tag=version,
revision=commit_info.oid,
repo_type=hf_repo_type,
)
logging.info(
f"Tagged commit with {version} in Hugging Face repository {hf_repo_name}."
)


def upload_files_to_gcs(
files: List[str],
version: str,
gcs_bucket_name: str = "policyengine-us-data",
):
"""
Upload files to Google Cloud Storage and set metadata with the version.
"""
credentials, project_id = google.auth.default()
storage_client = storage.Client(
credentials=credentials, project=project_id
)
bucket = storage_client.bucket(gcs_bucket_name)

for file_path in files:
file_path = Path(file_path)
blob = bucket.blob(file_path.name)
blob.upload_from_filename(file_path)
logging.info(
f"Uploaded {file_path.name} to GCS bucket {gcs_bucket_name}."
)

# Set metadata
blob.metadata = {"version": version}
blob.patch()
logging.info(
f"Set metadata for {file_path.name} in GCS bucket {gcs_bucket_name}."
)
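
For context: the metadata stamp set by upload_files_to_gcs can be read back to confirm which package version produced an object. A minimal sketch, assuming an illustrative object name:

from google.cloud import storage

# Look up the version metadata set by upload_files_to_gcs.
# The object name "enhanced_cps_2024.h5" is an illustrative placeholder.
client = storage.Client()
blob = client.bucket("policyengine-us-data").get_blob("enhanced_cps_2024.h5")
if blob is not None:
    print(blob.metadata)  # e.g. {"version": "1.14.0"}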