Skip to content

Commit cf06295

Browse files
committed
update api, implement example
1 parent ab4bff6 commit cf06295

File tree

12 files changed

+789
-0
lines changed

12 files changed

+789
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Selective subscription demo package."""
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from contextlib import asynccontextmanager
2+
3+
import uvicorn
4+
from selective_subscription.app import app, room_service
5+
from selective_subscription.config import HOST, PORT
6+
from selective_subscription.notification_handler import NotificationHandler
7+
from selective_subscription.selective_subscription.worker import async_worker
8+
9+
10+
@asynccontextmanager
11+
async def lifespan(app):
12+
"""Application lifespan manager."""
13+
async with async_worker() as worker:
14+
notification_handler = NotificationHandler(room_service)
15+
worker.run_in_background(notification_handler.start())
16+
17+
print(f"Selective subscription demo started on http://{HOST}:{PORT}")
18+
print("Available endpoints:")
19+
print(" POST /api/peers - Create a new peer")
20+
print(" GET /api/rooms/{room_name}/peers - Get available peers")
21+
print(" POST /api/subscriptions - Toggle subscription")
22+
23+
yield
24+
25+
26+
app.router.lifespan_context = lifespan
27+
28+
29+
if __name__ == "__main__":
30+
uvicorn.run(
31+
"main:app",
32+
host=HOST,
33+
port=PORT,
34+
reload=True,
35+
log_level="info"
36+
)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[project]
2+
name = "selective-subscription-demo"
3+
version = "0.1.0"
4+
description = "Selective subscription demo using Fishjam Python SDK"
5+
readme = "README.md"
6+
requires-python = ">=3.11"
7+
dependencies = [
8+
"starlette>=0.35.0",
9+
"uvicorn>=0.25.0",
10+
"jinja2>=3.1.0",
11+
"python-multipart>=0.0.6",
12+
]
13+
14+
[build-system]
15+
requires = ["hatchling"]
16+
build-backend = "hatchling.build"
17+
18+
[tool.hatch.build.targets.wheel]
19+
packages = ["selective_subscription"]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Selective subscription demo package."""
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import json
2+
from typing import Dict, Any
3+
4+
from starlette.applications import Starlette
5+
from starlette.middleware import Middleware
6+
from starlette.middleware.cors import CORSMiddleware
7+
from starlette.requests import Request
8+
from starlette.responses import JSONResponse, Response
9+
from starlette.routing import Route
10+
from starlette.templating import Jinja2Templates
11+
12+
from .room_service import RoomService
13+
14+
15+
# Initialize services
16+
room_service = RoomService()
17+
templates = Jinja2Templates(directory="templates")
18+
19+
20+
async def create_peer(request: Request) -> Response:
21+
"""Create a new peer in a room."""
22+
try:
23+
body = await request.json()
24+
room_name = body.get("room_name")
25+
peer_name = body.get("peer_name")
26+
27+
if not room_name or not peer_name:
28+
return JSONResponse(
29+
{"error": "room_name and peer_name are required"},
30+
status_code=400
31+
)
32+
33+
peer, token = room_service.create_peer()
34+
35+
return JSONResponse({
36+
"peer_id": peer.id,
37+
"token": token,
38+
"room_name": room_name,
39+
"peer_name": peer_name
40+
})
41+
42+
except Exception as e:
43+
return JSONResponse({"error": str(e)}, status_code=500)
44+
45+
46+
async def get_available_peers(request: Request) -> Response:
47+
"""Get peers available for subscription."""
48+
room_name = request.path_params.get("room_name")
49+
peer_id = request.query_params.get("peer_id")
50+
51+
if not room_name:
52+
return JSONResponse({"error": "room_name is required"}, status_code=400)
53+
54+
peers = room_service.get_available_peers(room_name, peer_id)
55+
56+
return JSONResponse({
57+
"peers": [
58+
{
59+
"id": peer.id,
60+
"metadata": peer.metadata if peer.metadata else {},
61+
"tracks": [
62+
{
63+
"id": track["id"],
64+
"type": track.get("type", "unknown")
65+
}
66+
for track in peer.tracks
67+
]
68+
}
69+
for peer in peers
70+
]
71+
})
72+
73+
74+
async def toggle_subscription(request: Request) -> Response:
75+
"""Toggle subscription to a peer's tracks."""
76+
try:
77+
body = await request.json()
78+
peer_id = body.get("peer_id")
79+
target_peer_id = body.get("target_peer_id")
80+
81+
if not peer_id or not target_peer_id:
82+
return JSONResponse(
83+
{"error": "peer_id and target_peer_id are required"},
84+
status_code=400
85+
)
86+
87+
subscribed = room_service.toggle_subscription(peer_id, target_peer_id)
88+
89+
return JSONResponse({
90+
"subscribed": subscribed,
91+
"peer_id": peer_id,
92+
"target_peer_id": target_peer_id
93+
})
94+
95+
except Exception as e:
96+
return JSONResponse({"error": str(e)}, status_code=500)
97+
98+
99+
async def get_subscription_status(request: Request) -> Response:
100+
"""Get current subscription status for a peer."""
101+
peer_id = request.path_params.get("peer_id")
102+
103+
if not peer_id:
104+
return JSONResponse({"error": "peer_id is required"}, status_code=400)
105+
106+
session = room_service.get_peer_session(peer_id)
107+
108+
if not session:
109+
return JSONResponse({"error": "Peer not found"}, status_code=404)
110+
111+
return JSONResponse({
112+
"peer_id": peer_id,
113+
"subscribed_peers": list(session.subscribed_peers)
114+
})
115+
116+
117+
async def health_check(request: Request) -> Response:
118+
"""Health check endpoint."""
119+
return JSONResponse({"status": "OK"})
120+
121+
122+
async def serve_index(request: Request) -> Response:
123+
"""Serve the main HTML interface."""
124+
return templates.TemplateResponse("index.html", {"request": request})
125+
126+
127+
# Define routes
128+
routes = [
129+
Route("/", serve_index, methods=["GET"]),
130+
Route("/health", health_check, methods=["GET"]),
131+
Route("/api/peers", create_peer, methods=["POST"]),
132+
Route("/api/rooms/{room_name}/peers", get_available_peers, methods=["GET"]),
133+
Route("/api/subscriptions", toggle_subscription, methods=["POST"]),
134+
Route("/api/peers/{peer_id}/subscriptions", get_subscription_status, methods=["GET"]),
135+
]
136+
137+
# Define middleware
138+
middleware = [
139+
Middleware(
140+
CORSMiddleware,
141+
allow_origins=["*"],
142+
allow_credentials=True,
143+
allow_methods=["*"],
144+
allow_headers=["*"],
145+
)
146+
]
147+
148+
# Create application
149+
app = Starlette(
150+
routes=routes,
151+
middleware=middleware,
152+
)
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import os
2+
3+
FISHJAM_ID = os.getenv("FISHJAM_ID", "")
4+
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]
5+
FISHJAM_URL = os.getenv("FISHJAM_URL", "http://localhost:5002")
6+
HOST = os.getenv("HOST", "localhost")
7+
PORT = int(os.getenv("PORT", "8000"))
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from typing import Dict, Set
2+
import asyncio
3+
4+
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
5+
from .room_service import RoomService
6+
from fishjam._ws_notifier import FishjamNotifier
7+
from fishjam.events import (
8+
ServerMessagePeerType,
9+
ServerMessagePeerConnected,
10+
ServerMessagePeerDisconnected,
11+
ServerMessageTrackAdded,
12+
ServerMessageTrackRemoved,
13+
)
14+
from fishjam.events.allowed_notifications import AllowedNotification
15+
16+
17+
class NotificationHandler:
18+
"""Handles Fishjam server notifications for selective subscription.
19+
"""
20+
21+
def __init__(self, room_service: RoomService):
22+
self.room_service = room_service
23+
self._notifier = FishjamNotifier(FISHJAM_ID, FISHJAM_TOKEN)
24+
@self._notifier.on_server_notification
25+
async def _(notification: AllowedNotification):
26+
match notification:
27+
case ServerMessagePeerConnected(
28+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
29+
):
30+
await handle_peer_connected(notification)
31+
case ServerMessagePeerDisconnected(
32+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
33+
):
34+
await handle_peer_disconnected(notification)
35+
case ServerMessageTrackAdded(
36+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
37+
):
38+
await handle_track_added(notification)
39+
case ServerMessageTrackRemoved(
40+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
41+
):
42+
await handle_track_removed(notification)
43+
44+
async def handle_peer_connected(notification: ServerMessagePeerConnected):
45+
print(f"Peer connected: {notification.peer_id}")
46+
47+
async def handle_peer_disconnected(notification: ServerMessagePeerDisconnected):
48+
print(f"Peer disconnected: {notification.peer_id}")
49+
50+
async def handle_track_added(notification: ServerMessageTrackAdded):
51+
print(f"Track added: {notification.track}")
52+
53+
async def handle_track_removed(notification: ServerMessageTrackRemoved):
54+
print(f"Track removed: {notification.track}")
55+
56+
async def start(self) -> None:
57+
"""Long-running coroutine that connects the notifier and processes messages."""
58+
await self._notifier.connect()
59+
60+
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
from dataclasses import dataclass
2+
from typing import Dict, List, Optional
3+
4+
from fishjam import FishjamClient, Peer, PeerOptions, Room, RoomOptions
5+
from fishjam.errors import NotFoundError
6+
7+
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
8+
9+
10+
class RoomService:
11+
"""Service for managing rooms and peer subscriptions."""
12+
13+
def __init__(self):
14+
self.fishjam = FishjamClient(
15+
FISHJAM_ID,
16+
FISHJAM_TOKEN,
17+
)
18+
self.room = self.fishjam.create_room(RoomOptions(
19+
max_peers=10,
20+
room_type="conference"
21+
))
22+
23+
def get_or_create_room(self) -> Room:
24+
"""Get existing room or create a new one."""
25+
if self.room:
26+
try:
27+
room = self.fishjam.get_room(self.room.id)
28+
return room
29+
except NotFoundError:
30+
pass
31+
32+
return self.fishjam.create_room()
33+
34+
def create_peer(self) -> tuple[Peer, str]:
35+
"""Create a peer with manual subscription mode."""
36+
room = self.get_or_create_room()
37+
38+
options = PeerOptions(
39+
subscribe_mode="manual",
40+
)
41+
42+
peer, token = self.fishjam.create_peer(room.id, options)
43+
return peer, token
44+
45+
46+
def subscibe_peer(self, peer_id: str, target_peer_id: str):
47+
"""Subscribe a peer to all tracks of another peer."""
48+
room = self.get_or_create_room()
49+
50+
self.fishjam.subscribe_peer(room.id, peer_id, target_peer_id)
51+
52+
def subscribe_tracks(self, peer_id: str, track_ids: List[str]):
53+
"""Subscribe a peer to specific tracks."""
54+
room = self.get_or_create_room()
55+
56+
self.fishjam.subscribe_tracks(room.id, peer_id, track_ids)
57+
58+
def get_peer_session(self, peer_id: str):
59+
"""Return a lightweight session-like object for example endpoints."""
60+
room = self.get_or_create_room()
61+
for p in room.peers:
62+
if p.id == peer_id:
63+
# create a simple object that has subscribed_peers attribute
64+
class _Session:
65+
def __init__(self):
66+
self.subscribed_peers: set[str] = set()
67+
68+
return _Session()
69+
70+
return None
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from asyncio import Task, TaskGroup
2+
from contextlib import asynccontextmanager
3+
from typing import Any, Coroutine
4+
5+
6+
class BackgroundWorker:
7+
def __init__(self, tg: TaskGroup) -> None:
8+
self._tg = tg
9+
self._tasks: set[Task[None]] = set()
10+
11+
def run_in_background(self, coro: Coroutine[Any, Any, None]):
12+
task = self._tg.create_task(coro)
13+
task.add_done_callback(self._remove_task)
14+
self._tasks.add(task)
15+
return task
16+
17+
def _remove_task(self, task: Task[None]):
18+
self._tasks.discard(task)
19+
20+
def cleanup(self):
21+
for task in self._tasks:
22+
task.cancel()
23+
self._tasks = set()
24+
25+
26+
@asynccontextmanager
27+
async def async_worker():
28+
async with TaskGroup() as tg:
29+
worker = BackgroundWorker(tg)
30+
yield worker
31+
worker.cleanup()

0 commit comments

Comments
 (0)