Skip to content

Commit 346c0bc

Browse files
authored
chore(rail): prod migration regen rail perf 20260101 to 20260106 (#712)
1 parent d903860 commit 346c0bc

File tree

7 files changed

+198
-4
lines changed

7 files changed

+198
-4
lines changed

src/lamp_py/migrations/README

Lines changed: 0 additions & 1 deletion
This file was deleted.

src/lamp_py/migrations/README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Generic single-database configuration.
2+
3+
# Regeneration instructions
4+
5+
1. run migration_template_generator.py
6+
2. fill it in with the migration instructions.
7+
3. deploy this to performance manager and ingestion
8+
- ingestion updates Metadata table
9+
- performance manager updates Data Tables
10+
11+
It is allowable to put both metadata and prod db migrations in a single prod migration script.
12+
13+
This is valid because ingestion deploys first (e.g. resetting metadata "finished processing" flags)
14+
Then performance manager deploys (e.g. clearing old data)
15+
Then performance manager checks whether there is work (there is now, because the metadata flags were reset) and starts to re-process and fill in the days (which are empty because they were cleared)
16+
17+
4. verify that the migration has worked, either by looking at updated_date and the reset-metadata flags on the database, or by running another ad hoc query to check them
18+
19+
5. in a new PR, bump version of the tableau upload file to regenerate the whole dataset (otherwise we only grab daily incremental changes)

src/lamp_py/migrations/migration_template_generator.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,14 @@ def downgrade() -> None:
8080
if __name__ == "__main__":
8181
import uuid
8282

83-
short_desc = "reprocess_422_423"
83+
short_desc = input("Enter a short description for the migration: ").strip()
8484
uuid_new = uuid.uuid4().hex[-12:]
8585

86-
versions_dir = "{LAMP}/src/lamp_py/migrations/versions"
86+
import pathlib
87+
88+
# Get the directory containing the current script file
89+
BASE_DIR = pathlib.Path(__file__).resolve().parent
90+
versions_dir = BASE_DIR / "versions"
8791

8892
# List directories in the versions directory
8993
if os.path.exists(versions_dir):
@@ -96,7 +100,9 @@ def downgrade() -> None:
96100

97101
print(options)
98102
for o in options:
99-
latest_migration = sorted([d for d in os.listdir(os.path.join(versions_dir, o)) if not d.startswith("sql")])[-1]
103+
latest_migration = sorted(
104+
[d for d in os.listdir(os.path.join(versions_dir, o)) if not d.startswith("sql") and d.endswith(".py")]
105+
)[-1]
100106
parts = os.path.basename(latest_migration).split("_")
101107
breakpoint()
102108
increment_migration_count = str(int(parts[0]) + 1).zfill(3)

src/lamp_py/migrations/versions/metadata_staging/5853837233ce_backfill_2026_01_01_to_2026_01_04.py renamed to src/lamp_py/migrations/versions/metadata_staging/004_5853837233ce_backfill_2026_01_01_to_2026_01_04.py

File renamed without changes.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
"""regen-rail-perf-20260101-to-20260104
2+
3+
Revision ID: f138635d1338
4+
Revises: 5e3066f113ff
5+
Create Date: 2026-02-28 11:31:59.737856
6+
7+
Details: regen-rail-perf-20260101-to-20260104 -
8+
This will clean up missing data from RDS performance issues/outage from 1/1 to 1/4
9+
Rerunning 1/1 - 1/5 just in case.
10+
* upgrade -> Delete all records from 1/1 to 1/5 in vehicle_events and vehicle_trips
11+
-> Set all flags to "unprocessed" in metadata log from 1/1 to 1/5
12+
* downgrade -> Nothing
13+
"""
14+
15+
import logging
16+
import os
17+
import tempfile
18+
import polars as pl
19+
import pyarrow as pa
20+
import pyarrow.parquet as pq
21+
from typing import List
22+
23+
from alembic import op
24+
import sqlalchemy as sa
25+
from sqlalchemy.exc import ProgrammingError
26+
27+
from lamp_py.aws.s3 import download_file, upload_file
28+
from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
29+
30+
# revision identifiers, used by Alembic.
31+
revision = "f138635d1338"
32+
down_revision = "5e3066f113ff"
33+
branch_labels = None # tbd
34+
depends_on = None # tbd
35+
36+
37+
def upgrade() -> None:
    """
    Clear rail performance rows for service dates 20260101 through 20260105 and
    reset the rail processing flags on the matching metadata_log rows.

    The deletes on vehicle_events and vehicle_trips are issued through
    `op.execute`. The metadata_log update is issued through
    `DatabaseManager(DatabaseIndex.METADATA)`.

    Raises:
        ProgrammingError: any SQL error raised by the metadata update other
            than pgcode 42P01 (undefined table), which is logged and ignored.
    """
    clear_events = "DELETE FROM vehicle_events WHERE service_date >= 20260101 AND service_date <= 20260105;"
    op.execute(clear_events)

    clear_trips = "DELETE FROM vehicle_trips WHERE service_date >= 20260101 AND service_date <= 20260105;"
    op.execute(clear_trips)

    # Query to Check
    # SELECT created_on, rail_pm_processed, rail_pm_process_fail
    # FROM public.metadata_log
    # WHERE created_on >= '2026-01-01 00:00:00' and created_on < '2026-01-06 00:00:00'
    # AND (path LIKE '%/RT_TRIP_UPDATES/%' or path LIKE '%/RT_VEHICLE_POSITIONS/%')
    # ORDER BY created_on;

    # Half-open range [2026-01-01, 2026-01-06): strict bounds of
    # '> 00:00:00' and '< 23:59:59' would skip rows stamped exactly at midnight
    # on 1/1 and rows stamped from 23:59:59 on 1/5 up to midnight on 1/6.
    update_md_query = """
        UPDATE
            metadata_log
        SET
            rail_pm_process_fail = false
            , rail_pm_processed = false
        WHERE
            created_on >= '2026-01-01 00:00:00'
            and created_on < '2026-01-06 00:00:00'
            and (
                path LIKE '%/RT_TRIP_UPDATES/%'
                or path LIKE '%/RT_VEHICLE_POSITIONS/%'
            )
        ;
    """

    try:
        md_manager = DatabaseManager(DatabaseIndex.METADATA)
        md_manager.execute(sa.text(update_md_query))

    except ProgrammingError as error:
        # Error 42P01 is an 'Undefined Table' error. This occurs when there is
        # no metadata_log table in the rail performance manager database
        #
        # Raise all other sql errors
        original_error = error.orig
        if original_error is not None and hasattr(original_error, "pgcode") and original_error.pgcode == "42P01":
            logging.info("No Metadata Table in Rail Performance Manager")
        else:
            raise
83+
84+
85+
def downgrade() -> None:
    """Intentionally a no-op: this migration defines no downgrade step."""
    return None
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""backfill_2026_01_06
2+
3+
Revision ID: 7cb3dbb1dac0
4+
Revises: f138635d1338
5+
Create Date: 2026-03-01 09:05:04.036779
6+
7+
Details: Query of metadata table revealed that 1/6 also failed processing, so generating that again
8+
9+
* upgrade -> same as previous migration but for 1/6
10+
* downgrade -> None
11+
"""
12+
13+
import logging
14+
import os
15+
import tempfile
16+
import polars as pl
17+
import pyarrow as pa
18+
import pyarrow.parquet as pq
19+
from typing import List
20+
21+
from alembic import op
22+
import sqlalchemy as sa
23+
from sqlalchemy.exc import ProgrammingError
24+
25+
from lamp_py.aws.s3 import download_file, upload_file
26+
from lamp_py.postgres.postgres_utils import DatabaseIndex, DatabaseManager
27+
28+
# revision identifiers, used by Alembic.
29+
revision = "7cb3dbb1dac0"
30+
down_revision = "f138635d1338"
31+
branch_labels = None # tbd
32+
depends_on = None # tbd
33+
34+
35+
def upgrade() -> None:
    """
    Clear rail performance rows for service date 20260106 and reset the rail
    processing flags on the matching metadata_log rows.

    The deletes on vehicle_events and vehicle_trips are issued through
    `op.execute`. The metadata_log update is issued through
    `DatabaseManager(DatabaseIndex.METADATA)`.

    Raises:
        ProgrammingError: any SQL error raised by the metadata update other
            than pgcode 42P01 (undefined table), which is logged and ignored.
    """
    clear_events = "DELETE FROM vehicle_events WHERE service_date = 20260106;"
    op.execute(clear_events)

    clear_trips = "DELETE FROM vehicle_trips WHERE service_date = 20260106;"
    op.execute(clear_trips)

    # Query to Check
    # SELECT created_on, rail_pm_processed, rail_pm_process_fail
    # FROM public.metadata_log
    # WHERE created_on >= '2026-01-06 00:00:00' and created_on < '2026-01-07 00:00:00'
    # AND (path LIKE '%/RT_TRIP_UPDATES/%' or path LIKE '%/RT_VEHICLE_POSITIONS/%')
    # ORDER BY created_on;

    # Half-open range [2026-01-06, 2026-01-07): strict bounds of
    # '> 00:00:00' and '< 23:59:59' would skip rows stamped exactly at midnight
    # on 1/6 and rows stamped from 23:59:59 on 1/6 up to midnight on 1/7.
    update_md_query = """
        UPDATE
            metadata_log
        SET
            rail_pm_process_fail = false
            , rail_pm_processed = false
        WHERE
            created_on >= '2026-01-06 00:00:00'
            and created_on < '2026-01-07 00:00:00'
            and (
                path LIKE '%/RT_TRIP_UPDATES/%'
                or path LIKE '%/RT_VEHICLE_POSITIONS/%'
            )
        ;
    """

    try:
        md_manager = DatabaseManager(DatabaseIndex.METADATA)
        md_manager.execute(sa.text(update_md_query))

    except ProgrammingError as error:
        # Error 42P01 is an 'Undefined Table' error. This occurs when there is
        # no metadata_log table in the rail performance manager database
        #
        # Raise all other sql errors
        original_error = error.orig
        if original_error is not None and hasattr(original_error, "pgcode") and original_error.pgcode == "42P01":
            logging.info("No Metadata Table in Rail Performance Manager")
        else:
            raise
81+
82+
83+
def downgrade() -> None:
    """Intentionally a no-op: this migration defines no downgrade step."""
    return None

src/lamp_py/migrations/versions/performance_manager_staging/587e2c484a7d_backfill_2026_01_01_to_2026_01_04.py renamed to src/lamp_py/migrations/versions/performance_manager_staging/013_587e2c484a7d_backfill_2026_01_01_to_2026_01_04.py

File renamed without changes.

0 commit comments

Comments
 (0)