diff --git a/README.md b/README.md
index 3fac6a2..f98812e 100644
--- a/README.md
+++ b/README.md
@@ -7,6 +7,7 @@ A Python application to validate and store published OC4IDS datasets.
 
 ### Prerequisites
 
 - Python 3.12
+- Postgres
 
 ### Install Python requirements
 
@@ -16,6 +17,18 @@ source .venv/bin/activate
 pip install -r requirements_dev.txt
 ```
 
+### Set database environment variable
+
+```
+export DATABASE_URL="postgresql://oc4ids_datastore@localhost/oc4ids_datastore"
+```
+
+### Run database migrations
+
+```
+alembic upgrade head
+```
+
 ### Run app
 
 ```
@@ -37,3 +50,9 @@ mypy oc4ids_datastore_pipeline/ tests/
 ```
 pytest
 ```
+
+### Generate new database migrations
+
+```
+alembic revision --autogenerate -m "<description>"
+```
diff --git a/alembic.ini b/alembic.ini
new file mode 100644
index 0000000..e4f193e
--- /dev/null
+++ b/alembic.ini
@@ -0,0 +1,116 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+# Use forward slashes (/) also on windows to provide an os agnostic path
+script_location = migrations
+
+# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and time
+# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
+# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to alembic/versions. When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
+
+# version path separator; As mentioned above, this is the character used to split
+# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+# version_path_separator = newline
+#
+# Use os.pathsep. Default configuration used for new projects.
+version_path_separator = os
+
+# set to 'true' to search source files recursively
+# in each "version_locations" directory
+# new in Alembic version 1.10
+# recursive_version_locations = false
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts. See the documentation for further
+# detail and examples
+
+# format using "black" - use the console_scripts runner, against the "black" entrypoint
+# hooks = black
+# black.type = console_scripts
+# black.entrypoint = black
+# black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
+# hooks = ruff
+# ruff.type = exec
+# ruff.executable = %(here)s/.venv/bin/ruff
+# ruff.options = --fix REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARNING
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/migrations/README b/migrations/README
new file mode 100644
index 0000000..98e4f9c
--- /dev/null
+++ b/migrations/README
@@ -0,0 +1 @@
+Generic single-database configuration.
\ No newline at end of file
diff --git a/migrations/env.py b/migrations/env.py
new file mode 100644
index 0000000..8c3ffd1
--- /dev/null
+++ b/migrations/env.py
@@ -0,0 +1,80 @@
+from logging.config import fileConfig
+import os
+
+from sqlalchemy import engine_from_config
+from sqlalchemy import pool
+
+from alembic import context
+
+from oc4ids_datastore_pipeline.database import Base
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+# Interpret the config file for Python logging.
+# This line sets up loggers.
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = Base.metadata
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
+
+
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well. By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+    url = config.get_main_option("sqlalchemy.url")
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def run_migrations_online() -> None:
+    """Run migrations in 'online' mode.
+
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+
+    """
+    connectable = engine_from_config(
+        config.get_section(config.config_ini_section, {}),
+        prefix="sqlalchemy.",
+        poolclass=pool.NullPool,
+    )
+
+    with connectable.connect() as connection:
+        context.configure(connection=connection, target_metadata=target_metadata)
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
diff --git a/migrations/script.py.mako b/migrations/script.py.mako
new file mode 100644
index 0000000..fbc4b07
--- /dev/null
+++ b/migrations/script.py.mako
@@ -0,0 +1,26 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+
+
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
diff --git a/migrations/versions/aaabf849b37f_create_dataset_table.py b/migrations/versions/aaabf849b37f_create_dataset_table.py
new file mode 100644
index 0000000..e22837e
--- /dev/null
+++ b/migrations/versions/aaabf849b37f_create_dataset_table.py
@@ -0,0 +1,39 @@
+"""create dataset table
+
+Revision ID: aaabf849b37f
+Revises:
+Create Date: 2025-02-04 17:14:46.411090
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "aaabf849b37f"
+down_revision: Union[str, None] = None
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "dataset",
+        sa.Column("dataset_id", sa.String(), nullable=False),
+        sa.Column("source_url", sa.String(), nullable=False),
+        sa.Column("publisher_name", sa.String(), nullable=False),
+        sa.Column("json_url", sa.String(), nullable=False),
+        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+        sa.PrimaryKeyConstraint("dataset_id"),
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table("dataset")
+    # ### end Alembic commands ###
diff --git a/oc4ids_datastore_pipeline/database.py b/oc4ids_datastore_pipeline/database.py
new file mode 100644
index 0000000..8c6f0dd
--- /dev/null
+++ b/oc4ids_datastore_pipeline/database.py
@@ -0,0 +1,45 @@
+import datetime
+import logging
+import os
+
+from sqlalchemy import (
+    DateTime,
+    Engine,
+    String,
+    create_engine,
+)
+from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
+
+logger = logging.getLogger(__name__)
+
+_engine = None
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+class Dataset(Base):
+    __tablename__ = "dataset"
+
+    dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
+    source_url: Mapped[str] = mapped_column(String)
+    publisher_name: Mapped[str] = mapped_column(String)
+    json_url: Mapped[str] = mapped_column(String)
+    updated_at: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
+
+
+def get_engine() -> Engine:
+    # Create the engine lazily, so DATABASE_URL is only read on first use
+    global _engine
+    if _engine is None:
+        _engine = create_engine(os.environ["DATABASE_URL"])
+    return _engine
+
+
+def save_dataset(dataset: Dataset) -> None:
+    with Session(get_engine()) as session:
+        # merge() inserts or updates based on the primary key, so
+        # re-processing a dataset upserts its existing row
+        session.merge(dataset)
+        session.commit()
diff --git a/oc4ids_datastore_pipeline/pipeline.py b/oc4ids_datastore_pipeline/pipeline.py
index 64ce207..3b4750d 100644
--- a/oc4ids_datastore_pipeline/pipeline.py
+++ b/oc4ids_datastore_pipeline/pipeline.py
@@ -1,3 +1,4 @@
+import datetime
 import json
 import logging
 import os
@@ -6,6 +7,8 @@
 import requests
 
 from libcoveoc4ids.api import oc4ids_json_output
 
+from oc4ids_datastore_pipeline.database import Dataset, save_dataset
+
 logger = logging.getLogger(__name__)
 
@@ -62,12 +65,34 @@ def write_json_to_file(file_name: str, json_data: Any) -> None:
         raise Exception("Error while writing to JSON file", e)
 
 
+def save_dataset_metadata(
+    dataset_name: str, source_url: str, publisher_name: str, file_name: str
+) -> None:
+    logger.info(f"Saving metadata for dataset {dataset_name}")
+    dataset = Dataset(
+        dataset_id=dataset_name,
+        source_url=source_url,
+        publisher_name=publisher_name,
+        json_url=file_name,
+        updated_at=datetime.datetime.now(datetime.UTC),
+    )
+    save_dataset(dataset)
+
+
 def process_dataset(dataset_name: str, dataset_url: str) -> None:
     logger.info(f"Processing dataset {dataset_name}")
     try:
         json_data = download_json(dataset_url)
         validate_json(dataset_name, json_data)
-        write_json_to_file(f"data/{dataset_name}.json", json_data)
+        file_name = f"data/{dataset_name}.json"
+        write_json_to_file(file_name, json_data)
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        save_dataset_metadata(
+            dataset_name=dataset_name,
+            source_url=dataset_url,
+            publisher_name=publisher_name,
+            file_name=file_name,
+        )
         logger.info(f"Processed dataset {dataset_name}")
     except Exception as e:
         logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
diff --git a/pyproject.toml b/pyproject.toml
index a12b620..b1e19c3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,8 +8,11 @@ description = "OC4IDS Datastore Pipeline"
 version = "0.1.0"
 readme = "README.md"
 dependencies = [
+    "alembic",
     "libcoveoc4ids",
+    "psycopg2",
     "requests",
+    "sqlalchemy",
 ]
 
 [project.optional-dependencies]
diff --git a/requirements_dev.txt b/requirements_dev.txt
index c9d8620..7904b9d 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -4,6 +4,8 @@
 #
 #    pip-compile --extra=dev --output-file=requirements_dev.txt pyproject.toml
 #
+alembic==1.14.1
+    # via oc4ids-datastore-pipeline (pyproject.toml)
 attrs==25.1.0
     # via
     #   cattrs
@@ -73,6 +75,10 @@ libcoveocds==0.16.4
     # via libcoveoc4ids
 lxml==5.3.0
     # via flattentool
+mako==1.3.8
+    # via alembic
+markupsafe==3.0.2
+    # via mako
 mccabe==0.7.0
     # via flake8
 mypy==1.14.1
@@ -103,6 +109,8 @@ platformdirs==4.3.6
     # via
     #   requests-cache
 pluggy==1.5.0
     # via pytest
+psycopg2==2.9.10
+    # via oc4ids-datastore-pipeline (pyproject.toml)
 pycodestyle==2.12.1
     # via flake8
 pycparser==2.22
@@ -146,14 +154,20 @@ six==1.17.0
     # via
     #   rfc3339-validator
     #   url-normalize
+sqlalchemy==2.0.37
+    # via
+    #   alembic
+    #   oc4ids-datastore-pipeline (pyproject.toml)
 transaction==5.0
     # via zodb
 types-requests==2.32.0.20241016
     # via oc4ids-datastore-pipeline (pyproject.toml)
 typing-extensions==4.12.2
     # via
+    #   alembic
     #   mypy
     #   referencing
+    #   sqlalchemy
 url-normalize==1.4.3
     # via requests-cache
 urllib3==2.3.0
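
A minimal sketch of how the new persistence layer is exercised once `DATABASE_URL` is exported and `alembic upgrade head` has run. The dataset values below are illustrative, not from a real run; `process_dataset()` normally derives them from the registered dataset URL and its downloaded JSON:

```
import datetime

from oc4ids_datastore_pipeline.database import Dataset, save_dataset

# Illustrative values: in the pipeline these come from the dataset registry
# and the "publisher" block of the downloaded OC4IDS JSON.
dataset = Dataset(
    dataset_id="example_dataset",
    source_url="https://example.com/oc4ids.json",
    publisher_name="Example Publisher",
    json_url="data/example_dataset.json",
    updated_at=datetime.datetime.now(datetime.UTC),
)

# save_dataset() merges on the primary key (dataset_id), so running the
# pipeline again for the same dataset updates its row rather than failing.
save_dataset(dataset)
```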