Skip to content

Commit 1f63b78

Browse files
Add transcription example
1 parent 7a4cf6e commit 1f63b78

File tree

15 files changed

+864
-265
lines changed

15 files changed

+864
-265
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
3.12

examples/transcription/README.md

Whitespace-only changes.

examples/transcription/main.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
from contextlib import asynccontextmanager
3+
from typing import Annotated
4+
5+
from fastapi import Depends, FastAPI
6+
from transcription.notifier import make_notifier
7+
from transcription.room import RoomService, fishjam
8+
from transcription.transcription import transcription_session_factory
9+
10+
from fishjam import PeerOptions, SubscribeOptions
11+
12+
_room_service: RoomService | None = None
13+
14+
15+
def get_room_service():
16+
if not _room_service:
17+
raise RuntimeError("Application skipped lifespan events!")
18+
return _room_service
19+
20+
21+
@asynccontextmanager
22+
async def lifespan(_app: FastAPI):
23+
async with (
24+
transcription_session_factory() as session_factory,
25+
asyncio.TaskGroup() as tg,
26+
):
27+
global _room_service
28+
_room_service = RoomService(session_factory)
29+
notifier = make_notifier(_room_service)
30+
notifier_task = tg.create_task(notifier.connect())
31+
32+
yield
33+
34+
notifier_task.cancel()
35+
36+
37+
app = FastAPI(lifespan=lifespan)
38+
39+
40+
@app.get("/")
41+
def get_peer(room_service: Annotated[RoomService, Depends(get_room_service)]):
42+
_peer, token = fishjam.create_peer(
43+
room_service.get_room().id,
44+
PeerOptions(subscribe=SubscribeOptions()),
45+
)
46+
return token
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[project]
2+
name = "transcription"
3+
version = "0.1.0"
4+
description = "Add your description here"
5+
readme = "README.md"
6+
requires-python = ">=3.12"
7+
dependencies = [
8+
"fastapi[standard]==0.116.0",
9+
"fishjam-server-sdk",
10+
]
11+
12+
[tool.uv.sources]
13+
fishjam-server-sdk = { workspace = true }

examples/transcription/transcription/__init__.py

Whitespace-only changes.
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import asyncio
2+
from typing import Callable
3+
4+
from fishjam.agent import Agent, AgentResponseTrackData
5+
6+
from .transcription import TranscriptionSession
7+
8+
9+
class TranscriptionAgent:
10+
def __init__(
11+
self,
12+
room_id: str,
13+
agent: Agent,
14+
on_text: Callable[[str, str], None],
15+
session_factory: Callable[[str], TranscriptionSession],
16+
):
17+
self._room_id = room_id
18+
self._agent = agent
19+
self._peers: dict[str, TranscriptionSession] = {}
20+
self._on_text = on_text
21+
self._session_factory = session_factory
22+
self._leave_event = asyncio.Event()
23+
self.done = False
24+
25+
@agent.on_track_data
26+
def _(track_data: AgentResponseTrackData):
27+
if track_data.peer_id not in self._peers:
28+
return
29+
self._peers[track_data.peer_id].transcribe(track_data.data)
30+
31+
print(f"Created agent for room {room_id}")
32+
self._task = asyncio.create_task(self._run_agent())
33+
34+
async def _run_agent(self):
35+
print(f"Connecting agent to room {self._room_id}")
36+
async with self._agent:
37+
print(f"Agent connected to room {self._room_id}")
38+
await self._leave_event.wait()
39+
print(f"Agent disconnected from room {self._room_id}")
40+
41+
def on_peer_enter(self, peer_id: str):
42+
if peer_id in self._peers:
43+
return
44+
45+
print(f"Starting transcription session for peer {peer_id}")
46+
47+
self._peers[peer_id] = self._session_factory(peer_id)
48+
49+
def on_peer_leave(self, peer_id: str):
50+
if peer_id not in self._peers:
51+
return
52+
53+
print(f"Ending transcription session for peer {peer_id}")
54+
55+
session = self._peers[peer_id]
56+
session.end()
57+
self._peers.pop(peer_id)
58+
59+
if len(self._peers) == 0:
60+
self._leave_event.set()
61+
self.done = True
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import os
2+
3+
from google.genai.types import AudioTranscriptionConfig, LiveConnectConfig, Modality
4+
5+
FISHJAM_ID = os.environ["FISHJAM_ID"]
6+
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]
7+
FISHJAM_URL = os.getenv("FISHJAM_URL")
8+
TRANSCRIPTION_MODEL = "gemini-live-2.5-flash-preview"
9+
TRANSCRIPTION_CONFIG = LiveConnectConfig(
10+
response_modalities=[Modality.TEXT],
11+
input_audio_transcription=AudioTranscriptionConfig(),
12+
)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from fishjam import FishjamNotifier
2+
from fishjam.events import ServerMessagePeerConnected, ServerMessagePeerDisconnected
3+
from fishjam.events.allowed_notifications import AllowedNotification
4+
5+
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
6+
from .room import RoomService
7+
8+
9+
def make_notifier(room_service: RoomService):
10+
notifier = FishjamNotifier(FISHJAM_ID, FISHJAM_TOKEN, fishjam_url=FISHJAM_URL)
11+
12+
@notifier.on_server_notification
13+
def _(notification: AllowedNotification):
14+
print(f"Received notification {notification}")
15+
match notification:
16+
case ServerMessagePeerConnected(peer_id=peer_id, room_id=room_id):
17+
handle_peer_connected(peer_id, room_id)
18+
19+
case ServerMessagePeerDisconnected(peer_id=peer_id, room_id=room_id):
20+
handle_peer_disconnected(peer_id, room_id)
21+
22+
def handle_peer_connected(peer_id: str, room_id: str):
23+
if room_id != room_service.room_id:
24+
return
25+
26+
agent = room_service.create_agent()
27+
agent.on_peer_enter(peer_id)
28+
29+
def handle_peer_disconnected(peer_id: str, room_id: str):
30+
if room_id != room_service.room_id:
31+
return
32+
33+
agent = room_service.get_agent()
34+
if agent:
35+
agent.on_peer_leave(peer_id)
36+
37+
return notifier
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from typing import Callable
2+
3+
from fishjam import FishjamClient, FishjamNotifier, Room
4+
from fishjam.errors import NotFoundError
5+
6+
from .agent import TranscriptionAgent
7+
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
8+
from .transcription import TranscriptionSession
9+
10+
fishjam = FishjamClient(
11+
FISHJAM_ID,
12+
FISHJAM_TOKEN,
13+
fishjam_url=FISHJAM_URL,
14+
)
15+
16+
17+
class RoomService:
18+
def __init__(
19+
self,
20+
session_factory: Callable[[Callable[[str], None]], TranscriptionSession],
21+
):
22+
self.room_id = fishjam.create_room().id
23+
self._agent: TranscriptionAgent | None = None
24+
self._notifier = FishjamNotifier(
25+
FISHJAM_ID,
26+
FISHJAM_TOKEN,
27+
fishjam_url=FISHJAM_URL,
28+
)
29+
30+
def _make_session(peer_id: str):
31+
return session_factory(lambda t: self._handle_transcription(peer_id, t))
32+
33+
self._session_factory = _make_session
34+
35+
def get_room(self) -> Room:
36+
try:
37+
return fishjam.get_room(self.room_id)
38+
except NotFoundError:
39+
self.clear()
40+
room = fishjam.create_room()
41+
self.room_id = room.id
42+
return room
43+
44+
def clear(self):
45+
fishjam.delete_room(self.room_id)
46+
47+
def _handle_transcription(self, peer_id: str, text: str):
48+
print(f"Peer {peer_id} in room {self.room_id} said: {text}")
49+
50+
def create_agent(self):
51+
if not self._agent or self._agent.done:
52+
self._agent = TranscriptionAgent(
53+
self.room_id,
54+
fishjam.create_agent(self.room_id),
55+
self._handle_transcription,
56+
self._session_factory,
57+
)
58+
return self._agent
59+
60+
def get_agent(self):
61+
if self._agent.done:
62+
self._agent = None
63+
return self._agent
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
from asyncio import Event, Queue, Task, TaskGroup
2+
from contextlib import asynccontextmanager
3+
from typing import Callable
4+
5+
from google import genai
6+
from google.genai.live import AsyncSession
7+
from google.genai.types import Blob
8+
9+
from .config import TRANSCRIPTION_CONFIG, TRANSCRIPTION_MODEL
10+
11+
12+
class TranscriptionSession:
13+
def __init__(self, on_text: Callable[[str], None]):
14+
self._gemini = genai.Client()
15+
self._audio_queue = Queue[bytes]()
16+
self._end_event = Event()
17+
self._model = TRANSCRIPTION_MODEL
18+
self._on_text = on_text
19+
20+
async def start(self):
21+
async with self._gemini.aio.live.connect(
22+
model=self._model,
23+
config=TRANSCRIPTION_CONFIG,
24+
) as session:
25+
async with TaskGroup() as tg:
26+
send_task = tg.create_task(self._send_loop(session))
27+
recv_task = tg.create_task(self._recv_loop(session))
28+
29+
print("Started transcription session")
30+
31+
await self._end_event.wait()
32+
33+
send_task.cancel()
34+
recv_task.cancel()
35+
36+
def transcribe(self, audio: bytes):
37+
self._audio_queue.put_nowait(audio)
38+
39+
def end(self):
40+
self._end_event.set()
41+
42+
async def _send_loop(self, session: AsyncSession):
43+
while True:
44+
audio_frame = await self._audio_queue.get()
45+
await session.send_realtime_input(
46+
audio=Blob(data=audio_frame, mime_type="audio/pcm;rate=16000")
47+
)
48+
49+
async def _recv_loop(self, session: AsyncSession):
50+
while True:
51+
async for res in session.receive():
52+
if (
53+
(content := res.server_content)
54+
and (transcription := content.input_transcription)
55+
and (text := transcription.text)
56+
):
57+
self._on_text(text)
58+
59+
60+
@asynccontextmanager
61+
async def transcription_session_factory():
62+
async with TaskGroup() as tg:
63+
sessions: list[TranscriptionSession] = []
64+
65+
def make_session(on_text: Callable[[str], None]):
66+
session = TranscriptionSession(on_text)
67+
sessions.append(session)
68+
tg.create_task(session.start())
69+
return session
70+
71+
yield make_session
72+
73+
for session in sessions:
74+
session.end()

0 commit comments

Comments
 (0)