Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0a33aad
Add airtable_issue_management DAG with close_expired_issues task
Feb 18, 2026
d57e02c
Fix Secret Manager hook import for Composer
Feb 18, 2026
821f4c0
Use Secret Manager client instead of Airflow hook
Feb 18, 2026
7ce964f
Replace query.sql with query.py for airtable_issue_management DAG
Feb 18, 2026
8bf403b
Fix query import for Composer DAG parsing
Feb 18, 2026
5c8dc1e
Fix query import for Composer DAG parsing
Feb 18, 2026
7223966
Fix query import for Composer DAG parsing
Feb 18, 2026
956c62c
Update airtable_issue_management DAG after rebase
Feb 19, 2026
f1f0564
Remove project qualification from BigQuery tables for env-specific re…
Feb 20, 2026
975be6e
Fix SQL: remove stray appended lines from QUERY
Feb 20, 2026
4bdba4b
Redeploy airtable_issue_management to staging
Feb 21, 2026
491f01d
Redeploy airtable_issue_management to staging (no-op change)
Feb 21, 2026
0e82c10
Fix email table HTML: include new_end_date inside table row
Feb 21, 2026
568ac14
Redeploy airtable_issue_management to staging
Feb 21, 2026
ebb48e3
Update close_expired_issues logic
Feb 25, 2026
3955fdd
Add queries for airtable_issue_management
Feb 25, 2026
5afaed8
Create mart table
erikamov Feb 26, 2026
307dc22
Create and Test BigQuery to Airtable Issues Operator
erikamov Feb 26, 2026
8f57398
Create new dag
erikamov Feb 26, 2026
2de9c07
Redeploy airtable_issue_management to staging
Feb 21, 2026
b94857c
Redeploy airtable_issue_management to staging
Feb 21, 2026
dd495f9
Remove deprecated query.py (moved to queries directory)
Feb 26, 2026
440eb54
Move fct_close_expired_issues model to mart/airtable_issue_management
Feb 26, 2026
c0d13c2
Restore model to mart/transit_database for now
Feb 26, 2026
2d02e61
Adding service_name to the table
Mar 4, 2026
94885d4
Adding service_name to the fact table
Mar 4, 2026
49639c4
Add Airtable email operator and tests
Mar 24, 2026
c817ac1
Temporarily removing LatestOnlyOperator
Mar 24, 2026
dcc67af
Temporarily removing LatestOnlyOperator
Mar 24, 2026
311cbd3
Temporarily removing LatestOnlyOperator
Mar 24, 2026
8b06384
Including LatestOnlyOperator
Mar 24, 2026
36c987c
Refactor Airtable issue management DAG and remove old conventional DAG
Mar 26, 2026
c3b991e
Fixing the dbt deploy - Sync Metabase error
Mar 26, 2026
8ce5294
Rename DAG to airtable_issue_management (remove _new suffix)
Mar 26, 2026
eda95c8
Adding Environmental Variables.
Mar 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/.production.env
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ SENTRY_DSN=https://0fc56e5e8a96482da63b8d9dd3955ee7@sentry.k8s.calitp.jarv.us/2
SENTRY_ENVIRONMENT=cal-itp-data-infra
GTFS_RT_VALIDATOR_JAR=gtfs-realtime-validator-lib-1.0.0-20220223.003525-2.jar
GTFS_RT_VALIDATOR_VERSION=v1.0.0
AIRTABLE_ISSUE_MANAGEMENT_EMAIL=airtable-issue-alerts@dot.ca.gov
TRANSIT_DATA_QUALITY_ISSUES=Transit Data Quality Issues
2 changes: 2 additions & 0 deletions airflow/.staging.env
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ SENTRY_DSN=https://0fc56e5e8a96482da63b8d9dd3955ee7@sentry.k8s.calitp.jarv.us/2
SENTRY_ENVIRONMENT=cal-itp-data-infra-staging
GTFS_RT_VALIDATOR_JAR=gtfs-realtime-validator-lib-1.0.0-20220223.003525-2.jar
GTFS_RT_VALIDATOR_VERSION=v1.0.0
AIRTABLE_ISSUE_MANAGEMENT_EMAIL=farhad.salemi@dot.ca.gov
TRANSIT_DATA_QUALITY_ISSUES=Staging Transit Data Quality Issues
11 changes: 10 additions & 1 deletion airflow/dags/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
| 2:00 PM | 6:00 AM | 7:00 AM | [dbt_all](#dbt_all) | Monday and Thursday |
| 2:00 PM | 6:00 AM | 7:00 AM | [dbt_daily](#dbt_daily) | Sunday, Tuesday, Wednesday, Friday, and Saturday |
| - | - | - | [dbt_manual](#dbt_manual)<br>[download_gtfs_schedule_v2](./download_gtfs_schedule_v2)<br>[unzip_and_validate_gtfs_schedule_hourly](./unzip_and_validate_gtfs_schedule_hourly)| Runs Only Manually |
| 2:00 PM | 6:00 AM | 6:00 AM | [airtable_issue_management](#airtable_issue_management) | Fridays |

## dbt_all

Expand Down Expand Up @@ -202,3 +202,12 @@
### Automated Tests

Each operator and hook file has pytest tests under the `airflow/tests/` folder. Go to [running-automated-tests](https://github.com/cal-itp/data-infra/tree/main/airflow#running-automated-tests) for more information.

### airtable_issue_management

This DAG automates the lifecycle management of Transit Data Quality (TDQ) expired GTFS issues.
It performs the following tasks:
1. Queries BigQuery for expired or soon-to-expire GTFS issues that should be closed
2. Updates the corresponding records in Airtable to mark them as resolved
3. Sends an HTML email summarizing the updates
4. Logs any failed update batches for troubleshooting
66 changes: 66 additions & 0 deletions airflow/dags/airtable_issue_management.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Weekly Airtable issue-management DAG.

Every Friday at 6am Pacific this DAG:
  1. reads rows describing expired GTFS data-quality issues from BigQuery
     (``mart_transit_database.fct_close_expired_issues``),
  2. updates the matching Airtable records to mark them resolved, and
  3. emails an HTML summary of the updates.
"""
import os
from datetime import datetime, timedelta

import pendulum
from operators.airtable_issues_email_operator import AirtableIssuesEmailOperator
from operators.airtable_issues_update_operator import AirtableIssuesUpdateOperator
from operators.bigquery_to_airtable_issues_operator import (
    BigQueryToAirtableIssuesOperator,
)

from airflow import DAG
from airflow.models.xcom_arg import XComArg
from airflow.operators.latest_only import LatestOnlyOperator

# Cron below is evaluated in this zone because start_date is tz-aware.
local_tz = pendulum.timezone("America/Los_Angeles")

with DAG(
    dag_id="airtable_issue_management",
    tags=["airtable", "tdq", "automation"],
    # Every Friday at 6am PT (1pm UTC during PDT / 2pm UTC during PST)
    schedule="0 6 * * 5",
    start_date=datetime(2026, 3, 20, tzinfo=local_tz),
    catchup=False,
    default_args={
        # Recipient and Airtable table name come from the environment
        # (.staging.env / .production.env set both variables).
        # NOTE(review): if AIRTABLE_ISSUE_MANAGEMENT_EMAIL is unset,
        # os.getenv returns None and this becomes [None] — confirm the
        # variable is always defined in Composer.
        "email": [os.getenv("AIRTABLE_ISSUE_MANAGEMENT_EMAIL")],
        # NOTE(review): "air_table_name" is not a BaseOperator argument;
        # Airflow only applies default_args entries matching an operator's
        # signature, so this is used here purely as a place to stash the
        # value (read back below via dag.default_args) — confirm this
        # indirection is intentional rather than a plain module constant.
        "air_table_name": os.getenv("TRANSIT_DATA_QUALITY_ISSUES"),
    },
) as dag:
    # Skip backfilled intervals: only the most recent scheduled run proceeds.
    latest_only = LatestOnlyOperator(
        task_id="latest_only",
        depends_on_past=False,
    )

    # Pull the rows of issues to close from BigQuery; the operator's XCom
    # return value feeds the update task below.
    airtable_issues = BigQueryToAirtableIssuesOperator(
        task_id="bigquery_to_airtable_issues",
        retries=1,
        retry_delay=timedelta(seconds=10),
        dataset_name="mart_transit_database",
        table_name="fct_close_expired_issues",
    )

    # Mark the corresponding Airtable records as resolved.
    update_airtable_issues = AirtableIssuesUpdateOperator(
        task_id="update_airtable_issues",
        retries=1,
        retry_delay=timedelta(seconds=10),
        airtable_conn_id="airtable_issue_management",
        air_base_id="appmBGOFTvsDv4jdJ",
        air_table_name=dag.default_args["air_table_name"],
        rows=XComArg(airtable_issues),
    )

    # Email a summary of what was updated, including any failed batches.
    send_airtable_issue_email = AirtableIssuesEmailOperator(
        task_id="send_airtable_issue_email",
        retries=1,
        retry_delay=timedelta(seconds=10),
        to_emails=dag.default_args["email"],
        subject="[Airflow] Airtable Issue Management",
        update_result=XComArg(update_airtable_issues),
    )

    (
        latest_only
        >> airtable_issues
        >> update_airtable_issues
        >> send_airtable_issue_email
    )
37 changes: 37 additions & 0 deletions airflow/plugins/hooks/airtable_issues_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from __future__ import annotations

from typing import Any

from pyairtable import Api

from airflow.hooks.base import BaseHook
from airflow.models.connection import Connection


class AirtableIssuesHook(BaseHook):
    """Thin wrapper around the pyairtable client.

    The Airtable access token is read from the ``password`` field of the
    Airflow connection identified by ``airtable_conn_id``; the pyairtable
    ``Api`` object is created lazily and cached for the hook's lifetime.
    """

    connection: Connection
    _api: Api | None

    def __init__(self, airtable_conn_id: str = "airtable_issue_management") -> None:
        super().__init__()
        self.connection = BaseHook.get_connection(airtable_conn_id)
        self._api = None

    def api(self) -> Api:
        """Return the cached pyairtable client, creating it on first use."""
        cached = self._api
        if cached is not None:
            return cached
        client = Api(self.connection.password)
        self._api = client
        return client

    def table(self, air_base_id: str, air_table_name: str):
        """Return a handle for one table within one Airtable base."""
        return self.api().table(air_base_id, air_table_name)

    def read(self, air_base_id: str, air_table_name: str) -> list[dict[str, Any]]:
        """Fetch every record from the given table."""
        target = self.table(air_base_id, air_table_name)
        return target.all()

    def batch_update(
        self,
        air_base_id: str,
        air_table_name: str,
        records: list[dict[str, Any]],
    ) -> Any:
        """Apply a batch of record updates to the given table."""
        target = self.table(air_base_id, air_table_name)
        return target.batch_update(records)
116 changes: 116 additions & 0 deletions airflow/plugins/operators/airtable_issues_email_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
from __future__ import annotations

import html
from typing import Any, Sequence

from airflow.models import BaseOperator
from airflow.models.taskinstance import Context
from airflow.utils.email import send_email


class AirtableIssuesEmailOperator(BaseOperator):
    """Email an HTML summary of an Airtable issue-update run.

    ``update_result`` is the dict produced by AirtableIssuesUpdateOperator
    (keys: ``email_rows``, ``failed_batches``, ``updated_record_ids``),
    normally wired in via XComArg. If there is no result or no updated
    rows, no email is sent and the reason is returned for the XCom log.
    """

    template_fields: Sequence[str] = (
        "to_emails",
        "subject",
        "update_result",
    )

    def __init__(
        self,
        to_emails: list[str] | str,
        update_result: dict[str, Any],
        subject: str = "[Airflow] Airtable Issue Management",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.to_emails = to_emails
        self.subject = subject
        self.update_result = update_result

    def build_table_rows(self, email_rows: list[dict[str, Any]]) -> str:
        """Render one ``<tr>`` per updated issue.

        Cell values originate from BigQuery/Airtable; they are HTML-escaped
        so a stray ``<`` or ``&`` in e.g. a dataset name cannot break the
        table markup (or inject HTML into the email body).
        """
        table_rows = ""

        for row in email_rows:
            table_rows += (
                f"<tr><td>{html.escape(str(row.get('issue_number', '')))}</td>"
                f"<td>{html.escape(str(row.get('gtfs_dataset_name', '')))}</td>"
                f"<td>{html.escape(str(row.get('status', '')))}</td>"
                f"<td>{html.escape(str(row.get('new_end_date', '')))}</td></tr>"
            )

        return table_rows

    def build_failed_html(self, failed_batches: list[dict[str, Any]]) -> str:
        """Render failed batches as ``<br>``-joined lines, or "None"."""
        if not failed_batches:
            return "None"

        # Error text comes from arbitrary exception messages — escape it
        # before embedding in HTML.
        return "<br>".join(
            [
                f"Batch {batch['batch_num']}: {html.escape(str(batch['error']))}"
                for batch in failed_batches
            ]
        )

    def build_email_body(
        self,
        email_rows: list[dict[str, Any]],
        failed_batches: list[dict[str, Any]],
    ) -> str:
        """Assemble the full HTML email body from its two sections."""
        table_rows = self.build_table_rows(email_rows)
        failed_html = self.build_failed_html(failed_batches)

        return f"""
        <b>✅ Successfully updated {len(email_rows)} Airtable records.</b><br><br>
        <b>Closed About to Expire Issues:</b><br>
        <table border="1" cellspacing="0" cellpadding="5">
            <tr>
                <th>Issue Number</th>
                <th>GTFS Dataset Name</th>
                <th>Status</th>
                <th>New End Date</th>
            </tr>
            {table_rows}
        </table><br><br>
        <b>❌ Failed batches:</b><br>
        {failed_html}
        """

    def execute(self, context: Context) -> dict[str, Any]:
        """Send the summary email (if there is anything to report).

        Returns a small dict (pushed to XCom) describing whether the email
        was sent and, if so, what it covered.
        """
        del context  # unused; required by the operator interface

        update_result = self.update_result

        if not update_result:
            self.log.info("No update result found. Email not sent.")
            return {
                "email_sent": False,
                "reason": "no_update_result",
            }

        email_rows = update_result.get("email_rows", [])
        failed_batches = update_result.get("failed_batches", [])
        updated_record_ids = update_result.get("updated_record_ids", [])

        if not email_rows:
            self.log.info("No updated rows. Email not sent.")
            return {
                "email_sent": False,
                "reason": "no_updated_rows",
            }

        body = self.build_email_body(email_rows, failed_batches)

        send_email(
            to=self.to_emails,
            subject=self.subject,
            html_content=body,
        )

        self.log.info("Email sent via Airflow send_email.")

        return {
            "email_sent": True,
            "updated_count": len(email_rows),
            "updated_record_ids": updated_record_ids,
            "failed_batches": failed_batches,
        }
127 changes: 127 additions & 0 deletions airflow/plugins/operators/airtable_issues_update_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any, Sequence

from hooks.airtable_issues_hook import AirtableIssuesHook

from airflow.models import BaseOperator
from airflow.models.taskinstance import Context


class AirtableIssuesUpdateOperator(BaseOperator):
    """Close expired issues in Airtable via batched record updates.

    ``rows`` (normally an XComArg) are dicts that must contain
    ``issue_source_record_id`` and may contain ``outreach_status``,
    ``issue_number``, ``gtfs_dataset_name`` and ``new_end_date``. Updates
    are sent in batches of ``batch_size``; a failed batch is recorded and
    the run continues with the next batch.
    """

    template_fields: Sequence[str] = (
        "air_base_id",
        "air_table_name",
        "airtable_conn_id",
        "rows",
    )

    def __init__(
        self,
        air_base_id: str,
        air_table_name: str,
        rows: list[dict[str, Any]],
        airtable_conn_id: str = "airtable_issue_management",
        batch_size: int = 10,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)

        self.air_base_id = air_base_id
        self.air_table_name = air_table_name
        self.rows = rows
        self.airtable_conn_id = airtable_conn_id
        self.batch_size = batch_size

    def airtable_hook(self) -> AirtableIssuesHook:
        """Build a hook for the configured Airtable connection."""
        return AirtableIssuesHook(airtable_conn_id=self.airtable_conn_id)

    def today(self) -> str:
        """Return today's UTC date as an ISO-8601 string (YYYY-MM-DD)."""
        return datetime.now(timezone.utc).date().isoformat()

    def derive_status(self, outreach_status: str | None) -> str:
        """Map an outreach status to the closing resolution status."""
        if outreach_status == "Waiting on Customer Success":
            return "Fixed - on its own"
        return "Fixed - with Cal-ITP help"

    def build_updates(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Build pyairtable batch_update payloads (one per input row)."""
        resolution_date = self.today()

        return [
            {
                "id": row["issue_source_record_id"],
                "fields": {
                    "Status": self.derive_status(row.get("outreach_status")),
                    # Clearing Outreach Status marks the issue as closed out.
                    "Outreach Status": None,
                    "Resolution Date": resolution_date,
                },
            }
            for row in rows
        ]

    def build_email_rows(self, rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Build the summary rows consumed by the email operator."""
        return [
            {
                "issue_number": row.get("issue_number"),
                "gtfs_dataset_name": row.get("gtfs_dataset_name"),
                "status": self.derive_status(row.get("outreach_status")),
                "new_end_date": row.get("new_end_date"),
            }
            for row in rows
        ]

    def execute(self, context: Context) -> dict[str, Any]:
        """Send batched updates to Airtable and report the outcome.

        Returns (via XCom) the updated record ids, the per-row email
        summary, and any batches that failed.
        """
        del context  # unused; required by the operator interface

        rows = self.rows

        if not rows:
            self.log.info("No Airtable issue rows found.")
            return {
                "updated_count": 0,
                "updated_record_ids": [],
                "failed_batches": [],
                "email_rows": [],
            }

        updates = self.build_updates(rows)

        # Create the hook (and its Airflow connection lookup) once, not
        # once per batch.
        hook = self.airtable_hook()

        updated_records: list[dict[str, Any]] = []
        email_rows: list[dict[str, Any]] = []
        failed_batches: list[dict[str, Any]] = []

        for i in range(0, len(updates), self.batch_size):
            start = i
            end = i + self.batch_size
            batch = updates[start:end]
            batch_rows = rows[start:end]

            batch_num = i // self.batch_size + 1

            try:
                hook.batch_update(
                    air_base_id=self.air_base_id,
                    air_table_name=self.air_table_name,
                    records=batch,
                )
                updated_records.extend(batch)
                email_rows.extend(self.build_email_rows(batch_rows))
                self.log.info("Updated Airtable batch %s successfully.", batch_num)

            except Exception as exc:
                # Best-effort: record the failure and keep going so one bad
                # batch does not block the rest of the updates.
                self.log.exception("Failed Airtable batch %s", batch_num)
                failed_batches.append(
                    {
                        "batch_num": batch_num,
                        "error": str(exc),
                        "record_ids": [record["id"] for record in batch],
                    }
                )

        return {
            "updated_count": len(updated_records),
            "updated_record_ids": [record["id"] for record in updated_records],
            "failed_batches": failed_batches,
            "email_rows": email_rows,
        }
Loading
Loading