Skip to content

Commit 353b166

Browse files
committed
almost working with dashboard
1 parent cf06295 commit 353b166

File tree

10 files changed

+560
-386
lines changed

10 files changed

+560
-386
lines changed

examples/selective_subscription/main.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,20 @@
44
from selective_subscription.app import app, room_service
55
from selective_subscription.config import HOST, PORT
66
from selective_subscription.notification_handler import NotificationHandler
7-
from selective_subscription.selective_subscription.worker import async_worker
7+
from selective_subscription.worker import async_worker
88

99

1010
@asynccontextmanager
1111
async def lifespan(app):
12-
"""Application lifespan manager."""
1312
async with async_worker() as worker:
1413
notification_handler = NotificationHandler(room_service)
1514
worker.run_in_background(notification_handler.start())
1615

1716
print(f"Selective subscription demo started on http://{HOST}:{PORT}")
1817
print("Available endpoints:")
1918
print(" POST /api/peers - Create a new peer")
20-
print(" GET /api/rooms/{room_name}/peers - Get available peers")
21-
print(" POST /api/subscriptions - Toggle subscription")
19+
print(" POST /api/subscribe_peer - subscribe to all tracks of a peer")
20+
print(" POST /api/subscribe_tracks - subscribe to specific tracks")
2221

2322
yield
2423

examples/selective_subscription/selective_subscription/app.py

Lines changed: 26 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import json
2+
from pathlib import Path
23
from typing import Dict, Any
34

45
from starlette.applications import Starlette
@@ -12,13 +13,11 @@
1213
from .room_service import RoomService
1314

1415

15-
# Initialize services
1616
room_service = RoomService()
17-
templates = Jinja2Templates(directory="templates")
17+
templates = Jinja2Templates(directory=str(Path(__file__).resolve().parent.parent / "templates"))
1818

1919

2020
async def create_peer(request: Request) -> Response:
21-
"""Create a new peer in a room."""
2221
try:
2322
body = await request.json()
2423
room_name = body.get("room_name")
@@ -42,99 +41,60 @@ async def create_peer(request: Request) -> Response:
4241
except Exception as e:
4342
return JSONResponse({"error": str(e)}, status_code=500)
4443

45-
46-
async def get_available_peers(request: Request) -> Response:
47-
"""Get peers available for subscription."""
48-
room_name = request.path_params.get("room_name")
49-
peer_id = request.query_params.get("peer_id")
50-
51-
if not room_name:
52-
return JSONResponse({"error": "room_name is required"}, status_code=400)
53-
54-
peers = room_service.get_available_peers(room_name, peer_id)
55-
56-
return JSONResponse({
57-
"peers": [
58-
{
59-
"id": peer.id,
60-
"metadata": peer.metadata if peer.metadata else {},
61-
"tracks": [
62-
{
63-
"id": track["id"],
64-
"type": track.get("type", "unknown")
65-
}
66-
for track in peer.tracks
67-
]
68-
}
69-
for peer in peers
70-
]
71-
})
72-
73-
74-
async def toggle_subscription(request: Request) -> Response:
75-
"""Toggle subscription to a peer's tracks."""
44+
async def subscribe_peer(request: Request) -> Response:
7645
try:
7746
body = await request.json()
7847
peer_id = body.get("peer_id")
7948
target_peer_id = body.get("target_peer_id")
80-
49+
8150
if not peer_id or not target_peer_id:
8251
return JSONResponse(
8352
{"error": "peer_id and target_peer_id are required"},
8453
status_code=400
8554
)
86-
87-
subscribed = room_service.toggle_subscription(peer_id, target_peer_id)
88-
89-
return JSONResponse({
90-
"subscribed": subscribed,
91-
"peer_id": peer_id,
92-
"target_peer_id": target_peer_id
93-
})
55+
56+
room_service.subscibe_peer(peer_id, target_peer_id)
57+
58+
return JSONResponse({"status": "subscribed"})
9459

9560
except Exception as e:
9661
return JSONResponse({"error": str(e)}, status_code=500)
62+
63+
async def subscribe_tracks(request: Request) -> Response:
64+
try:
65+
body = await request.json()
66+
peer_id = body.get("peer_id")
67+
track_ids = body.get("track_ids")
9768

69+
if not peer_id or not track_ids:
70+
return JSONResponse(
71+
{"error": "peer_id and track_ids are required"},
72+
status_code=400
73+
)
9874

99-
async def get_subscription_status(request: Request) -> Response:
100-
"""Get current subscription status for a peer."""
101-
peer_id = request.path_params.get("peer_id")
102-
103-
if not peer_id:
104-
return JSONResponse({"error": "peer_id is required"}, status_code=400)
105-
106-
session = room_service.get_peer_session(peer_id)
107-
108-
if not session:
109-
return JSONResponse({"error": "Peer not found"}, status_code=404)
110-
111-
return JSONResponse({
112-
"peer_id": peer_id,
113-
"subscribed_peers": list(session.subscribed_peers)
114-
})
75+
room_service.subscribe_tracks(peer_id, track_ids)
11576

77+
return JSONResponse({"status": "subscribed"})
78+
79+
except Exception as e:
80+
return JSONResponse({"error": str(e)}, status_code=500)
11681

11782
async def health_check(request: Request) -> Response:
118-
"""Health check endpoint."""
11983
return JSONResponse({"status": "OK"})
12084

12185

12286
async def serve_index(request: Request) -> Response:
123-
"""Serve the main HTML interface."""
12487
return templates.TemplateResponse("index.html", {"request": request})
12588

12689

127-
# Define routes
12890
routes = [
12991
Route("/", serve_index, methods=["GET"]),
13092
Route("/health", health_check, methods=["GET"]),
13193
Route("/api/peers", create_peer, methods=["POST"]),
132-
Route("/api/rooms/{room_name}/peers", get_available_peers, methods=["GET"]),
133-
Route("/api/subscriptions", toggle_subscription, methods=["POST"]),
134-
Route("/api/peers/{peer_id}/subscriptions", get_subscription_status, methods=["GET"]),
94+
Route("/api/subscribe_peer", subscribe_peer, methods=["POST"]),
95+
Route("/api/subscribe_tracks", subscribe_tracks, methods=["POST"]),
13596
]
13697

137-
# Define middleware
13898
middleware = [
13999
Middleware(
140100
CORSMiddleware,
@@ -145,7 +105,6 @@ async def serve_index(request: Request) -> Response:
145105
)
146106
]
147107

148-
# Create application
149108
app = Starlette(
150109
routes=routes,
151110
middleware=middleware,

examples/selective_subscription/selective_subscription/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import os
22

3-
FISHJAM_ID = os.getenv("FISHJAM_ID", "")
3+
FISHJAM_ID = os.environ["FISHJAM_ID"]
44
FISHJAM_TOKEN = os.environ["FISHJAM_MANAGEMENT_TOKEN"]
55
FISHJAM_URL = os.getenv("FISHJAM_URL", "http://localhost:5002")
66
HOST = os.getenv("HOST", "localhost")

examples/selective_subscription/selective_subscription/notification_handler.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
from typing import Dict, Set
2-
import asyncio
3-
4-
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
1+
from .config import FISHJAM_ID, FISHJAM_TOKEN
52
from .room_service import RoomService
63
from fishjam._ws_notifier import FishjamNotifier
74
from fishjam.events import (
@@ -15,8 +12,6 @@
1512

1613

1714
class NotificationHandler:
18-
"""Handles Fishjam server notifications for selective subscription.
19-
"""
2015

2116
def __init__(self, room_service: RoomService):
2217
self.room_service = room_service

examples/selective_subscription/selective_subscription/room_service.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
from dataclasses import dataclass
2-
from typing import Dict, List, Optional
1+
from typing import List
32

43
from fishjam import FishjamClient, Peer, PeerOptions, Room, RoomOptions
54
from fishjam.errors import NotFoundError
65

7-
from .config import FISHJAM_ID, FISHJAM_TOKEN, FISHJAM_URL
6+
from .config import FISHJAM_ID, FISHJAM_TOKEN
87

98

109
class RoomService:
11-
"""Service for managing rooms and peer subscriptions."""
1210

1311
def __init__(self):
1412
self.fishjam = FishjamClient(
@@ -21,7 +19,6 @@ def __init__(self):
2119
))
2220

2321
def get_or_create_room(self) -> Room:
24-
"""Get existing room or create a new one."""
2522
if self.room:
2623
try:
2724
room = self.fishjam.get_room(self.room.id)
@@ -32,7 +29,6 @@ def get_or_create_room(self) -> Room:
3229
return self.fishjam.create_room()
3330

3431
def create_peer(self) -> tuple[Peer, str]:
35-
"""Create a peer with manual subscription mode."""
3632
room = self.get_or_create_room()
3733

3834
options = PeerOptions(
@@ -44,23 +40,19 @@ def create_peer(self) -> tuple[Peer, str]:
4440

4541

4642
def subscibe_peer(self, peer_id: str, target_peer_id: str):
47-
"""Subscribe a peer to all tracks of another peer."""
4843
room = self.get_or_create_room()
4944

5045
self.fishjam.subscribe_peer(room.id, peer_id, target_peer_id)
5146

5247
def subscribe_tracks(self, peer_id: str, track_ids: List[str]):
53-
"""Subscribe a peer to specific tracks."""
5448
room = self.get_or_create_room()
5549

5650
self.fishjam.subscribe_tracks(room.id, peer_id, track_ids)
5751

5852
def get_peer_session(self, peer_id: str):
59-
"""Return a lightweight session-like object for example endpoints."""
6053
room = self.get_or_create_room()
6154
for p in room.peers:
6255
if p.id == peer_id:
63-
# create a simple object that has subscribed_peers attribute
6456
class _Session:
6557
def __init__(self):
6658
self.subscribed_peers: set[str] = set()

0 commit comments

Comments
 (0)