Commit e1256ae

Merge pull request #16 from OpenDataServices/3-database
Save metadata about valid datasets to database
2 parents cfd06ad + 9172f53 commit e1256ae

File tree

10 files changed: +366 -1 lines changed


README.md

Lines changed: 19 additions & 0 deletions

@@ -7,6 +7,7 @@ A Python application to validate and store published OC4IDS datasets.
 ### Prerequisites
 
 - Python 3.12
+- Postgres
 
 ### Install Python requirements
 
@@ -16,6 +17,18 @@ source .venv/bin/activate
 pip install -r requirements_dev.txt
 ```
 
+### Set database environment variable
+
+```
+export DATABASE_URL="postgresql://oc4ids_datastore@localhost/oc4ids_datastore"
+```
+
+### Run database migrations
+
+```
+alembic upgrade head
+```
+
 ### Run app
 
 ```
@@ -37,3 +50,9 @@ mypy oc4ids_datastore_pipeline/ tests/
 ```
 pytest
 ```
+
+### Generating new database migrations
+
+```
+alembic revision --autogenerate -m "<MESSAGE HERE>"
+```
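Not part of the commit, but useful context for the new README steps: a minimal sketch for confirming that DATABASE_URL points at a reachable Postgres instance before running `alembic upgrade head`, using SQLAlchemy (already a pipeline dependency).

```
import os

from sqlalchemy import create_engine, text

# Uses the same DATABASE_URL environment variable as the pipeline and Alembic
engine = create_engine(os.environ["DATABASE_URL"])
with engine.connect() as connection:
    # A trivial round-trip query; fails fast if the database is unreachable
    print(connection.execute(text("SELECT version()")).scalar_one())
```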

alembic.ini

Lines changed: 116 additions & 0 deletions

@@ -0,0 +1,116 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+# Use forward slashes (/) also on windows to provide an os agnostic path
+script_location = migrations
+
+# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
+# Uncomment the line below if you want the files to be prepended with date and time
+# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
+# for all available tokens
+# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
+
+# sys.path path, will be prepended to sys.path if present.
+# defaults to the current working directory.
+prepend_sys_path = .
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
+# Any required deps can installed by adding `alembic[tz]` to the pip requirements
+# string value is passed to ZoneInfo()
+# leave blank for localtime
+# timezone =
+
+# max length of characters to apply to the "slug" field
+# truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+# revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+# sourceless = false
+
+# version location specification; This defaults
+# to alembic/versions. When using multiple version
+# directories, initial revisions must be specified with --version-path.
+# The path separator used here should be the separator specified by "version_path_separator" below.
+# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
+
+# version path separator; As mentioned above, this is the character used to split
+# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
+# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
+# Valid values for version_path_separator are:
+#
+# version_path_separator = :
+# version_path_separator = ;
+# version_path_separator = space
+# version_path_separator = newline
+#
+# Use os.pathsep. Default configuration used for new projects.
+version_path_separator = os
+
+# set to 'true' to search source files recursively
+# in each "version_locations" directory
+# new in Alembic version 1.10
+# recursive_version_locations = false
+
+# the output encoding used when revision files
+# are written from script.py.mako
+# output_encoding = utf-8
+
+[post_write_hooks]
+# post_write_hooks defines scripts or Python functions that are run
+# on newly generated revision scripts. See the documentation for further
+# detail and examples
+
+# format using "black" - use the console_scripts runner, against the "black" entrypoint
+# hooks = black
+# black.type = console_scripts
+# black.entrypoint = black
+# black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
+# hooks = ruff
+# ruff.type = exec
+# ruff.executable = %(here)s/.venv/bin/ruff
+# ruff.options = --fix REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARNING
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S

migrations/README

Lines changed: 1 addition & 0 deletions

@@ -0,0 +1 @@
+Generic single-database configuration.

migrations/env.py

Lines changed: 80 additions & 0 deletions

@@ -0,0 +1,80 @@
+from logging.config import fileConfig
+import os
+
+from sqlalchemy import engine_from_config
+from sqlalchemy import pool
+
+from alembic import context
+
+from oc4ids_datastore_pipeline.database import Base
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = Base.metadata
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
+
+
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well. By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+    url = config.get_main_option("sqlalchemy.url")
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def run_migrations_online() -> None:
+    """Run migrations in 'online' mode.
+
+    In this scenario we need to create an Engine
+    and associate a connection with the context.
+
+    """
+    connectable = engine_from_config(
+        config.get_section(config.config_ini_section, {}),
+        prefix="sqlalchemy.",
+        poolclass=pool.NullPool,
+    )
+
+    with connectable.connect() as connection:
+        context.configure(connection=connection, target_metadata=target_metadata)
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
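Because env.py reads DATABASE_URL itself and injects it into the Alembic config, migrations can also be run programmatically with the same settings. A sketch equivalent to `alembic upgrade head` (not part of this commit):

```
from alembic import command
from alembic.config import Config

# env.py still reads DATABASE_URL from the environment, so it must be set
alembic_cfg = Config("alembic.ini")
command.upgrade(alembic_cfg, "head")
```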

migrations/script.py.mako

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+
+
+def upgrade() -> None:
+    ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+    ${downgrades if downgrades else "pass"}
migrations/versions/aaabf849b37f_create_dataset_table.py

Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+"""create dataset table
+
+Revision ID: aaabf849b37f
+Revises:
+Create Date: 2025-02-04 17:14:46.411090
+
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "aaabf849b37f"
+down_revision: Union[str, None] = None
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "dataset",
+        sa.Column("dataset_id", sa.String(), nullable=False),
+        sa.Column("source_url", sa.String(), nullable=False),
+        sa.Column("publisher_name", sa.String(), nullable=False),
+        sa.Column("json_url", sa.String(), nullable=False),
+        sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
+        sa.PrimaryKeyConstraint("dataset_id"),
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_table("dataset")
+    # ### end Alembic commands ###
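This autogenerated migration mirrors the Dataset model added below. A sketch (not part of this commit) that renders the equivalent PostgreSQL DDL from the model, to see exactly what the migration creates:

```
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

from oc4ids_datastore_pipeline.database import Dataset

# Prints the CREATE TABLE statement SQLAlchemy derives from the model,
# which should line up with the columns created in this migration
print(CreateTable(Dataset.__table__).compile(dialect=postgresql.dialect()))
```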
oc4ids_datastore_pipeline/database.py

Lines changed: 42 additions & 0 deletions

@@ -0,0 +1,42 @@
+import datetime
+import logging
+import os
+
+from sqlalchemy import (
+    DateTime,
+    Engine,
+    String,
+    create_engine,
+)
+from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column
+
+logger = logging.getLogger(__name__)
+
+_engine = None
+
+
+class Base(DeclarativeBase):
+    pass
+
+
+class Dataset(Base):
+    __tablename__ = "dataset"
+
+    dataset_id: Mapped[str] = mapped_column(String, primary_key=True)
+    source_url: Mapped[str] = mapped_column(String)
+    publisher_name: Mapped[str] = mapped_column(String)
+    json_url: Mapped[str] = mapped_column(String)
+    updated_at: Mapped[datetime.datetime] = mapped_column(DateTime(timezone=True))
+
+
+def get_engine() -> Engine:
+    global _engine
+    if _engine is None:
+        _engine = create_engine(os.environ["DATABASE_URL"])
+    return _engine
+
+
+def save_dataset(dataset: Dataset) -> None:
+    with Session(get_engine()) as session:
+        session.merge(dataset)
+        session.commit()
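Since save_dataset uses session.merge, re-running the pipeline updates an existing row by dataset_id instead of raising a duplicate-key error. A minimal usage sketch (the identifiers and URLs are illustrative, not from the commit):

```
import datetime

from oc4ids_datastore_pipeline.database import Dataset, save_dataset

dataset = Dataset(
    dataset_id="example_dataset",
    source_url="https://example.com/oc4ids.json",
    publisher_name="Example Publisher",
    json_url="data/example_dataset.json",
    updated_at=datetime.datetime.now(datetime.UTC),
)
# merge() inserts the row, or updates it in place if dataset_id already exists
save_dataset(dataset)
```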

oc4ids_datastore_pipeline/pipeline.py

Lines changed: 26 additions & 1 deletion

@@ -1,3 +1,4 @@
+import datetime
 import json
 import logging
 import os
@@ -6,6 +7,8 @@
 import requests
 from libcoveoc4ids.api import oc4ids_json_output
 
+from oc4ids_datastore_pipeline.database import Dataset, save_dataset
+
 logger = logging.getLogger(__name__)
 
 
@@ -62,12 +65,34 @@ def write_json_to_file(file_name: str, json_data: Any) -> None:
         raise Exception("Error while writing to JSON file", e)
 
 
+def save_dataset_metadata(
+    dataset_name: str, source_url: str, publisher_name: str, file_name: str
+) -> None:
+    logger.info(f"Saving metadata for dataset {dataset_name}")
+    dataset = Dataset(
+        dataset_id=dataset_name,
+        source_url=source_url,
+        publisher_name=publisher_name,
+        json_url=file_name,
+        updated_at=datetime.datetime.now(datetime.UTC),
+    )
+    save_dataset(dataset)
+
+
 def process_dataset(dataset_name: str, dataset_url: str) -> None:
     logger.info(f"Processing dataset {dataset_name}")
     try:
         json_data = download_json(dataset_url)
         validate_json(dataset_name, json_data)
-        write_json_to_file(f"data/{dataset_name}.json", json_data)
+        file_name = f"data/{dataset_name}.json"
+        write_json_to_file(file_name, json_data)
+        publisher_name = json_data.get("publisher", {}).get("name", "")
+        save_dataset_metadata(
+            dataset_name=dataset_name,
+            source_url=dataset_url,
+            publisher_name=publisher_name,
+            file_name=file_name,
+        )
         logger.info(f"Processed dataset {dataset_name}")
     except Exception as e:
         logger.warning(f"Failed to process dataset {dataset_name} with error {e}")
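A sketch of how the new save_dataset_metadata function might be unit-tested with the database call patched out (the test itself is illustrative, not part of this commit):

```
from unittest.mock import patch

from oc4ids_datastore_pipeline.pipeline import save_dataset_metadata


def test_save_dataset_metadata() -> None:
    # Patch the database layer so the test needs no running Postgres
    with patch("oc4ids_datastore_pipeline.pipeline.save_dataset") as mock_save:
        save_dataset_metadata(
            dataset_name="example_dataset",
            source_url="https://example.com/oc4ids.json",
            publisher_name="Example Publisher",
            file_name="data/example_dataset.json",
        )
        saved = mock_save.call_args.args[0]
        assert saved.dataset_id == "example_dataset"
        assert saved.json_url == "data/example_dataset.json"
```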
