Skip to content

Commit a76a096

Browse files
authored
update api, implement example (#46)
* update api, implement example * almost working with dashboard * simplify code, make notifier work with tracks added * format * lint * update lock * make uv lock work * apply comment suggestions, refine frontend * format * comment suggestions
1 parent ab4bff6 commit a76a096

File tree

17 files changed

+1741
-628
lines changed

17 files changed

+1741
-628
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Your Fishjam envs, which you can get at https://fishjam.io/app
2+
FISHJAM_ID="your-fishjam-id"
3+
FISHJAM_MANAGEMENT_TOKEN="your-management-token"
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Selective Subscription Demo
2+
3+
Demo application showing selective subscription functionality with [Fishjam](https://fishjam.io) and the Python Server SDK.
4+
5+
## Prerequisites
6+
7+
- Python 3.11+
8+
- [uv](https://docs.astral.sh/uv/) package manager
9+
- Fishjam credentials ([get them here](https://fishjam.io/app))
10+
11+
> [!IMPORTANT]
12+
> All commands should be run from the `examples/selective_subscription` directory
13+
14+
## Quick Start
15+
16+
1. Install dependencies (in the `examples/selective_subscription` directory):
17+
```bash
18+
uv sync
19+
```
20+
21+
To run the app, first copy [`.env.example`](./.env.example) to `.env` and populate your environment variables.
22+
23+
Once you have populated `.env`, you can run the demo with
24+
25+
2. Run the server:
26+
```bash
27+
uv run ./main.py
28+
```
29+
30+
3. Open http://localhost:8000 in your browser
31+
32+
You create peers using the web UI at [http://localhost:8000](http://localhost:8000).
33+
34+
1. Create peers with names
35+
2. Copy peer tokens and use them with a WebRTC client (e.g., [minimal-react](https://github.com/fishjam-cloud/web-client-sdk/tree/main/examples/react-client/minimal-react))
36+
3. Once peers have tracks, manage subscriptions through the web interface
37+
38+
### API Endpoints
39+
40+
- `POST /api/peers` - Create a peer with manual subscription mode
41+
- `POST /api/subscribe_peer` - Subscribe to all tracks from a peer
42+
- `POST /api/subscribe_tracks` - Subscribe to specific track IDs
43+
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Selective subscription demo package."""
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from contextlib import asynccontextmanager
2+
3+
import uvicorn
4+
5+
from selective_subscription.app import app, room_service
6+
from selective_subscription.config import HOST, PORT
7+
from selective_subscription.notification_handler import NotificationHandler
8+
from selective_subscription.worker import async_worker
9+
10+
11+
@asynccontextmanager
12+
async def lifespan(app):
13+
async with async_worker() as worker:
14+
notification_handler = NotificationHandler(room_service)
15+
worker.run_in_background(notification_handler.start())
16+
print(f"Selective subscription demo started on http://{HOST}:{PORT}")
17+
yield
18+
19+
20+
app.router.lifespan_context = lifespan
21+
22+
23+
if __name__ == "__main__":
24+
uvicorn.run("main:app", host=HOST, port=PORT, reload=True, log_level="info")
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[project]
2+
name = "selective-subscription-demo"
3+
version = "0.1.0"
4+
description = "Selective subscription demo using Fishjam Python SDK"
5+
readme = "README.md"
6+
requires-python = ">=3.11"
7+
dependencies = [
8+
"starlette>=0.35.0",
9+
"uvicorn>=0.25.0",
10+
"fishjam-server-sdk",
11+
"python-dotenv",
12+
]
13+
14+
[tool.uv.sources]
15+
fishjam-server-sdk = { workspace = true }
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Selective subscription demo package."""
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
from pathlib import Path
2+
3+
from starlette.applications import Starlette
4+
from starlette.middleware import Middleware
5+
from starlette.middleware.cors import CORSMiddleware
6+
from starlette.requests import Request
7+
from starlette.responses import JSONResponse, Response
8+
from starlette.routing import Route
9+
from starlette.templating import Jinja2Templates
10+
11+
from .room_service import RoomService
12+
13+
room_service = RoomService()
14+
templates = Jinja2Templates(
15+
directory=str(Path(__file__).resolve().parent.parent / "templates")
16+
)
17+
18+
19+
async def create_peer(request: Request) -> Response:
20+
try:
21+
body = await request.json()
22+
room_name = body.get("room_name")
23+
peer_name = body.get("peer_name")
24+
25+
if not room_name or not peer_name:
26+
return JSONResponse(
27+
{"error": "room_name and peer_name are required"}, status_code=400
28+
)
29+
30+
peer, token = room_service.create_peer()
31+
32+
return JSONResponse(
33+
{
34+
"peer_id": peer.id,
35+
"token": token,
36+
"room_name": room_name,
37+
"peer_name": peer_name,
38+
}
39+
)
40+
except Exception as e:
41+
return JSONResponse({"error": str(e)}, status_code=500)
42+
43+
44+
async def subscribe_peer(request: Request) -> Response:
45+
try:
46+
body = await request.json()
47+
peer_id = body.get("peer_id")
48+
target_peer_id = body.get("target_peer_id")
49+
50+
if not peer_id or not target_peer_id:
51+
return JSONResponse(
52+
{"error": "peer_id and target_peer_id are required"}, status_code=400
53+
)
54+
55+
room_service.subscribe_peer(peer_id, target_peer_id)
56+
return JSONResponse({"status": "subscribed"})
57+
except Exception as e:
58+
return JSONResponse({"error": str(e)}, status_code=500)
59+
60+
61+
async def subscribe_tracks(request: Request) -> Response:
62+
try:
63+
body = await request.json()
64+
peer_id = body.get("peer_id")
65+
track_ids = body.get("track_ids")
66+
67+
if not peer_id or not track_ids:
68+
return JSONResponse(
69+
{"error": "peer_id and track_ids are required"}, status_code=400
70+
)
71+
72+
room_service.subscribe_tracks(peer_id, track_ids)
73+
return JSONResponse({"status": "subscribed"})
74+
except Exception as e:
75+
return JSONResponse({"error": str(e)}, status_code=500)
76+
77+
78+
async def serve_index(request: Request) -> Response:
79+
return templates.TemplateResponse("index.html", {"request": request})
80+
81+
82+
routes = [
83+
Route("/", serve_index, methods=["GET"]),
84+
Route("/api/peers", create_peer, methods=["POST"]),
85+
Route("/api/subscribe_peer", subscribe_peer, methods=["POST"]),
86+
Route("/api/subscribe_tracks", subscribe_tracks, methods=["POST"]),
87+
]
88+
89+
middleware = [
90+
Middleware(
91+
CORSMiddleware,
92+
allow_origins=["*"],
93+
allow_credentials=True,
94+
allow_methods=["*"],
95+
allow_headers=["*"],
96+
)
97+
]
98+
99+
app = Starlette(
100+
routes=routes,
101+
middleware=middleware,
102+
)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import os
2+
3+
import dotenv
4+
5+
dotenv.load_dotenv()
6+
7+
FISHJAM_ID = os.environ["FISHJAM_ID"]
8+
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]
9+
HOST = os.getenv("HOST", "localhost")
10+
PORT = int(os.getenv("PORT", "8000"))
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from fishjam._ws_notifier import FishjamNotifier
2+
from fishjam.events import (
3+
ServerMessagePeerConnected,
4+
ServerMessagePeerDisconnected,
5+
ServerMessagePeerType,
6+
ServerMessageTrackAdded,
7+
ServerMessageTrackRemoved,
8+
)
9+
from fishjam.events.allowed_notifications import AllowedNotification
10+
11+
from .config import FISHJAM_ID, FISHJAM_TOKEN
12+
from .room_service import RoomService
13+
14+
15+
class NotificationHandler:
16+
def __init__(self, room_service: RoomService):
17+
self.room_service = room_service
18+
self._notifier = FishjamNotifier(FISHJAM_ID, FISHJAM_TOKEN)
19+
20+
@self._notifier.on_server_notification
21+
async def _(notification: AllowedNotification):
22+
match notification:
23+
case ServerMessagePeerConnected(
24+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
25+
):
26+
await handle_peer_connected(notification)
27+
case ServerMessagePeerDisconnected(
28+
peer_type=ServerMessagePeerType.PEER_TYPE_WEBRTC,
29+
):
30+
await handle_peer_disconnected(notification)
31+
case ServerMessageTrackAdded():
32+
await handle_track_added(notification)
33+
case ServerMessageTrackRemoved():
34+
await handle_track_removed(notification)
35+
36+
async def handle_peer_connected(notification: ServerMessagePeerConnected):
37+
print(f"Peer connected: {notification.peer_id}")
38+
39+
async def handle_peer_disconnected(notification: ServerMessagePeerDisconnected):
40+
print(f"Peer disconnected: {notification.peer_id}")
41+
42+
async def handle_track_added(notification: ServerMessageTrackAdded):
43+
print(f"Track added: {notification.track}")
44+
45+
async def handle_track_removed(notification: ServerMessageTrackRemoved):
46+
print(f"Track removed: {notification.track}")
47+
48+
async def start(self) -> None:
49+
await self._notifier.connect()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
from typing import List
2+
3+
from fishjam import FishjamClient, Peer, PeerOptions, Room, RoomOptions
4+
from fishjam.errors import NotFoundError
5+
6+
from .config import FISHJAM_ID, FISHJAM_TOKEN
7+
8+
9+
class RoomService:
10+
def __init__(self):
11+
self.fishjam = FishjamClient(
12+
fishjam_id=FISHJAM_ID, management_token=FISHJAM_TOKEN
13+
)
14+
self.room = self.fishjam.create_room(
15+
RoomOptions(max_peers=10, room_type="conference")
16+
)
17+
18+
def get_or_create_room(self) -> Room:
19+
try:
20+
self.room = self.fishjam.get_room(self.room.id)
21+
except NotFoundError:
22+
self.room = self.fishjam.create_room()
23+
24+
return self.room
25+
26+
def create_peer(self) -> tuple[Peer, str]:
27+
room = self.get_or_create_room()
28+
options = PeerOptions(subscribe_mode="manual")
29+
peer, token = self.fishjam.create_peer(room.id, options)
30+
return peer, token
31+
32+
def subscribe_peer(self, peer_id: str, target_peer_id: str):
33+
room = self.get_or_create_room()
34+
self.fishjam.subscribe_peer(room.id, peer_id, target_peer_id)
35+
36+
def subscribe_tracks(self, peer_id: str, track_ids: List[str]):
37+
room = self.get_or_create_room()
38+
self.fishjam.subscribe_tracks(room.id, peer_id, track_ids)

0 commit comments

Comments
 (0)