Skip to content

Commit f4cdd8d

Browse files
committed
split out diarization and download
1 parent 34a3e71 commit f4cdd8d

File tree

11 files changed

+119
-19
lines changed

11 files changed

+119
-19
lines changed

.github/workflows/python-app.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,4 @@ jobs:
4646

4747
- name: Run Diarization
4848
run: |
49-
/opt/poetry-venv/bin/python -m flows.translate_meetings
49+
/opt/poetry-venv/bin/python -m flows.transcribe_meetings

db/migrate_s3_path.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""
2+
Migration script to add s3_path field to existing meeting records.
3+
"""
4+
5+
from dyntastic import A
6+
from src.models.meeting import Meeting
7+
8+
9+
def migrate_s3_path():
10+
"""
11+
Add s3_path field to all existing meeting records that don't have it.
12+
"""
13+
print("Starting migration to add s3_path field to existing meetings...")
14+
15+
# Get all meetings
16+
meetings = Meeting.scan()
17+
updated_count = 0
18+
19+
for meeting in meetings:
20+
# Check if the s3_path field is missing or is None
21+
if not hasattr(meeting, "s3_path") or meeting.s3_path is None:
22+
print(f"Updating meeting: {meeting.meeting} ({meeting.date})")
23+
meeting.s3_path = None
24+
meeting.save()
25+
updated_count += 1
26+
27+
print(f"Migration complete. Updated {updated_count} meetings.")
28+
29+
30+
if __name__ == "__main__":
31+
migrate_s3_path()

db/queries.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
from src.models.meeting import Meeting
55

66

7-
def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]:
7+
def get_meetings(
8+
days: int = 7, video: Optional[bool] = None, s3_path: Optional[bool] = None
9+
) -> List[Meeting]:
810
"""
911
Get meetings that occurred within the last `days` days, counting back from now.
1012
"""
@@ -13,6 +15,11 @@ def get_meetings(days: int = 7, video: Optional[bool] = None) -> List[Meeting]:
1315
meetings = Meeting.scan(
1416
A.date >= target_date,
1517
)
16-
meetings_list = [m for m in meetings if (video is None or bool(m.video) == video)]
18+
meetings_list = [
19+
m
20+
for m in meetings
21+
if (video is None or bool(m.video) == video)
22+
and (s3_path is None or bool(m.s3_path) == s3_path)
23+
]
1724

1825
return list(meetings_list)
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
from prefect import flow
22

33
from db.queries import get_meetings
4-
from tasks.diarize import diarize_meeting
4+
from tasks.diarize import download_video_and_put_in_s3
55
from tasks.meetings import register_meetings
66

77

8-
@flow(log_prints=True)
9-
def translate_meetings():
8+
# @flow(log_prints=True)
9+
def download_meetings():
1010
new_meetings = register_meetings()
1111
print(f"Registered {len(new_meetings)} new meetings")
12-
meetings_to_diarize = get_meetings(video=True)
13-
print(f"Found {len(meetings_to_diarize)} meetings to diarize")
14-
for meeting in meetings_to_diarize:
15-
diarize_meeting(meeting)
12+
meetings_to_download = get_meetings(days=7, video=True, s3_path=False)
13+
print(f"Found {len(meetings_to_download)} meetings to download")
14+
for meeting in meetings_to_download:
15+
download_video_and_put_in_s3(meeting)
1616
# new_subtitled_video_pages = await create_subtitled_video_pages(new_transcribed_meetings)
1717
# new_translated_meetings = await translate_transcriptions(new_transcribed_meetings)
1818

1919

2020
if __name__ == "__main__":
21-
translate_meetings()
21+
download_meetings()

flows/transcribe_meetings.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from prefect import flow
2+
3+
from db.queries import get_meetings
4+
from tasks.diarize import diarize_meeting
5+
6+
7+
@flow(log_prints=True)
8+
def transcribe_meetings():
9+
meetings_to_diarize = get_meetings(video=True, s3_path=True)
10+
print(f"Found {len(meetings_to_diarize)} meetings to diarize")
11+
for meeting in meetings_to_diarize:
12+
diarize_meeting(meeting)
13+
14+
15+
if __name__ == "__main__":
16+
transcribe_meetings()

src/aws.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from pathlib import Path
23

34
import boto3
45
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError
@@ -52,3 +53,21 @@ def save_content_to_s3(content, bucket_name, s3_key, content_type):
5253
region = s3_client.meta.region_name
5354
url = f"https://{bucket_name}.s3.{region}.amazonaws.com/{s3_key}"
5455
return HttpUrl(url)
56+
57+
58+
def get_video_from_s3(bucket_name, s3_path):
59+
try:
60+
# Create output directory if it doesn't exist
61+
output_dir = Path("data/video")
62+
output_dir.mkdir(parents=True, exist_ok=True)
63+
64+
# Define output path
65+
output_path = output_dir / Path(s3_path).name
66+
67+
# Download file from S3
68+
s3_client.download_file(bucket_name, s3_path, str(output_path))
69+
print(f"Downloaded {s3_path} from S3 to {output_path}")
70+
return output_path
71+
except ClientError as e:
72+
print(f"Failed to get video from S3: {str(e)}")
73+
return None

src/models/meeting.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Optional
66

77
from dyntastic import Dyntastic
8-
from pydantic import BaseModel, Field, HttpUrl
8+
from pydantic import BaseModel, Field, HttpUrl, ConfigDict
99
from datetime import datetime
1010
from typing import List
1111

@@ -18,6 +18,8 @@ class Meeting(Dyntastic):
1818
__table_name__ = "tgov-meeting"
1919
__hash_key__ = "clip_id"
2020

21+
model_config = ConfigDict(extra="ignore")
22+
2123
clip_id: Optional[str] = Field(None, description="Granicus clip ID")
2224
meeting: str = Field(description="Name of the meeting")
2325
date: datetime = Field(description="Date and time of the meeting")
@@ -30,6 +32,9 @@ class Meeting(Dyntastic):
3032
subtitles: Optional[List[HttpUrl]] = Field(
3133
None, description="URLs to the meeting subtitle tracks"
3234
)
35+
s3_path: Optional[str] = Field(
36+
default=None, description="S3 path to the meeting video"
37+
)
3338

3439
def __str__(self) -> str:
3540
"""String representation of the meeting"""

src/run_diarization.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from pathlib import Path
55

66
from src.aws import save_content_to_s3
7-
from src.models.meeting import GranicusPlayerPage
7+
from src.models.meeting import GranicusPlayerPage, Meeting
88
from src.granicus import get_video_player
99
from src.videos import download_file, transcribe_video_with_diarization
1010

@@ -38,7 +38,7 @@ def download_video(file_name: str, video_url: str):
3838
return video_file
3939

4040

41-
def run_diarization(video_file: Path):
41+
def run_diarization(video_file: Path, meeting: Meeting):
4242
transcription_dir = Path("data/transcripts")
4343

4444
transcription = asyncio.run(
@@ -51,6 +51,9 @@ def run_diarization(video_file: Path):
5151
f"{FOLDER_NAME}/{video_file.name}.json",
5252
"application/json",
5353
)
54+
meeting.transcripts = [f"{FOLDER_NAME}/{video_file.name}.json"]
55+
meeting.save()
56+
5457
print(transcription)
5558

5659

src/videos.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ def download_file(url: str, output_path: Path):
7979
)
8080

8181
print(f"Download complete: {url}")
82-
# Add to S3
83-
upload_to_s3(output_path, BUCKET_NAME, f"{FOLDER_NAME}/{output_path.name}")
8482
return output_path
8583

8684

tasks/diarize.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,35 @@
1+
import os
2+
from src.aws import get_video_from_s3, upload_to_s3
13
from src.run_diarization import download_video, run_diarization
24
from prefect import task
35

46
from src.models.meeting import Meeting
57

68

9+
BUCKET_NAME = os.getenv("S3_BUCKET")
10+
FOLDER_NAME = "videos"
11+
12+
13+
# @task
14+
def download_video_and_put_in_s3(meeting: Meeting):
15+
video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video))
16+
if video_file:
17+
print(f"Uploading video to S3: {video_file}")
18+
s3_path = f"{FOLDER_NAME}/{video_file.name}"
19+
upload_to_s3(video_file, BUCKET_NAME, f"{FOLDER_NAME}/{video_file.name}")
20+
print(f"Uploaded video to S3: {s3_path}")
21+
print("Saving meeting.")
22+
meeting.s3_path = s3_path
23+
meeting.save()
24+
else:
25+
print("Video file not found")
26+
27+
728
@task
829
def diarize_meeting(meeting: Meeting):
9-
video_file = download_video(f"{meeting.meeting}_{meeting.date}", str(meeting.video))
30+
video_file = get_video_from_s3(BUCKET_NAME, meeting.s3_path)
1031
if video_file:
11-
run_diarization(video_file)
32+
run_diarization(video_file, meeting)
1233
else:
1334
print("Video file not found")
1435
# TODO: Update meeting with transcript location

0 commit comments

Comments
 (0)