Skip to content

Commit d4361cc

Browse files
committed
init delete endpoint
1 parent 6be77bc commit d4361cc

File tree

6 files changed

+47
-37
lines changed

6 files changed

+47
-37
lines changed

demo/src/main.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const statusEl = document.getElementById("status") as HTMLSpanElement;
77

88
let pc: RTCPeerConnection | null = null;
99
let localStream: MediaStream | null = null;
10+
let sessionUrl: string | null = null;
1011

1112
form.onsubmit = async (e) => {
1213
e.preventDefault();
@@ -47,7 +48,9 @@ async function start(endpoint: string) {
4748
pc.addTransceiver("audio", { direction: "recvonly" });
4849
const remoteStream = new MediaStream();
4950
remoteVideo.srcObject = remoteStream;
50-
pc.ontrack = (e) => remoteStream.addTrack(e.track);
51+
pc.ontrack = (e) => {
52+
remoteStream.addTrack(e.track);
53+
};
5154

5255
pc.onconnectionstatechange = () => {
5356
statusEl.textContent = pc?.connectionState ?? "Disconnected";
@@ -61,10 +64,16 @@ async function start(endpoint: string) {
6164
});
6265
if (!response.ok) throw new Error(`request failed: ${response.status}`);
6366
const answerSdp = await response.text();
67+
sessionUrl = response.headers.get("location");
6468
await pc.setRemoteDescription({ type: "answer", sdp: answerSdp });
6569
}
6670

67-
function stop() {
71+
async function stop() {
72+
if (sessionUrl) {
73+
await fetch(sessionUrl, { method: "DELETE" });
74+
sessionUrl = null;
75+
}
76+
6877
pc?.close();
6978
localStream?.getTracks().forEach(track => track.stop());
7079
localVideo.srcObject = null;

pulsebeam/src/controller.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ pub enum ControllerMessage {
3636
String,
3737
oneshot::Sender<Result<String, ControllerError>>,
3838
),
39+
RemoveParticipant(Arc<RoomId>, Arc<ParticipantId>),
3940
}
4041

4142
pub struct ControllerActor {
@@ -80,6 +81,15 @@ impl actor::Actor for ControllerActor {
8081
ControllerMessage::Allocate(room_id, participant_id, offer, resp) => {
8182
let _ = resp.send(self.allocate(ctx, room_id, participant_id, offer).await);
8283
}
84+
85+
ControllerMessage::RemoveParticipant(room_id, participant_id) => {
86+
if let Some(room_handle) = self.rooms.get(&room_id) {
87+
// if the room has exited, the participants have already cleaned up too.
88+
let _ = room_handle
89+
.send_high(room::RoomMessage::RemoveParticipant(participant_id))
90+
.await;
91+
}
92+
}
8393
}
8494
}
8595
}

pulsebeam/src/entity.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,8 @@ impl AsRef<str> for ExternalRoomId {
193193
}
194194
}
195195

196-
#[derive(serde::Serialize, serde::Deserialize)]
196+
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, serde::Serialize, serde::Deserialize)]
197+
#[serde(try_from = "String")]
197198
pub struct ParticipantId {
198199
pub internal: EntityId,
199200
}
@@ -211,24 +212,6 @@ impl Default for ParticipantId {
211212
}
212213
}
213214

214-
impl hash::Hash for ParticipantId {
215-
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
216-
self.internal.hash(state);
217-
}
218-
}
219-
220-
impl PartialOrd for ParticipantId {
221-
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222-
Some(self.cmp(other))
223-
}
224-
}
225-
226-
impl Ord for ParticipantId {
227-
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
228-
self.internal.cmp(&other.internal)
229-
}
230-
}
231-
232215
impl TryFrom<String> for ParticipantId {
233216
type Error = IdValidationError;
234217

@@ -251,14 +234,6 @@ impl TryFrom<String> for ParticipantId {
251234
}
252235
}
253236

254-
impl Eq for ParticipantId {}
255-
256-
impl PartialEq for ParticipantId {
257-
fn eq(&self, other: &Self) -> bool {
258-
self.internal.eq(&other.internal)
259-
}
260-
}
261-
262237
impl fmt::Display for ParticipantId {
263238
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
264239
fmt::Display::fmt(&self.internal, f)

pulsebeam/src/node.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub async fn run(
1313
// Configure CORS
1414
let cors = CorsLayer::very_permissive()
1515
.allow_origin(AllowOrigin::mirror_request())
16+
.expose_headers([hyper::header::LOCATION])
1617
.max_age(Duration::from_secs(86400));
1718

1819
// Spawn system and controller actors

pulsebeam/src/room.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const EMPTY_ROOM_TIMEOUT: Duration = Duration::from_secs(30);
1919
pub enum RoomMessage {
2020
PublishTrack(Arc<TrackMeta>),
2121
AddParticipant(Arc<ParticipantId>, Box<Rtc>),
22+
RemoveParticipant(Arc<ParticipantId>),
2223
}
2324

2425
#[derive(Clone, Debug)]
@@ -103,6 +104,12 @@ impl actor::Actor for RoomActor {
103104
self.handle_participant_joined(ctx, participant_id, rtc)
104105
.await
105106
}
107+
RoomMessage::RemoveParticipant(participant_id) => {
108+
if let Some(participant_handle) = self.state.participants.get(&participant_id) {
109+
// if it's closed, then the participant has exited
110+
let _ = participant_handle.handle.terminate().await;
111+
}
112+
}
106113
RoomMessage::PublishTrack(track_meta) => {
107114
self.handle_track_published(track_meta).await;
108115
}

pulsebeam/src/signaling.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ impl IntoResponse for SignalingError {
5151
}
5252
}
5353

54-
/// Join a room / create participant
55-
/// POST /api/v1/rooms/{roomId}
5654
#[axum::debug_handler]
5755
async fn join_room(
5856
Path(room_id): Path<ExternalRoomId>,
@@ -78,23 +76,33 @@ async fn join_room(
7876
.await
7977
.map_err(|_| controller::ControllerError::ServiceUnavailable)??;
8078

79+
// TODO: remove hardcoded URI
8180
let location_url = format!(
82-
"/api/v1/rooms/{}/participants/{}",
83-
&room_id, &participant_id
81+
"http://localhost:3000/api/v1/rooms/{}/participants/{}",
82+
&room_id.external, &participant_id
8483
);
8584
let mut response_headers = HeaderMap::new();
8685
response_headers.insert(LOCATION, location_url.parse().unwrap());
8786
Ok((StatusCode::CREATED, response_headers, answer_sdp))
8887
}
8988

90-
/// Leave a room / delete participant
91-
/// DELETE /api/v1/rooms/{roomId}/participants/{participantId}
9289
#[axum::debug_handler]
9390
async fn leave_room(
9491
Path((room_id, participant_id)): Path<(ExternalRoomId, ParticipantId)>,
95-
State(_controller): State<controller::ControllerHandle>,
92+
State(con): State<controller::ControllerHandle>,
9693
) -> Result<impl IntoResponse, SignalingError> {
97-
// TODO: terminate participant session
94+
let room_id = RoomId::new(room_id);
95+
let room_id = Arc::new(room_id);
96+
let participant_id = Arc::new(participant_id);
97+
98+
// If controller has exited, there's no dangling participants and rooms
99+
let _ = con
100+
.send_high(controller::ControllerMessage::RemoveParticipant(
101+
room_id,
102+
participant_id,
103+
))
104+
.await;
105+
98106
Ok(StatusCode::NO_CONTENT)
99107
}
100108

0 commit comments

Comments
 (0)