diff --git a/db/README.md b/db/README.md new file mode 100644 index 0000000..31ee620 --- /dev/null +++ b/db/README.md @@ -0,0 +1,62 @@ +# DynamoDB Database Setup and Usage + +## Setting Up the Database + +The `setup.py` script creates the DynamoDB table used to store meeting data. This should be run once before using the application for the first time. + +### Prerequisites + +1. AWS credentials configured in your environment: + - Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` in your environment or in a `.env` file at the project root + - Ensure the AWS account has permissions to create and manage DynamoDB tables + +2. Required Python packages: + ```bash + poetry add boto3 python-dotenv + ``` + +### Running the Setup Script + +Basic usage with default settings: +```bash +poetry run python db/setup.py +``` + +Custom table name: +```bash +poetry run python db/setup.py --table-name MyCustomTable +``` + +Custom schema path: +```bash +poetry run python db/setup.py --schema-path /path/to/schema.json +``` + +The script is idempotent - you can run it multiple times without creating duplicate tables. + +## Table Structure + +The DynamoDB table uses the following structure: + +- **Primary Key:** + - Partition Key: `name` (String) - The name of the meeting + - Sort Key: `date` (String) - The date and time of the meeting + +- **Secondary Indexes:** + - `DateIndex` - Allows querying meetings by date + - `ClipIdIndex` - Allows querying meetings by clip ID + +- **Main Attributes:** + - `meeting` - Name of the meeting (String) + - `date` - Date and time of the meeting (String) + - `clip_id` - Granicus clip ID (String, optional) + - `value` - Map containing index values and all other meeting attributes + +## Data Storage Pattern + +Meeting data follows this pattern: +- Core identification fields (`name`, `date`, `clip_id`) are stored as top-level attributes to allow for efficient querying +- All other meeting details (duration, agenda URL, video URL, etc.) are stored in a single `value` map attribute + +## Limitations +- You cannot directly query or filter based on attributes inside the `value` map diff --git a/db/dynamo_example.py b/db/dynamo_example.py new file mode 100644 index 0000000..39a3430 --- /dev/null +++ b/db/dynamo_example.py @@ -0,0 +1,179 @@ +#!/usr/bin/env python3 +"""Example script demonstrating the use of DynamoDB service with Meeting objects.""" + +import asyncio +import os +from datetime import datetime +from typing import List + +from dotenv import load_dotenv +from pydantic import HttpUrl + +import sys +from pathlib import Path + +# Add the parent directory to the Python path to allow importing from src +sys.path.append(str(Path(__file__).parent.parent)) + +from db.meeting_db import DynamoDBService +from src.models.meeting import Meeting + + +async def main(): + """Demonstrate DynamoDB operations with Meeting objects.""" + # Load environment variables from .env file + load_dotenv() + + # Initialize DynamoDB service + dynamo_service = DynamoDBService(table_name="ExampleMeetings") + + # Example meeting data + meetings = [ + Meeting( + name="City Council", + date="2023-05-15T18:00:00", + duration="2h 15m", + agenda=HttpUrl("https://example.com/agenda1"), + video=HttpUrl("https://example.com/video1"), + clip_id="12345", + ), + Meeting( + name="Planning Commission", + date="2023-05-16T10:00:00", + duration="1h 30m", + agenda=HttpUrl("https://example.com/agenda2"), + video=HttpUrl("https://example.com/video2"), + clip_id="67890", + ), + Meeting( + name="City Council", + date="2023-05-22T18:00:00", + duration="2h 45m", + agenda=HttpUrl("https://example.com/agenda3"), + video=HttpUrl("https://example.com/video3"), + clip_id="54321", + ), + ] + + # Insert meetings + print("\n=== Inserting meetings ===") + for meeting in meetings: + success = await dynamo_service.save(meeting) + if success: + print(f"Successfully inserted: {meeting}") + else: + print(f"Failed to insert: {meeting}") + + # Query meetings by name + print("\n=== Querying meetings by name ===") + city_council_meetings = await dynamo_service.query_meetings_by_name("City Council") + print(f"Found {len(city_council_meetings)} City Council meetings:") + for meeting in city_council_meetings: + print(f" - {meeting}") + + # Query meetings by date + print("\n=== Querying meetings by date ===") + may16_meetings = await dynamo_service.query_meetings_by_date("2023-05-16T10:00:00") + print(f"Found {len(may16_meetings)} meetings on May 16, 2023:") + for meeting in may16_meetings: + print(f" - {meeting}") + + # Query meetings by clip_id + print("\n=== Querying meetings by clip_id ===") + clip_meetings = await dynamo_service.query_meetings_by_clip_id("67890") + print(f"Found {len(clip_meetings)} meetings with clip_id '67890':") + for meeting in clip_meetings: + print(f" - {meeting}") + + # Get a specific meeting + print("\n=== Getting a specific meeting ===") + specific_meeting = await dynamo_service.get_meeting( + "Planning Commission", "2023-05-16T10:00:00" + ) + if specific_meeting: + print(f"Found specific meeting: {specific_meeting}") + else: + print("Specific meeting not found") + + # Update meeting using the dictionary-based method + print("\n=== Updating meeting (dictionary-based) ===") + update_success = await dynamo_service.update_meeting( + "Planning Commission", + "2023-05-16T10:00:00", + { + "duration": "2h 0m", # Changed from 1h 30m to 2h 0m + "clip_id": "67890-updated", + }, + ) + if update_success: + print("Successfully updated Planning Commission meeting") + # Get the updated meeting to verify changes + updated_meeting = await dynamo_service.get_meeting( + "Planning Commission", "2023-05-16T10:00:00" + ) + if updated_meeting: + print(f"Updated meeting: {updated_meeting}") + else: + print("Failed to update meeting") + + # Query meetings with updated clip_id + print("\n=== Querying meetings by updated clip_id ===") + updated_clip_meetings = await dynamo_service.query_meetings_by_clip_id( + "67890-updated" + ) + print(f"Found {len(updated_clip_meetings)} meetings with clip_id '67890-updated':") + for meeting in updated_clip_meetings: + print(f" - {meeting}") + + # Update meeting using the model-based method + print("\n=== Updating meeting (model-based) ===") + # Create a partial model with only the fields to update + updated_model = Meeting( + name="City Council", + date="2023-05-15T18:00:00", + duration="3h 0m", # Changed from 2h 15m to 3h 0m + video=HttpUrl("https://example.com/video1-new"), + ) + + update_success = await dynamo_service.update( + "City Council", "2023-05-15T18:00:00", updated_model + ) + + if update_success: + print("Successfully updated City Council meeting") + # Get the updated meeting to verify changes + updated_meeting = await dynamo_service.get_meeting( + "City Council", "2023-05-15T18:00:00" + ) + if updated_meeting: + print(f"Updated meeting: {updated_meeting}") + else: + print("Failed to update meeting") + + # List all meetings + print("\n=== Listing all meetings ===") + all_meetings = await dynamo_service.all() + print(f"Found {len(all_meetings)} total meetings:") + for meeting in all_meetings: + print(f" - {meeting}") + + # Delete a meeting + print("\n=== Deleting a meeting ===") + delete_success = await dynamo_service.delete( + "Planning Commission", "2023-05-16T10:00:00" + ) + if delete_success: + print("Successfully deleted Planning Commission meeting") + else: + print("Failed to delete meeting") + + # Verify deletion by listing all meetings again + print("\n=== Verifying deletion ===") + remaining_meetings = await dynamo_service.all() + print(f"Remaining meetings ({len(remaining_meetings)}):") + for meeting in remaining_meetings: + print(f" - {meeting}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/db/meeting_db.py b/db/meeting_db.py new file mode 100644 index 0000000..5e88b14 --- /dev/null +++ b/db/meeting_db.py @@ -0,0 +1,219 @@ +""" +DynamoDB service for storing and querying Meeting objects. +""" + +import os +import json +from pathlib import Path +from typing import List, Optional, Dict, Any +from datetime import datetime + +import boto3 +from botocore.exceptions import ClientError +from pydantic import BaseModel + +from src.models.meeting import Meeting, MeetingQuery + + +class DynamoDBService: + """Service for interacting with DynamoDB to store and retrieve Meeting objects.""" + + def __init__(self, table_name: str = "Meetings"): + """Initialize the DynamoDB service.""" + self.table_name = table_name + self.dynamodb = boto3.resource("dynamodb") + self.table = self.dynamodb.Table(table_name) + + async def save(self, meeting: Meeting) -> bool: + """Insert or update a Meeting in DynamoDB.""" + + try: + item = meeting.model_dump() + self.table.put_item(Item=item) + print(f"Meeting '{meeting.name}' on {meeting.date} saved to DynamoDB") + return True + except ClientError as e: + print(f"Error putting meeting: {e}") + return False + + async def query(self, query: MeetingQuery) -> List[Meeting]: + """ + Query meetings based on the provided MeetingQuery object. + Uses the most efficient index based on which fields are set in the query. + If multiple fields are set, returns only meetings matching all criteria. + """ + try: + # Get the non-None values from the query + query_dict = {k: v for k, v in query.model_dump().items() if v is not None} + + # If no query parameters, return empty list + if not query_dict: + print("No query parameters provided") + return [] + + # Choose the appropriate query method based on which fields are set + if "meeting" in query_dict: + # Query by meeting name (primary key) + key_condition = boto3.dynamodb.conditions.Key("meeting").eq(query.name) + + # If date is also specified, use it as a range condition + if "date" in query_dict: + key_condition = key_condition & boto3.dynamodb.conditions.Key( + "date" + ).eq(query.date) + + response = self.table.query(KeyConditionExpression=key_condition) + + elif "date" in query_dict: + # Query by date using DateIndex + response = self.table.query( + IndexName="DateIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key("date").eq( + query.date + ), + ) + + elif "clip_id" in query_dict: + # Query by clip_id using ClipIdIndex + response = self.table.query( + IndexName="ClipIdIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key("clip_id").eq( + query.clip_id + ), + ) + + else: + print( + "Invalid query: must specify at least one of meeting, date, or clip_id" + ) + return [] + + # Process the results + meetings = [] + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + # Handle pagination if needed + while "LastEvaluatedKey" in response: + # Use the appropriate pagination method based on which query was used + if "meeting" in query_dict: + response = self.table.query( + KeyConditionExpression=key_condition, + ExclusiveStartKey=response["LastEvaluatedKey"], + ) + elif "date" in query_dict: + response = self.table.query( + IndexName="DateIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key("date").eq( + query.date + ), + ExclusiveStartKey=response["LastEvaluatedKey"], + ) + elif "clip_id" in query_dict: + response = self.table.query( + IndexName="ClipIdIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key( + "clip_id" + ).eq(query.clip_id), + ExclusiveStartKey=response["LastEvaluatedKey"], + ) + + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + return meetings + + except ClientError as e: + print(f"Error querying meetings: {e}") + return [] + + async def all(self) -> List[Meeting]: + """List all meetings in the table.""" + if not self.is_configured(): + print("AWS credentials not configured") + return [] + + try: + response = self.table.scan() + + meetings = [] + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + # Handle pagination if needed + while "LastEvaluatedKey" in response: + response = self.table.scan( + ExclusiveStartKey=response["LastEvaluatedKey"] + ) + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + return meetings + except ClientError as e: + print(f"Error listing all meetings: {e}") + return [] + + async def update_meeting( + self, meeting_name: str, date: str, update_data: Dict[str, Any] + ) -> bool: + """Update specific attributes of a meeting without replacing the entire item.""" + + # Don't allow updating the primary key attributes + if "meeting" in update_data or "date" in update_data: + print("Cannot update primary key attributes (meeting, date)") + return False + + try: + # Build the update expression and attribute values + update_expression_parts = [] + expression_attribute_values = {} + + for key, value in update_data.items(): + update_expression_parts.append(f"#{key} = :{key}") + expression_attribute_values[f":{key}"] = value + + # Build expression attribute names (to handle reserved words) + expression_attribute_names = {f"#{key}": key for key in update_data.keys()} + + # Construct the complete update expression + update_expression = "SET " + ", ".join(update_expression_parts) + + self.table.update_item( + Key={"meeting": meeting_name, "date": date}, + UpdateExpression=update_expression, + ExpressionAttributeNames=expression_attribute_names, + ExpressionAttributeValues=expression_attribute_values, + ReturnValues="UPDATED_NEW", + ) + + print(f"Updated meeting '{meeting_name}' on {date}") + return True + except ClientError as e: + print(f"Error updating meeting: {e}") + return False + + async def update(self, meeting: Meeting) -> bool: + """Update a meeting using a Meeting model.""" + # Convert the model to a dict and remove None values + update_data = { + k: v + for k, v in meeting.model_dump().items() + if v is not None and k not in ["meeting", "date"] + } + + # Only update if there are fields to update + if not update_data: + print("No fields to update") + return False + + return await self.update_meeting(meeting_name, date, update_data) + + async def delete(self, meeting: MeetingQuery) -> bool: + + try: + self.table.delete_item(Key={"meeting": meeting.name, "date": meeting.date}) + print(f"Meeting '{meeting.name}' on {meeting.date} deleted") + return True + except ClientError as e: + print(f"Error deleting meeting: {e}") + return False diff --git a/db/schema.json b/db/schema.json new file mode 100644 index 0000000..38fe928 --- /dev/null +++ b/db/schema.json @@ -0,0 +1,65 @@ +{ + "TableName": "TGOV-Meetings", + "KeySchema": [ + { + "AttributeName": "name", + "KeyType": "HASH" + }, + { + "AttributeName": "date", + "KeyType": "RANGE" + } + ], + "AttributeDefinitions": [ + { + "AttributeName": "name", + "AttributeType": "S" + }, + { + "AttributeName": "date", + "AttributeType": "S" + }, + { + "AttributeName": "clip_id", + "AttributeType": "S" + } + ], + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + }, + "GlobalSecondaryIndexes": [ + { + "IndexName": "DateIndex", + "KeySchema": [ + { + "AttributeName": "date", + "KeyType": "HASH" + } + ], + "Projection": { + "ProjectionType": "ALL" + }, + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } + }, + { + "IndexName": "ClipIdIndex", + "KeySchema": [ + { + "AttributeName": "clip_id", + "KeyType": "HASH" + } + ], + "Projection": { + "ProjectionType": "ALL" + }, + "ProvisionedThroughput": { + "ReadCapacityUnits": 5, + "WriteCapacityUnits": 5 + } + } + ] +} diff --git a/db/setup.py b/db/setup.py new file mode 100755 index 0000000..594be9b --- /dev/null +++ b/db/setup.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +""" +Setup script for creating the DynamoDB table. +This should be run once to initialize the database. +""" + +import os +import json +import argparse +from pathlib import Path +from typing import Dict, Any + +import boto3 +from botocore.exceptions import ClientError +from dotenv import load_dotenv + + +def load_schema(schema_path: Path) -> Dict[str, Any]: + """Load the DynamoDB table schema from JSON file.""" + try: + with open(schema_path, "r") as f: + return json.load(f) + except FileNotFoundError: + print(f"Schema file not found at {schema_path}") + return {} + except json.JSONDecodeError: + print(f"Invalid JSON in schema file at {schema_path}") + return {} + + +def is_aws_configured() -> bool: + """Check if AWS credentials are configured.""" + required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] + return all(var in os.environ for var in required_vars) + + +def create_table(table_name: str, schema: Dict[str, Any]) -> bool: + """Create the DynamoDB table if it doesn't exist.""" + if not is_aws_configured(): + print( + "AWS credentials not configured. Please set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables." + ) + return False + + if not schema: + print("Table schema is empty") + return False + + # Set the table name in the schema + schema["TableName"] = table_name + + dynamodb = boto3.resource("dynamodb") + + try: + # Check if table exists + dynamodb.meta.client.describe_table(TableName=table_name) + print(f"Table '{table_name}' already exists") + return True + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + # Create the table + try: + table = dynamodb.create_table(**schema) + # Wait for the table to be created + table.meta.client.get_waiter("table_exists").wait(TableName=table_name) + print(f"Table '{table_name}' created successfully") + return True + except ClientError as create_error: + print(f"Failed to create table: {create_error}") + return False + else: + print(f"Error checking table existence: {e}") + return False + + +def main(): + """Main function to set up the DynamoDB table.""" + parser = argparse.ArgumentParser(description="Set up DynamoDB table") + parser.add_argument( + "--table-name", + default="TGOV-Meetings", + help="Name of the DynamoDB table to create", + ) + parser.add_argument( + "--schema-path", + default=str(Path(__file__).parent.parent / "db" / "schema.json"), + help="Path to the schema JSON file", + ) + args = parser.parse_args() + + # Load environment variables from .env file + load_dotenv() + + # Load the schema + schema_path = Path(args.schema_path) + schema = load_schema(schema_path) + + if not schema: + print("Failed to load schema. Exiting.") + return 1 + + # Create the table + if create_table(args.table_name, schema): + print("DynamoDB setup completed successfully.") + return 0 + else: + print("DynamoDB setup failed.") + return 1 + + +if __name__ == "__main__": + exit(main()) diff --git a/examples/dynamo_example.py b/examples/dynamo_example.py new file mode 100644 index 0000000..f938682 --- /dev/null +++ b/examples/dynamo_example.py @@ -0,0 +1,185 @@ +#!/usr/bin/env python3 +"""Example script demonstrating the use of DynamoDB service with Meeting objects.""" + +import asyncio +import os +from datetime import datetime +from typing import List + +from dotenv import load_dotenv +from pydantic import HttpUrl + +import sys +from pathlib import Path + +# Add the parent directory to the Python path to allow importing from src +sys.path.append(str(Path(__file__).parent.parent)) + +from db.meeting_db import DynamoDBService +from src.models.meeting import Meeting + + +async def main(): + """Demonstrate DynamoDB operations with Meeting objects.""" + # Load environment variables from .env file + load_dotenv() + + # Initialize DynamoDB service + dynamo_service = DynamoDBService(table_name="ExampleMeetings") + + # Create table if it doesn't exist + table_exists = dynamo_service.create_table_if_not_exists() + if not table_exists: + print("Failed to create or verify DynamoDB table. Exiting.") + return + + # Example meeting data + meetings = [ + Meeting( + name="City Council", + date="2023-05-15T18:00:00", + duration="2h 15m", + agenda=HttpUrl("https://example.com/agenda1"), + video=HttpUrl("https://example.com/video1"), + clip_id="12345", + ), + Meeting( + name="Planning Commission", + date="2023-05-16T10:00:00", + duration="1h 30m", + agenda=HttpUrl("https://example.com/agenda2"), + video=HttpUrl("https://example.com/video2"), + clip_id="67890", + ), + Meeting( + name="City Council", + date="2023-05-22T18:00:00", + duration="2h 45m", + agenda=HttpUrl("https://example.com/agenda3"), + video=HttpUrl("https://example.com/video3"), + clip_id="54321", + ), + ] + + # Insert meetings + print("\n=== Inserting meetings ===") + for meeting in meetings: + success = await dynamo_service.save(meeting) + if success: + print(f"Successfully inserted: {meeting}") + else: + print(f"Failed to insert: {meeting}") + + # Query meetings by name + print("\n=== Querying meetings by name ===") + city_council_meetings = await dynamo_service.query_meetings_by_name("City Council") + print(f"Found {len(city_council_meetings)} City Council meetings:") + for meeting in city_council_meetings: + print(f" - {meeting}") + + # Query meetings by date + print("\n=== Querying meetings by date ===") + may16_meetings = await dynamo_service.query_meetings_by_date("2023-05-16T10:00:00") + print(f"Found {len(may16_meetings)} meetings on May 16, 2023:") + for meeting in may16_meetings: + print(f" - {meeting}") + + # Query meetings by clip_id + print("\n=== Querying meetings by clip_id ===") + clip_meetings = await dynamo_service.query_meetings_by_clip_id("67890") + print(f"Found {len(clip_meetings)} meetings with clip_id '67890':") + for meeting in clip_meetings: + print(f" - {meeting}") + + # Get a specific meeting + print("\n=== Getting a specific meeting ===") + specific_meeting = await dynamo_service.get_meeting( + "Planning Commission", "2023-05-16T10:00:00" + ) + if specific_meeting: + print(f"Found specific meeting: {specific_meeting}") + else: + print("Specific meeting not found") + + # Update meeting using the dictionary-based method + print("\n=== Updating meeting (dictionary-based) ===") + update_success = await dynamo_service.update_meeting( + "Planning Commission", + "2023-05-16T10:00:00", + { + "duration": "2h 0m", # Changed from 1h 30m to 2h 0m + "clip_id": "67890-updated", + }, + ) + if update_success: + print("Successfully updated Planning Commission meeting") + # Get the updated meeting to verify changes + updated_meeting = await dynamo_service.get_meeting( + "Planning Commission", "2023-05-16T10:00:00" + ) + if updated_meeting: + print(f"Updated meeting: {updated_meeting}") + else: + print("Failed to update meeting") + + # Query meetings with updated clip_id + print("\n=== Querying meetings by updated clip_id ===") + updated_clip_meetings = await dynamo_service.query_meetings_by_clip_id( + "67890-updated" + ) + print(f"Found {len(updated_clip_meetings)} meetings with clip_id '67890-updated':") + for meeting in updated_clip_meetings: + print(f" - {meeting}") + + # Update meeting using the model-based method + print("\n=== Updating meeting (model-based) ===") + # Create a partial model with only the fields to update + updated_model = Meeting( + name="City Council", + date="2023-05-15T18:00:00", + duration="3h 0m", # Changed from 2h 15m to 3h 0m + video=HttpUrl("https://example.com/video1-new"), + ) + + update_success = await dynamo_service.update( + "City Council", "2023-05-15T18:00:00", updated_model + ) + + if update_success: + print("Successfully updated City Council meeting") + # Get the updated meeting to verify changes + updated_meeting = await dynamo_service.get_meeting( + "City Council", "2023-05-15T18:00:00" + ) + if updated_meeting: + print(f"Updated meeting: {updated_meeting}") + else: + print("Failed to update meeting") + + # List all meetings + print("\n=== Listing all meetings ===") + all_meetings = await dynamo_service.all() + print(f"Found {len(all_meetings)} total meetings:") + for meeting in all_meetings: + print(f" - {meeting}") + + # Delete a meeting + print("\n=== Deleting a meeting ===") + delete_success = await dynamo_service.delete( + "Planning Commission", "2023-05-16T10:00:00" + ) + if delete_success: + print("Successfully deleted Planning Commission meeting") + else: + print("Failed to delete meeting") + + # Verify deletion by listing all meetings again + print("\n=== Verifying deletion ===") + remaining_meetings = await dynamo_service.all() + print(f"Remaining meetings ({len(remaining_meetings)}):") + for meeting in remaining_meetings: + print(f" - {meeting}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 51725d4..556c5c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,9 +24,10 @@ jupyter = "^1.1.1" jupyter-nbextensions-configurator = "^0.6.4" python-dotenv = "^1.0.1" aiofiles = "^24.1.0" -faster-whisper = "^1.1.1" +faster-whisper = "1.1.0" prefect = "^3.3.0" boto3 = "^1.37.24" +whisperx = "^3.3.1" [tool.poetry.group.dev.dependencies] diff --git a/src/dynamo.py b/src/dynamo.py new file mode 100644 index 0000000..afff174 --- /dev/null +++ b/src/dynamo.py @@ -0,0 +1,284 @@ +""" +DynamoDB service for storing and querying Meeting objects. +""" + +import os +import json +from pathlib import Path +from typing import List, Optional, Dict, Any +from datetime import datetime + +import boto3 +from botocore.exceptions import ClientError +from pydantic import BaseModel + +from src.models.meeting import Meeting + + +class DynamoDBService: + """Service for interacting with DynamoDB to store and retrieve Meeting objects.""" + + def __init__(self, table_name: str = "Meetings"): + """Initialize the DynamoDB service.""" + self.table_name = table_name + self.dynamodb = boto3.resource("dynamodb") + self.table = self.dynamodb.Table(table_name) + self.schema_path = Path(__file__).parent / "schema.json" + self.table_schema = self._load_schema() + + def _load_schema(self) -> Dict[str, Any]: + """Load the DynamoDB table schema from JSON file.""" + try: + with open(self.schema_path, "r") as f: + schema = json.load(f) + if self.table_name != schema.get("TableName"): + schema["TableName"] = self.table_name + return schema + except FileNotFoundError: + print(f"Schema file not found at {self.schema_path}") + return {} + except json.JSONDecodeError: + print(f"Invalid JSON in schema file at {self.schema_path}") + return {} + + def is_configured(self) -> bool: + """Check if AWS credentials are configured.""" + required_vars = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] + return all(var in os.environ for var in required_vars) + + def create_table_if_not_exists(self) -> bool: + """Create the DynamoDB table if it doesn't exist.""" + if not self.is_configured(): + print("AWS credentials not configured") + return False + + if not self.table_schema: + print("Table schema not loaded") + return False + + try: + # Check if table exists + self.dynamodb.meta.client.describe_table(TableName=self.table_name) + print(f"Table '{self.table_name}' already exists") + return True + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + # Create the table + try: + table = self.dynamodb.create_table(**self.table_schema) + # Wait for the table to be created + table.meta.client.get_waiter("table_exists").wait( + TableName=self.table_name + ) + print(f"Table '{self.table_name}' created successfully") + return True + except ClientError as create_error: + print(f"Failed to create table: {create_error}") + return False + else: + print(f"Error checking table existence: {e}") + return False + + async def put_meeting(self, meeting: Meeting) -> bool: + """Insert or update a Meeting in DynamoDB.""" + if not self.is_configured(): + print("AWS credentials not configured") + return False + + try: + item = meeting.model_dump() + self.table.put_item(Item=item) + print(f"Meeting '{meeting.name}' on {meeting.date} saved to DynamoDB") + return True + except ClientError as e: + print(f"Error putting meeting: {e}") + return False + + async def get_meeting(self, meeting_name: str, date: str) -> Optional[Meeting]: + """Get a specific Meeting by its name and date.""" + if not self.is_configured(): + print("AWS credentials not configured") + return None + + try: + response = self.table.get_item(Key={"meeting": meeting_name, "date": date}) + + if "Item" in response: + return Meeting.model_validate(item) + else: + print(f"No meeting found with name '{meeting_name}' and date '{date}'") + return None + except ClientError as e: + print(f"Error getting meeting: {e}") + return None + + async def query_meetings_by_name(self, meeting_name: str) -> List[Meeting]: + """Query meetings by name.""" + if not self.is_configured(): + print("AWS credentials not configured") + return [] + + try: + response = self.table.query( + KeyConditionExpression=boto3.dynamodb.conditions.Key("meeting").eq( + meeting_name + ) + ) + + meetings = [] + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + return meetings + except ClientError as e: + print(f"Error querying meetings by name: {e}") + return [] + + async def query_meetings_by_date(self, date: str) -> List[Meeting]: + """Query meetings by date using the GSI.""" + if not self.is_configured(): + print("AWS credentials not configured") + return [] + + try: + response = self.table.query( + IndexName="DateIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key("date").eq(date), + ) + + meetings = [] + for item in response.get("Items", []): + item = self._convert_decimal_to_float(item) + meetings.append(Meeting.model_validate(item)) + + return meetings + except ClientError as e: + print(f"Error querying meetings by date: {e}") + return [] + + async def list_all_meetings(self) -> List[Meeting]: + """List all meetings in the table.""" + if not self.is_configured(): + print("AWS credentials not configured") + return [] + + try: + response = self.table.scan() + + meetings = [] + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + # Handle pagination if needed + while "LastEvaluatedKey" in response: + response = self.table.scan( + ExclusiveStartKey=response["LastEvaluatedKey"] + ) + for item in response.get("Items", []): + meetings.append(Meeting.model_validate(item)) + + return meetings + except ClientError as e: + print(f"Error listing all meetings: {e}") + return [] + + async def update_meeting( + self, meeting_name: str, date: str, update_data: Dict[str, Any] + ) -> bool: + """Update specific attributes of a meeting without replacing the entire item.""" + if not self.is_configured(): + print("AWS credentials not configured") + return False + + # Don't allow updating the primary key attributes + if "meeting" in update_data or "date" in update_data: + print("Cannot update primary key attributes (meeting, date)") + return False + + try: + # Build the update expression and attribute values + update_expression_parts = [] + expression_attribute_values = {} + + for key, value in update_data.items(): + update_expression_parts.append(f"#{key} = :{key}") + expression_attribute_values[f":{key}"] = value + + # Build expression attribute names (to handle reserved words) + expression_attribute_names = {f"#{key}": key for key in update_data.keys()} + + # Construct the complete update expression + update_expression = "SET " + ", ".join(update_expression_parts) + + self.table.update_item( + Key={"meeting": meeting_name, "date": date}, + UpdateExpression=update_expression, + ExpressionAttributeNames=expression_attribute_names, + ExpressionAttributeValues=expression_attribute_values, + ReturnValues="UPDATED_NEW", + ) + + print(f"Updated meeting '{meeting_name}' on {date}") + return True + except ClientError as e: + print(f"Error updating meeting: {e}") + return False + + async def update(self, meeting_name: str, date: str, meeting: Meeting) -> bool: + """Update a meeting using a Meeting model.""" + # Convert the model to a dict and remove None values + update_data = { + k: v + for k, v in meeting.model_dump().items() + if v is not None and k not in ["meeting", "date"] + } + + # Only update if there are fields to update + if not update_data: + print("No fields to update") + return False + + return await self.update_meeting(meeting_name, date, update_data) + + async def delete_meeting(self, meeting_name: str, date: str) -> bool: + """Delete a meeting by name and date.""" + if not self.is_configured(): + print("AWS credentials not configured") + return False + + try: + self.table.delete_item(Key={"meeting": meeting_name, "date": date}) + print(f"Meeting '{meeting_name}' on {date} deleted") + return True + except ClientError as e: + print(f"Error deleting meeting: {e}") + return False + + async def query_meetings_by_clip_id(self, clip_id: str) -> List[Meeting]: + """Query meetings by clip_id using the ClipIdIndex GSI.""" + if not self.is_configured(): + print("AWS credentials not configured") + return [] + + # Handle None clip_id + if not clip_id: + print("Clip ID cannot be None or empty") + return [] + + try: + response = self.table.query( + IndexName="ClipIdIndex", + KeyConditionExpression=boto3.dynamodb.conditions.Key("clip_id").eq( + clip_id + ), + ) + + meetings = [] + for item in response.get("Items", []): + item = self._convert_decimal_to_float(item) + meetings.append(Meeting.model_validate(item)) + + return meetings + except ClientError as e: + print(f"Error querying meetings by clip_id: {e}") + return [] diff --git a/src/models/meeting.py b/src/models/meeting.py index 0e5096e..a33b1db 100644 --- a/src/models/meeting.py +++ b/src/models/meeting.py @@ -7,21 +7,26 @@ from pydantic import BaseModel, Field, HttpUrl -class Meeting(BaseModel): +class MeetingQuery(BaseModel): + """Model for meeting index""" + + name: Optional[str] = Field(description="Name of the meeting") + date: Optional[str] = Field(description="Date and time of the meeting") + clip_id: Optional[str] = Field(None, description="Granicus clip ID") + + +class Meeting(MeetingQuery): """ Model representing a government meeting """ - meeting: str = Field(description="Name of the meeting") - date: str = Field(description="Date and time of the meeting") duration: str = Field(description="Duration of the meeting") agenda: Optional[HttpUrl] = Field(None, description="URL to the meeting agenda") video: Optional[HttpUrl] = Field(None, description="URL to the meeting video") - clip_id: Optional[str] = Field(None, description="Granicus clip ID") def __str__(self) -> str: """String representation of the meeting""" - return f"{self.meeting} - {self.date} ({self.duration})" + return f"{self.name}-{self.date}-({self.clip_id})" class GranicusPlayerPage(BaseModel): diff --git a/src/videos.py b/src/videos.py index 6c77f7c..d536bfa 100644 --- a/src/videos.py +++ b/src/videos.py @@ -218,7 +218,7 @@ async def transcribe_video_with_diarization( video_path: Path, output_path: Path, model_size: str = "medium", - device: str = "auto", + device: str = "mps", compute_type: str = "auto", batch_size: int = 8, ): diff --git a/tasks/meetings.py b/tasks/meetings.py index 750b560..9b3d2cc 100644 --- a/tasks/meetings.py +++ b/tasks/meetings.py @@ -1,7 +1,11 @@ from typing import Sequence from prefect import task -from src.meetings import get_tgov_meetings, get_registry_meetings, write_registry_meetings +from src.meetings import ( + get_tgov_meetings, + get_registry_meetings, + write_registry_meetings, +) from src.models.meeting import Meeting @@ -13,12 +17,14 @@ async def get_new_meetings(): tgov_clip_ids = [tm.clip_id for tm in tgov_meetings] # print(f"tgov_clip_ids: {tgov_clip_ids}") - registry_meetings: Sequence[Meeting] = get_registry_meetings() + registry_meetings: Sequence[Meeting] = () print(f"Got {len(registry_meetings)} registry meetings.") registry_clip_ids = [rm.clip_id for rm in registry_meetings] - new_meetings: Sequence[Meeting] = [tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids] + new_meetings: Sequence[Meeting] = [ + tm for tm in tgov_meetings if tm.clip_id not in registry_clip_ids + ] if new_meetings: registry_meetings += new_meetings