Skip to content

Commit c6cf82c

Browse files
author
groovecoder
committed
Refactor meeting ingestion pipeline and add registry support
* Replaced `create_meetings_csv` with `get_new_meetings` to track and return only new meetings.
* Added support for reading/writing a meeting registry to S3 using JSONL format.
* Improved `parse_meetings` to reliably extract `clip_id` and video URL using regex.
* Introduced a `clip_id` field to the `Meeting` model.
* Added utility functions `to_jsonl` and `from_jsonl` for model serialization/deserialization.
* Renamed `get_meetings` to `get_tgov_meetings` for clarity.
1 parent b91af41 commit c6cf82c

File tree

5 files changed

+138
-68
lines changed

5 files changed

+138
-68
lines changed

flows/translate_meetings.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
11
from prefect import flow
22

3-
from tasks.meetings import create_meetings_csv
3+
from tasks.meetings import get_new_meetings
44

55

66
@flow(log_prints=True)
77
async def translate_meetings():
8-
await create_meetings_csv()
9-
# TODO: await download_videos()
10-
# TODO: await transcribe_videos()
11-
# TODO: await diarize_transcriptions()
12-
# TODO: await translate_transcriptions()
13-
# TODO: await create_subtitled_video_pages()
8+
new_meetings = await get_new_meetings()
9+
# new_transcribed_meetings = await transcribe_videos(new_meetings)
10+
# new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings)
11+
# new_translated_meetings = await translate_transcriptions(new_transcribed_meetings)
1412

1513
if __name__ == "__main__":
1614
import asyncio

src/meetings.py

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,22 @@
66
Television websites.
77
"""
88

9-
from typing import Dict, List
9+
import re
10+
from typing import Dict, List, Sequence
1011
from urllib.parse import urljoin
1112

1213
import aiohttp
1314
import pandas as pd
1415
from selectolax.parser import HTMLParser
1516

17+
from src.aws import is_aws_configured
18+
from src.models.utils import from_jsonl, to_jsonl
19+
1620
from .models.meeting import Meeting
1721

1822
BASE_URL = "https://tulsa-ok.granicus.com/ViewPublisher.php?view_id=4"
23+
TGOV_BUCKET_NAME = "tgov-meetings"
24+
MEETINGS_REGISTRY_PATH = "data/meetings.jsonl"
1925

2026

2127
async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
@@ -35,6 +41,10 @@ async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
3541
return await response.text()
3642

3743

44+
def clean_date(date: str) -> str:
45+
return re.sub(r"\s+", "", date)
46+
47+
3848
async def parse_meetings(html: str) -> List[Dict[str, str]]:
3949
"""
4050
Parse the meeting data from the HTML content.
@@ -69,9 +79,10 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
6979

7080
meeting_data = {
7181
"meeting": cells[0].text().strip(),
72-
"date": cells[1].text().strip(),
82+
"date": clean_date(cells[1].text().strip()),
7383
"duration": cells[2].text().strip(),
7484
"agenda": None,
85+
"clip_id": None,
7586
"video": None,
7687
}
7788

@@ -86,37 +97,22 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
8697
# Extract video link if available
8798
video_cell = cells[4]
8899
video_link = video_cell.css_first("a")
89-
if video_link:
90-
# First try to extract from onclick attribute
91-
onclick = video_link.attributes.get("onclick", "")
92-
if onclick:
93-
# Look for window.open pattern
94-
if "window.open(" in onclick:
95-
# Extract URL from window.open('URL', ...)
96-
start_quote = onclick.find("'", onclick.find("window.open("))
97-
end_quote = onclick.find("'", start_quote + 1)
98-
if start_quote > 0 and end_quote > start_quote:
99-
video_url = onclick[start_quote + 1 : end_quote]
100-
# Handle protocol-relative URLs (starting with //)
101-
if video_url.startswith("//"):
102-
video_url = f"https:{video_url}"
103-
meeting_data["video"] = video_url
104-
105-
# If onclick extraction failed, try href
106-
if meeting_data["video"] is None and video_link.attributes.get("href"):
107-
href = video_link.attributes.get("href")
108-
# Handle javascript: hrefs
109-
if href.startswith("javascript:"):
110-
# Try to extract clip_id from the onclick attribute again
111-
# This handles cases where href is javascript:void(0) but onclick has the real URL
112-
if meeting_data["video"] is None and "clip_id=" in onclick:
113-
start_idx = onclick.find("clip_id=")
114-
end_idx = onclick.find("'", start_idx)
115-
if start_idx > 0 and end_idx > start_idx:
116-
clip_id = onclick[start_idx + 8 : end_idx]
117-
meeting_data["video"] = (
118-
f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
119-
)
100+
onclick = video_link.attributes.get("onclick", "")
101+
onclick_match = re.search(r"window\.open\(['\"](//[^'\"]+)['\"]", onclick)
102+
clip_id_exp = r"clip_id=(\d+)"
103+
104+
if onclick_match:
105+
meeting_data["video"] = f"https:{onclick_match.group(1)}"
106+
meeting_data["clip_id"] = re.search(clip_id_exp, onclick).group(1)
107+
108+
if not meeting_data["video"]:
109+
href = video_link.attributes.get("href", "")
110+
if href.startswith("javascript:"):
111+
clip_id_match = re.search(clip_id_exp, href)
112+
if clip_id_match:
113+
clip_id = clip_id_match.group(1)
114+
meeting_data["clip_id"] = clip_id
115+
meeting_data["video"] = f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
120116
else:
121117
meeting_data["video"] = urljoin(BASE_URL, href)
122118

@@ -125,7 +121,7 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:
125121
return meetings
126122

127123

128-
async def get_meetings() -> List[Meeting]:
124+
async def get_tgov_meetings() -> Sequence[Meeting]:
129125
"""
130126
Fetch and parse meeting data from the Government Access Television website.
131127
@@ -164,3 +160,44 @@ def duration_to_minutes(duration):
164160
return hours * 60 + minutes
165161
except:
166162
return None
163+
164+
165+
def get_registry_meetings() -> Sequence[Meeting]:
166+
if is_aws_configured():
167+
print(f'Getting registry from AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
168+
import boto3
169+
from botocore.exceptions import ClientError
170+
s3 = boto3.client('s3')
171+
try:
172+
registry_response = s3.get_object(Bucket=TGOV_BUCKET_NAME, Key=MEETINGS_REGISTRY_PATH)
173+
registry_body = registry_response['Body'].read().decode('utf-8')
174+
return from_jsonl(registry_body, Meeting)
175+
except ClientError as e:
176+
if e.response['Error']['Code'] == 'NoSuchKey':
177+
print('No registry file found on S3. Returning empty list.')
178+
179+
return []
180+
181+
182+
def write_registry_meetings(meetings: Sequence[Meeting]) -> Sequence[Meeting]:
183+
jsonl_str = to_jsonl(meetings)
184+
185+
if is_aws_configured():
186+
print(f'Writing registry to AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
187+
import boto3
188+
from botocore.exceptions import ClientError
189+
s3 = boto3.client('s3')
190+
191+
try:
192+
s3.put_object(
193+
Bucket=TGOV_BUCKET_NAME,
194+
Key=MEETINGS_REGISTRY_PATH,
195+
Body=jsonl_str,
196+
ContentType='application/x-ndjson'
197+
)
198+
print(f'Wrote {len(meetings)} meetings to S3.')
199+
except ClientError as e:
200+
print(f"Failed to write to S3: {e}")
201+
raise
202+
203+
return meetings

src/models/meeting.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
Pydantic models for meeting data
33
"""
44

5-
from datetime import datetime
65
from typing import Optional
76

87
from pydantic import BaseModel, Field, HttpUrl
@@ -18,6 +17,7 @@ class Meeting(BaseModel):
1817
duration: str = Field(description="Duration of the meeting")
1918
agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda")
2019
video: Optional[HttpUrl] = Field(None, description="URL to the meeting video")
20+
clip_id: Optional[str] = Field(None, description="Granicus clip ID")
2121

2222
def __str__(self) -> str:
2323
"""String representation of the meeting"""

src/models/utils.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import json
2+
from typing import Sequence, Type, TypeVar
3+
4+
5+
from pydantic import BaseModel
6+
7+
8+
T = TypeVar("T", bound=BaseModel)
9+
10+
11+
def to_jsonl(models: Sequence[T]) -> str:
12+
"""
13+
Serialize a list of Pydantic models to a JSONL (JSON Lines) formatted string.
14+
15+
Each model is serialized to a single line of JSON using `model_dump_json()`.
16+
17+
Args:
18+
models: A list of Pydantic BaseModel instances.
19+
20+
Returns:
21+
A JSONL-formatted string with one model per line.
22+
"""
23+
return "\n".join(model.model_dump_json() for model in models)
24+
25+
26+
def from_jsonl(jsonl_str: str, model_class: Type[T]) -> Sequence[T]:
27+
"""
28+
Deserialize a JSONL string into a list of Pydantic model instances.
29+
30+
Args:
31+
jsonl_str: The JSON Lines string to parse.
32+
model_class: The Pydantic model class to use for validation.
33+
34+
Returns:
35+
A list of instances of the specified Pydantic model class.
36+
"""
37+
return [model_class.model_validate(json.loads(line)) for line in jsonl_str.strip().splitlines()]

tasks/meetings.py

Lines changed: 24 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,29 @@
1-
import os
2-
3-
import pandas as pd
1+
from typing import Sequence
42
from prefect import task
53

6-
from src.aws import create_bucket_if_not_exists, is_aws_configured, upload_to_s3
7-
from src.meetings import duration_to_minutes, get_meetings
8-
4+
from src.meetings import get_tgov_meetings, get_registry_meetings, write_registry_meetings
5+
from src.models.meeting import Meeting
96

10-
file_path = 'data/meetings.csv' # Path where the file will be saved locally temporarily
11-
meetings_bucket_name = 'tgov-meetings'
127

138
@task
async def get_new_meetings():
    """
    Fetch current TGOV meetings, diff them against the S3-backed registry
    by `clip_id`, persist any newly seen meetings, and return only the new
    ones.

    Returns:
        A (possibly empty) list of Meeting objects whose clip_id was not
        already present in the registry.
    """
    # TODO: accept max_limit parameter
    tgov_meetings: Sequence[Meeting] = await get_tgov_meetings()
    print(f"Got {len(tgov_meetings)} tgov meetings.")

    registry_meetings: Sequence[Meeting] = get_registry_meetings()
    print(f"Got {len(registry_meetings)} registry meetings.")

    # Set membership keeps the diff O(n + m) instead of O(n * m).
    registry_clip_ids = {rm.clip_id for rm in registry_meetings}

    new_meetings: Sequence[Meeting] = [tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids]

    if new_meetings:
        # Append the new meetings and persist the full registry.
        registry_meetings = list(registry_meetings) + new_meetings
        write_registry_meetings(registry_meetings)
        return new_meetings

    print(f"No new meetings. {len(registry_meetings)} in registry.")
    return []

0 commit comments

Comments
 (0)