12 changes: 5 additions & 7 deletions flows/translate_meetings.py
@@ -1,16 +1,14 @@
from prefect import flow

-from tasks.meetings import create_meetings_csv
+from tasks.meetings import get_new_meetings


@flow(log_prints=True)
async def translate_meetings():
-    await create_meetings_csv()
-    # TODO: await download_videos()
-    # TODO: await transcribe_videos()
-    # TODO: await diarize_transcriptions()
-    # TODO: await translate_transcriptions()
-    # TODO: await create_subtitled_video_pages()
+    new_meetings = await get_new_meetings()
+    # new_transcribed_meetings = await transcribe_videos(new_meetings)
+    # new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings)
+    # new_translated_meetings = await translate_transcriptions(new_transcribed_meetings)

if __name__ == "__main__":
    import asyncio
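
Note: the collapsed tail of this file is not shown here. For local runs, a minimal invocation of the flow (a sketch, assuming translate_meetings takes no arguments, matching the signature above) would be:

import asyncio

from flows.translate_meetings import translate_meetings

asyncio.run(translate_meetings())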
148 changes: 98 additions & 50 deletions src/meetings.py
@@ -6,16 +6,22 @@
Television websites.
"""

-from typing import Dict, List
+import re
+from typing import Dict, List, Sequence
from urllib.parse import urljoin

import aiohttp
import pandas as pd
from selectolax.parser import HTMLParser

+from src.aws import is_aws_configured
+from src.models.utils import from_jsonl, to_jsonl
+
from .models.meeting import Meeting

BASE_URL = "https://tulsa-ok.granicus.com/ViewPublisher.php?view_id=4"
+TGOV_BUCKET_NAME = "tgov-meetings"
+MEETINGS_REGISTRY_PATH = "data/meetings.jsonl"


async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
@@ -35,6 +41,10 @@ async def fetch_page(url: str, session: aiohttp.ClientSession) -> str:
        return await response.text()


+def clean_date(date: str) -> str:
+    return re.sub(r"\s+", " ", date).strip()
+
+
async def parse_meetings(html: str) -> List[Dict[str, str]]:
    """
    Parse the meeting data from the HTML content.
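
Note: clean_date collapses runs of whitespace (including newlines in scraped cell text) into single spaces. A quick illustration with a made-up cell value; the date-parsing code further down also splits on "-" to drop the start time:

import re

raw = "June 4, 2025\n      -  10:00 AM"   # hypothetical scraped cell text
cleaned = re.sub(r"\s+", " ", raw).strip()
print(cleaned)                        # June 4, 2025 - 10:00 AM
print(cleaned.split("-")[0].strip())  # June 4, 2025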
@@ -56,76 +66,73 @@ async def parse_meetings(html: str) -> List[Dict[str, str]]:

    # Process each table
    for table in tables:
-        # Find the tbody section which contains the actual meeting rows
-        tbody = table.css_first("tbody")
-        if not tbody:
-            continue
-
-        # Process each row in the tbody
-        for row in tbody.css("tr"):
+        for row in table.css("tr.listingRow"):
-            cells = row.css("td")
-            if len(cells) < 5:
-                continue
+            name_cells = row.css('td.listItem[headers^="Name"]')
+            meeting_name = name_cells[0].text().strip() if name_cells else "Unknown"
+
+            date_cells = row.css('td.listItem[headers^="Date"]')
+            raw_date = clean_date(date_cells[0].text().strip()) if date_cells else "Unknown"
+            meeting_date = raw_date.split("-")[0].strip() if "-" in raw_date else raw_date
+
+
+            duration_cells = row.css('td.listItem[headers^="Duration"]')
+            duration_str = duration_cells[0].text().strip() if duration_cells else "Unknown"
+            minutes = duration_to_minutes(duration_str)
+            meeting_duration = f"{minutes // 60}:{minutes % 60:02d}" if minutes is not None else "Unknown"
+
+
            meeting_data = {
-                "meeting": cells[0].text().strip(),
-                "date": cells[1].text().strip(),
-                "duration": cells[2].text().strip(),
+                "meeting": meeting_name,
+                "date": meeting_date,
+                "duration": meeting_duration,
                "agenda": None,
+                "clip_id": None,
                "video": None,
            }

-            # Extract agenda link if available
-            agenda_cell = cells[3]
-            agenda_link = agenda_cell.css_first("a")
-            if agenda_link and agenda_link.attributes.get("href"):
+            agenda_cells = row.css('td.listItem:has(a[href*="AgendaViewer.php"])')
+            agenda_link = agenda_cells[0].css_first("a") if agenda_cells else None
+            if agenda_link is not None:
                meeting_data["agenda"] = urljoin(
                    BASE_URL, agenda_link.attributes.get("href")
                )

-            # Extract video link if available
-            video_cell = cells[4]
-            video_link = video_cell.css_first("a")
-            if video_link:
-                # First try to extract from onclick attribute
+            video_cells = row.css('td.listItem[headers^="VideoLink"]')
+            video_cell = video_cells[0] if video_cells else None
+            if video_cell is not None:
+                video_link = video_cell.css_first("a")
+
                onclick = video_link.attributes.get("onclick", "")
-                if onclick:
-                    # Look for window.open pattern
-                    if "window.open(" in onclick:
-                        # Extract URL from window.open('URL', ...)
-                        start_quote = onclick.find("'", onclick.find("window.open("))
-                        end_quote = onclick.find("'", start_quote + 1)
-                        if start_quote > 0 and end_quote > start_quote:
-                            video_url = onclick[start_quote + 1 : end_quote]
-                            # Handle protocol-relative URLs (starting with //)
-                            if video_url.startswith("//"):
-                                video_url = f"https:{video_url}"
-                            meeting_data["video"] = video_url
-
-                # If onclick extraction failed, try href
-                if meeting_data["video"] is None and video_link.attributes.get("href"):
-                    href = video_link.attributes.get("href")
-                    # Handle javascript: hrefs
-                    if href.startswith("javascript:"):
-                        # Try to extract clip_id from the onclick attribute again
-                        # This handles cases where href is javascript:void(0) but onclick has the real URL
-                        if meeting_data["video"] is None and "clip_id=" in onclick:
-                            start_idx = onclick.find("clip_id=")
-                            end_idx = onclick.find("'", start_idx)
-                            if start_idx > 0 and end_idx > start_idx:
-                                clip_id = onclick[start_idx + 8 : end_idx]
-                                meeting_data["video"] = (
-                                    f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
-                                )
+                onclick_match = re.search(r"window\.open\(['\"](//[^'\"]+)['\"]", onclick)
+                clip_id_exp = r"clip_id=(\d+)"
+
+                if onclick_match:
+                    meeting_data["video"] = f"https:{onclick_match.group(1)}"
+                    clip_id_match = re.search(clip_id_exp, onclick)
+                    if clip_id_match:
+                        meeting_data["clip_id"] = clip_id_match.group(1)
                    else:
-                        meeting_data["video"] = urljoin(BASE_URL, href)
+                        meeting_data["clip_id"] = None
+                if not meeting_data["video"]:
+                    href = video_link.attributes.get("href", "")
+                    if href.startswith("javascript:"):
+                        clip_id_match = re.search(clip_id_exp, href)
+                        if clip_id_match:
+                            clip_id = clip_id_match.group(1)
+                            meeting_data["clip_id"] = clip_id
+                            meeting_data["video"] = f"https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id={clip_id}"
+                    else:
+                        meeting_data["video"] = urljoin(BASE_URL, href)

            meetings.append(meeting_data)

    return meetings


-async def get_meetings() -> List[Meeting]:
+async def get_tgov_meetings() -> Sequence[Meeting]:
    """
    Fetch and parse meeting data from the Government Access Television website.

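Note: a quick sanity check of the new regex-based extraction against a representative onclick value (the sample string is invented; real Granicus markup may vary):

import re

onclick = "window.open('//tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id=6721','player');"
m = re.search(r"window\.open\(['\"](//[^'\"]+)['\"]", onclick)
print(f"https:{m.group(1)}")  # https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id=6721
print(re.search(r"clip_id=(\d+)", onclick).group(1))  # 6721
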
@@ -164,3 +171,44 @@ def duration_to_minutes(duration):
        return hours * 60 + minutes
    except:
        return None
+
+
+def get_registry_meetings() -> Sequence[Meeting]:
+    if is_aws_configured():
+        print(f'Getting registry from AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
+        import boto3
+        from botocore.exceptions import ClientError
+        s3 = boto3.client('s3')
+        try:
+            registry_response = s3.get_object(Bucket=TGOV_BUCKET_NAME, Key=MEETINGS_REGISTRY_PATH)
+            registry_body = registry_response['Body'].read().decode('utf-8')
+            return from_jsonl(registry_body, Meeting)
+        except ClientError as e:
+            if e.response['Error']['Code'] == 'NoSuchKey':
+                print('No registry file found on S3. Returning empty list.')
+
+    return []
+
+
+def write_registry_meetings(meetings: Sequence[Meeting]) -> Sequence[Meeting]:
+    jsonl_str = to_jsonl(meetings)
+
+    if is_aws_configured():
+        print(f'Writing registry to AWS S3 bucket: {TGOV_BUCKET_NAME}, path: {MEETINGS_REGISTRY_PATH}')
+        import boto3
+        from botocore.exceptions import ClientError
+        s3 = boto3.client('s3')
+
+        try:
+            s3.put_object(
+                Bucket=TGOV_BUCKET_NAME,
+                Key=MEETINGS_REGISTRY_PATH,
+                Body=jsonl_str,
+                ContentType='application/x-ndjson'
+            )
+            print(f'Wrote {len(meetings)} meetings to S3.')
+        except ClientError as e:
+            print(f"Failed to write to S3: {e}")
+            raise
+
+    return meetings
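
Note: usage sketch for the new registry helpers (behavior as implemented above: both quietly degrade when AWS is not configured):

from src.meetings import get_registry_meetings, write_registry_meetings

meetings = get_registry_meetings()  # [] if AWS is unconfigured or the registry object doesn't exist yet
# ... extend with newly scraped meetings ...
write_registry_meetings(meetings)   # uploads JSONL to s3://tgov-meetings/data/meetings.jsonl when AWS is configured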
2 changes: 1 addition & 1 deletion src/models/meeting.py
@@ -2,7 +2,6 @@
Pydantic models for meeting data
"""

-from datetime import datetime
from typing import Optional

from pydantic import BaseModel, Field, HttpUrl
@@ -18,6 +17,7 @@ class Meeting(BaseModel):
    duration: str = Field(description="Duration of the meeting")
    agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda")
    video: Optional[HttpUrl] = Field(None, description="URL to the meeting video")
+    clip_id: Optional[str] = Field(None, description="Granicus clip ID")

    def __str__(self) -> str:
        """String representation of the meeting"""
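
Note: the fields above the fold aren't shown, but judging from the meeting_data dict built in src/meetings.py they mirror its keys. A hypothetical instance exercising the new clip_id field (all values invented):

from src.models.meeting import Meeting

m = Meeting(
    meeting="Regular Council Meeting",
    date="June 4, 2025",
    duration="1:35",
    agenda=None,
    video="https://tulsa-ok.granicus.com/MediaPlayer.php?view_id=4&clip_id=6721",
    clip_id="6721",
)
print(m.model_dump_json())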
37 changes: 37 additions & 0 deletions src/models/utils.py
@@ -0,0 +1,37 @@
+import json
+from typing import Sequence, Type, TypeVar
+
+
+from pydantic import BaseModel
+
+
+T = TypeVar("T", bound=BaseModel)
+
+
+def to_jsonl(models: Sequence[T]) -> str:
+    """
+    Serialize a list of Pydantic models to a JSONL (JSON Lines) formatted string.
+
+    Each model is serialized to a single line of JSON using `model_dump_json()`.
+
+    Args:
+        models: A list of Pydantic BaseModel instances.
+
+    Returns:
+        A JSONL-formatted string with one model per line.
+    """
+    return "\n".join(model.model_dump_json() for model in models)
+
+
+def from_jsonl(jsonl_str: str, model_class: Type[T]) -> Sequence[T]:
+    """
+    Deserialize a JSONL string into a list of Pydantic model instances.
+
+    Args:
+        jsonl_str: The JSON Lines string to parse.
+        model_class: The Pydantic model class to use for validation.
+
+    Returns:
+        A list of instances of the specified Pydantic model class.
+    """
+    return [model_class.model_validate(json.loads(line)) for line in jsonl_str.strip().splitlines()]
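
Note: round-trip sanity check for the JSONL helpers, using a throwaway model:

from pydantic import BaseModel

from src.models.utils import from_jsonl, to_jsonl


class Item(BaseModel):
    name: str
    qty: int


items = [Item(name="a", qty=1), Item(name="b", qty=2)]
jsonl = to_jsonl(items)  # '{"name":"a","qty":1}\n{"name":"b","qty":2}'
assert from_jsonl(jsonl, Item) == items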
50 changes: 24 additions & 26 deletions tasks/meetings.py
@@ -1,31 +1,29 @@
-import os
-
-import pandas as pd
+from typing import Sequence
from prefect import task

-from src.aws import create_bucket_if_not_exists, is_aws_configured, upload_to_s3
-from src.meetings import duration_to_minutes, get_meetings
-
+from src.meetings import get_tgov_meetings, get_registry_meetings, write_registry_meetings
+from src.models.meeting import Meeting

-file_path = 'data/meetings.csv' # Path where the file will be saved locally temporarily
-meetings_bucket_name = 'tgov-meetings'

@task
-async def create_meetings_csv():
-    meetings = await get_meetings()
-    print(f"Got meetings: {meetings}")
-    meeting_dicts = [meeting.model_dump() for meeting in meetings]
-    print(f"meeting_dicts: {meeting_dicts}")
-    df = pd.DataFrame(meeting_dicts)
-    df['duration_minutes'] = df['duration'].apply(duration_to_minutes)
-    df.to_csv(file_path, index=False)
-
-    if is_aws_configured():
-        print(f"file_path: {file_path}")
-        create_bucket_if_not_exists(meetings_bucket_name)
-        if not upload_to_s3(file_path, meetings_bucket_name, file_path):
-            raise RuntimeError("Failed to upload to S3")
-        os.remove(file_path) # Remove local file after successful upload
-    else:
-        output_path = 'meetings.csv' # Local path if AWS is not configured
-        df.to_csv(output_path, index=False)
+async def get_new_meetings():
+    # TODO: accept max_limit parameter
+    tgov_meetings: Sequence[Meeting] = await get_tgov_meetings()
+    print(f"Got {len(tgov_meetings)} tgov meetings.")
+    tgov_clip_ids = [tm.clip_id for tm in tgov_meetings]
+    # print(f"tgov_clip_ids: {tgov_clip_ids}")
+
+    registry_meetings: Sequence[Meeting] = get_registry_meetings()
+    print(f"Got {len(registry_meetings)} registry meetings.")
+
+    registry_clip_ids = [rm.clip_id for rm in registry_meetings]
+
+    new_meetings: Sequence[Meeting] = [tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids]
+
+    if new_meetings:
+        registry_meetings += new_meetings
+        write_registry_meetings(registry_meetings)
+        return new_meetings
+
+    print(f"No new meetings. {len(registry_meetings)} in registry.")
+    return []
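
Note: get_new_meetings tests clip_id membership against a list, which is O(n) per lookup; an equivalent set-based sketch keeps the same semantics with O(1) lookups (pick_new is a hypothetical helper, not part of this PR):

from typing import List, Sequence

from src.models.meeting import Meeting


def pick_new(tgov_meetings: Sequence[Meeting], registry_meetings: Sequence[Meeting]) -> List[Meeting]:
    # Same dedup semantics as get_new_meetings, but with set lookups.
    registry_clip_ids = {rm.clip_id for rm in registry_meetings}
    return [tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids]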