Skip to content

Commit c83331b

Browse files
Copilot and crossjam committed
Implement castchat: AI-powered podcast exploration (Phases 1-4)
Co-authored-by: crossjam <208062+crossjam@users.noreply.github.com>
1 parent 4449220 commit c83331b

File tree

4 files changed

+490
-4
lines changed

4 files changed

+490
-4
lines changed

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ transcription-full = [
     "retrocast[transcription-mlx,transcription-cuda,transcription-diarization]",
 ]
 
+# AI-powered chat interface for exploring transcribed podcast content
+castchat = [
+    "chromadb>=0.5.23",
+    "pydantic-ai>=0.0.14",
+]
+
 [project.scripts]
 retrocast = "retrocast.cli:cli"

src/retrocast/castchat_agent.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
"""PydanticAI agent for interactive podcast transcript exploration."""
2+
3+
from typing import Any
4+
5+
from loguru import logger
6+
from pydantic_ai import Agent, RunContext
7+
from pydantic_ai.models.anthropic import AnthropicModel
8+
9+
from retrocast.chromadb_manager import ChromaDBManager
10+
11+
12+
# Upper bound on how many segments a single tool call may hand back to the model.
_MAX_TOOL_RESULTS = 10


def _clamp_max_results(max_results: int) -> int:
    """Clamp a requested result count to the inclusive range 1.._MAX_TOOL_RESULTS."""
    return max(1, min(max_results, _MAX_TOOL_RESULTS))


def _format_timestamp(start_time: float) -> str:
    """Render an offset in seconds as ``M:SS``."""
    minutes = int(start_time // 60)
    seconds = int(start_time % 60)
    return f"{minutes}:{seconds:02d}"


def _format_segment(index: int, result: dict[str, Any], *, include_podcast: bool) -> str:
    """Render one search hit as a numbered entry for the model to read.

    Args:
        index: 1-based position of the hit in the result list.
        result: Search hit with ``"text"`` and ``"metadata"`` keys.
        include_podcast: When true, lead with the podcast title; when false the
            caller has already named the podcast in the heading.

    Returns:
        The formatted entry, terminated by a blank line.
    """
    metadata = result.get("metadata") or {}

    # Use ``or`` instead of a ``.get`` default: the indexer writes missing
    # titles as empty strings, so the key is present but carries no value.
    episode_fallback = "Unknown Episode" if include_podcast else "Unknown"
    episode_title = metadata.get("episode_title") or episode_fallback
    timestamp = _format_timestamp(metadata.get("start_time") or 0)

    if include_podcast:
        podcast_title = metadata.get("podcast_title") or "Unknown Podcast"
        parts = [f"{index}. **{podcast_title}**\n", f" Episode: {episode_title}\n"]
    else:
        parts = [f"{index}. Episode: {episode_title}\n"]

    parts.append(f" Time: {timestamp}")

    speaker = metadata.get("speaker")
    if speaker:
        parts.append(f" | Speaker: {speaker}")

    parts.append(f"\n > {result['text']}\n\n")
    return "".join(parts)


def create_castchat_agent(
    chroma_manager: ChromaDBManager, model_name: str = "claude-sonnet-4-20250514"
) -> Agent:
    """Create a PydanticAI agent for exploring podcast transcripts.

    The returned agent has three tools registered on it, each closing over
    ``chroma_manager``: a search across all transcripts, a search restricted
    to one podcast, and a collection-size summary.

    Args:
        chroma_manager: ChromaDB manager instance for searching transcripts
        model_name: Anthropic model name to use

    Returns:
        Configured PydanticAI Agent
    """
    model = AnthropicModel(model_name)

    agent = Agent(
        model=model,
        system_prompt=(
            "You are an AI assistant helping users explore their podcast archive. "
            "You have access to transcribed podcast episodes and can search through "
            "them to answer questions about topics, guests, discussions, and specific "
            "content mentioned across episodes. When searching, provide context about "
            "which podcast and episode the information came from, and include timestamps "
            "when relevant. Be conversational and helpful."
        ),
    )

    # NOTE: the tool docstrings below double as the tool descriptions exposed
    # to the model, so they are written for the model as much as for humans.

    @agent.tool
    def search_transcripts(ctx: RunContext[Any], query: str, max_results: int = 5) -> str:
        """Search podcast transcription segments for relevant content.

        Use this tool to find information in podcast transcripts. It performs
        semantic search across all transcribed episodes to find relevant segments
        based on the query.

        Args:
            ctx: Run context (automatically provided)
            query: The search query describing what to look for
            max_results: Maximum number of results to return (default 5, min 1, max 10)

        Returns:
            Formatted string with search results including episode info and timestamps
        """
        logger.debug(f"Searching transcripts with query: {query}")

        # Failures are reported back to the model as text instead of raised,
        # so a search problem does not abort the whole conversation turn.
        try:
            results = chroma_manager.search(
                query, n_results=_clamp_max_results(max_results)
            )

            if not results:
                return "No relevant segments found in the transcription archive."

            entries = (
                _format_segment(position, hit, include_podcast=True)
                for position, hit in enumerate(results, 1)
            )
            return "Found relevant segments:\n\n" + "".join(entries)

        except Exception as e:
            logger.error(f"Error searching transcripts: {e}")
            return f"Error searching transcripts: {str(e)}"

    @agent.tool
    def search_podcast(
        ctx: RunContext[Any], podcast_title: str, query: str, max_results: int = 5
    ) -> str:
        """Search transcripts within a specific podcast.

        Use this tool when the user asks about a specific podcast by name.
        This filters search results to only that podcast.

        Args:
            ctx: Run context (automatically provided)
            podcast_title: The name of the podcast to search within
            query: The search query describing what to look for
            max_results: Maximum number of results to return (default 5, min 1, max 10)

        Returns:
            Formatted string with search results for that podcast
        """
        logger.debug(f"Searching podcast '{podcast_title}' with query: {query}")

        try:
            results = chroma_manager.search(
                query,
                n_results=_clamp_max_results(max_results),
                podcast_filter=podcast_title,
            )

            if not results:
                return (
                    f"No relevant segments found in '{podcast_title}'. "
                    "The podcast might not be in the archive or might not match exactly."
                )

            entries = (
                _format_segment(position, hit, include_podcast=False)
                for position, hit in enumerate(results, 1)
            )
            return f"Found segments in **{podcast_title}**:\n\n" + "".join(entries)

        except Exception as e:
            logger.error(f"Error searching podcast transcripts: {e}")
            return f"Error searching podcast: {str(e)}"

    @agent.tool
    def get_collection_info(ctx: RunContext[Any]) -> str:
        """Get information about the indexed transcript collection.

        Use this tool when users ask about what's available in their archive
        or how many episodes have been transcribed.

        Args:
            ctx: Run context (automatically provided)

        Returns:
            Summary of indexed content
        """
        try:
            count = chroma_manager.get_collection_count()
            return (
                f"The transcript archive contains {count:,} indexed segments "
                "from transcribed podcast episodes. You can search across all "
                "of them or filter by specific podcast titles."
            )
        except Exception as e:
            logger.error(f"Error getting collection info: {e}")
            return f"Error accessing collection info: {str(e)}"

    return agent

src/retrocast/chromadb_manager.py

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
"""ChromaDB integration for podcast transcription search and RAG."""
2+
3+
from pathlib import Path
4+
from typing import Any, cast
5+
6+
import chromadb
7+
from chromadb.config import Settings
8+
from loguru import logger
9+
10+
from retrocast.datastore import Datastore
11+
12+
13+
class ChromaDBManager:
    """Manages ChromaDB collections for transcription segment indexing."""

    # Name of the single collection this manager reads from and writes to.
    COLLECTION_NAME = "transcription_segments"

    # Metadata attached to the collection whenever it is (re)created.
    _COLLECTION_METADATA = {"description": "Podcast transcription segments with timestamps"}

    def __init__(self, persist_directory: Path):
        """Initialize ChromaDB client with persistent storage.

        Creates ``persist_directory`` (and any missing parents) if needed, then
        opens the collection, creating it on first use.

        Args:
            persist_directory: Directory path for ChromaDB persistence
        """
        self.persist_directory = persist_directory
        self.persist_directory.mkdir(parents=True, exist_ok=True)

        logger.debug(f"Initializing ChromaDB at {persist_directory}")
        self.client = chromadb.PersistentClient(
            path=str(persist_directory),
            settings=Settings(
                anonymized_telemetry=False,
                allow_reset=True,
            ),
        )
        self.collection_name = self.COLLECTION_NAME
        self.collection = self._get_or_create_collection()

    def _get_or_create_collection(self) -> Any:
        """Open the managed collection, creating it if it does not exist yet."""
        return self.client.get_or_create_collection(
            name=self.collection_name,
            # Pass a copy so the shared class-level dict can never be mutated.
            metadata=dict(self._COLLECTION_METADATA),
        )

    def index_transcriptions(self, datastore: Datastore, batch_size: int = 100) -> int:
        """Index all transcription segments from the database into ChromaDB.

        Segments are written with ``upsert`` under a deterministic ID, so
        running the indexer again refreshes existing entries instead of
        failing on, or skipping, IDs that are already present.

        Args:
            datastore: Datastore instance for querying transcription data
            batch_size: Number of segments to process per batch

        Returns:
            Number of segments indexed

        Raises:
            ValueError: If ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be at least 1, got {batch_size}")

        logger.info("Starting transcription indexing...")

        # Column order here must match the tuple unpacking in the loop below.
        query = """
            SELECT
                ts.transcription_id,
                ts.segment_index,
                ts.start_time,
                ts.end_time,
                ts.text,
                ts.speaker,
                t.podcast_title,
                t.episode_title,
                t.episode_url,
                t.media_path,
                t.language,
                t.backend,
                t.model_size
            FROM transcription_segments ts
            JOIN transcriptions t ON ts.transcription_id = t.transcription_id
            ORDER BY ts.transcription_id, ts.segment_index
        """

        segments = datastore.db.execute(query).fetchall()
        total_segments = len(segments)

        if total_segments == 0:
            logger.warning("No transcription segments found in database")
            return 0

        logger.info(f"Found {total_segments} segments to index")

        indexed_count = 0
        for offset in range(0, total_segments, batch_size):
            batch = segments[offset : offset + batch_size]
            documents = []
            metadatas = []
            ids = []

            for segment in batch:
                (
                    transcription_id,
                    segment_index,
                    start_time,
                    end_time,
                    text,
                    speaker,
                    podcast_title,
                    episode_title,
                    episode_url,
                    media_path,
                    language,
                    backend,
                    model_size,
                ) = segment

                # Deterministic ID: one entry per (transcription, segment) pair.
                ids.append(f"t{transcription_id}_s{segment_index}")

                # The text to be embedded and searched.
                documents.append(text)

                # Metadata for context and filtering. Optional columns are
                # stored as "" when NULL so every value is a plain string.
                metadatas.append(
                    {
                        "transcription_id": str(transcription_id),
                        "segment_index": str(segment_index),
                        "start_time": float(start_time),
                        "end_time": float(end_time),
                        "speaker": str(speaker or ""),
                        "podcast_title": str(podcast_title or ""),
                        "episode_title": str(episode_title or ""),
                        "episode_url": str(episode_url or ""),
                        "media_path": str(media_path or ""),
                        "language": str(language or ""),
                        "backend": str(backend or ""),
                        "model_size": str(model_size or ""),
                    }
                )

            self.collection.upsert(
                documents=documents, metadatas=cast(Any, metadatas), ids=ids
            )
            indexed_count += len(batch)

            logger.debug(f"Indexed {indexed_count}/{total_segments} segments")

        logger.info(f"Successfully indexed {indexed_count} segments")
        return indexed_count

    def search(
        self, query: str, n_results: int = 5, podcast_filter: str | None = None
    ) -> list[dict[str, Any]]:
        """Search transcription segments using semantic similarity.

        Args:
            query: The search query text
            n_results: Maximum number of results to return
            podcast_filter: Optional podcast title to filter results (exact match)

        Returns:
            List of matching segments, each a dict with ``"text"``,
            ``"metadata"``, ``"distance"`` and ``"id"`` keys. Empty when nothing
            matches or when ``n_results`` is less than 1.
        """
        # Asking for no results has a well-defined answer; skip the query
        # rather than forward a non-positive count to the vector store.
        if n_results < 1:
            return []

        where_filter: Any = None
        if podcast_filter:
            where_filter = {"podcast_title": {"$eq": podcast_filter}}

        results = self.collection.query(
            query_texts=[query], n_results=n_results, where=where_filter
        )

        # Results are nested one list per query text; only one query was sent.
        documents_per_query = results["documents"]
        if not documents_per_query or not documents_per_query[0]:
            return []

        documents = documents_per_query[0]
        placeholder = [None] * len(documents)
        metadatas = results["metadatas"][0] if results["metadatas"] else placeholder
        distances = results["distances"][0] if results["distances"] else placeholder
        segment_ids = results["ids"][0] if results["ids"] else placeholder

        return [
            {
                "text": document,
                # Always hand callers a dict so they can ``.get`` safely.
                "metadata": metadata or {},
                "distance": distance,
                "id": segment_id,
            }
            for document, metadata, distance, segment_id in zip(
                documents, metadatas, distances, segment_ids
            )
        ]

    def get_collection_count(self) -> int:
        """Get the number of segments in the collection.

        Returns:
            Number of indexed segments
        """
        return self.collection.count()

    def reset(self) -> None:
        """Clear all data from the collection.

        Deletes the collection and immediately recreates it empty, so
        ``self.collection`` stays usable afterwards.
        """
        logger.warning(f"Resetting collection '{self.collection_name}'")
        self.client.delete_collection(name=self.collection_name)
        self.collection = self._get_or_create_collection()
        logger.info("Collection reset complete")

0 commit comments

Comments
 (0)