Skip to content

Commit 6be77bc

Browse files
committed
restructure API to use URI path
1 parent 3c28384 commit 6be77bc

File tree

4 files changed

+84
-42
lines changed

4 files changed

+84
-42
lines changed

demo/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ <h2>PulseBeam</h2>
1414

1515
<form id="controls" class="field middle-align mb-2">
1616
<input type="text" id="endpoint" placeholder="Enter WHIP/WHEP endpoint"
17-
value="http://localhost:3000?room=test&participant=lukas" required />
17+
value="http://localhost:3000/api/v1/rooms/test" required />
1818
<button type="submit" id="toggle" class="small-round">Start</button>
1919
</form>
2020

pulsebeam/src/controller.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ pub enum ControllerError {
3131
#[derive(Debug)]
3232
pub enum ControllerMessage {
3333
Allocate(
34-
RoomId,
35-
ParticipantId,
34+
Arc<RoomId>,
35+
Arc<ParticipantId>,
3636
String,
3737
oneshot::Sender<Result<String, ControllerError>>,
3838
),
@@ -88,8 +88,8 @@ impl ControllerActor {
8888
pub async fn allocate(
8989
&mut self,
9090
_ctx: &mut actor::ActorContext<Self>,
91-
room_id: RoomId,
92-
participant_id: ParticipantId,
91+
room_id: Arc<RoomId>,
92+
participant_id: Arc<ParticipantId>,
9393
offer: String,
9494
) -> Result<String, ControllerError> {
9595
let offer = SdpOffer::from_sdp_string(&offer)?;
@@ -115,15 +115,14 @@ impl ControllerActor {
115115
.accept_offer(offer)
116116
.map_err(ControllerError::OfferRejected)?;
117117

118-
let room_id = Arc::new(room_id);
119118
let room_handle = self.get_or_create_room(room_id);
120119

121120
// TODO: probably retry? Or, let the client to retry instead?
122121
// Each room will always have a graceful timeout before closing.
123122
// But, a data race can still occur nonetheless
124123
room_handle
125124
.send_high(room::RoomMessage::AddParticipant(
126-
Arc::new(participant_id),
125+
participant_id,
127126
Box::new(rtc),
128127
))
129128
.await

pulsebeam/src/entity.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ impl AsRef<str> for ExternalRoomId {
193193
}
194194
}
195195

196+
#[derive(serde::Serialize, serde::Deserialize)]
196197
pub struct ParticipantId {
197198
pub internal: EntityId,
198199
}
@@ -204,6 +205,12 @@ impl ParticipantId {
204205
}
205206
}
206207

208+
impl Default for ParticipantId {
209+
fn default() -> Self {
210+
Self::new()
211+
}
212+
}
213+
207214
impl hash::Hash for ParticipantId {
208215
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
209216
self.internal.hash(state);
@@ -222,6 +229,28 @@ impl Ord for ParticipantId {
222229
}
223230
}
224231

232+
impl TryFrom<String> for ParticipantId {
233+
type Error = IdValidationError;
234+
235+
fn try_from(value: String) -> Result<Self, Self::Error> {
236+
// Check prefix
237+
let (prefix, encoded) = value
238+
.split_once('_')
239+
.ok_or(IdValidationError::InvalidCharacters)?;
240+
241+
if prefix != prefix::PARTICIPANT_ID {
242+
return Err(IdValidationError::InvalidCharacters);
243+
}
244+
245+
// Check if the encoded part is valid base58
246+
bs58::decode(encoded)
247+
.into_vec()
248+
.map_err(|_| IdValidationError::InvalidCharacters)?;
249+
250+
Ok(ParticipantId { internal: value })
251+
}
252+
}
253+
225254
impl Eq for ParticipantId {}
226255

227256
impl PartialEq for ParticipantId {

pulsebeam/src/signaling.rs

Lines changed: 49 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,19 @@
1-
use crate::entity::{ParticipantId, RoomId};
2-
use crate::{controller, entity::ExternalRoomId};
1+
use std::sync::Arc;
2+
3+
use crate::controller;
4+
use crate::entity::{ExternalRoomId, ParticipantId, RoomId};
35
use axum::{
46
Router,
5-
extract::{Query, State},
6-
http::StatusCode,
7-
response::{IntoResponse, Response},
8-
routing::{delete, post},
7+
extract::{Path, State},
8+
http::{HeaderMap, StatusCode},
9+
response::IntoResponse,
10+
routing::{delete, get, post},
911
};
10-
use axum_extra::{TypedHeader, headers::ContentType};
11-
use hyper::HeaderMap;
12+
use axum_extra::TypedHeader;
13+
use axum_extra::headers::ContentType;
1214
use hyper::header::LOCATION;
1315

16+
/// Error type for signaling operations
1417
#[derive(thiserror::Error, Debug)]
1518
pub enum SignalingError {
1619
#[error("join failed: {0}")]
@@ -24,7 +27,7 @@ pub enum SignalingError {
2427
}
2528

2629
impl IntoResponse for SignalingError {
27-
fn into_response(self) -> Response {
30+
fn into_response(self) -> axum::response::Response {
2831
let status = match self {
2932
SignalingError::JoinError(controller::ControllerError::OfferInvalid(_)) => {
3033
StatusCode::BAD_REQUEST
@@ -48,56 +51,67 @@ impl IntoResponse for SignalingError {
4851
}
4952
}
5053

51-
#[derive(serde::Deserialize, serde::Serialize, Debug)]
52-
pub struct ParticipantInfo {
53-
room: ExternalRoomId,
54-
}
55-
54+
/// Join a room / create participant
55+
/// POST /api/v1/rooms/{roomId}
5656
#[axum::debug_handler]
57-
async fn spawn_participant(
58-
Query(info): Query<ParticipantInfo>,
57+
async fn join_room(
58+
Path(room_id): Path<ExternalRoomId>,
5959
State(con): State<controller::ControllerHandle>,
6060
TypedHeader(_content_type): TypedHeader<ContentType>,
6161
raw_offer: String,
6262
) -> Result<impl IntoResponse, SignalingError> {
63-
// TODO: validate content_type = "application/sdp"
64-
65-
let room_id = RoomId::new(info.room);
63+
let room_id = RoomId::new(room_id);
64+
let room_id = Arc::new(room_id);
6665
let participant_id = ParticipantId::new();
67-
tracing::info!("allocated {} to {}", participant_id, room_id);
66+
let participant_id = Arc::new(participant_id);
6867

69-
// TODO: better unique ID to handle session.
70-
let location_url = format!("/rooms/{}/participants/{}", &room_id, &participant_id,);
7168
let (answer_tx, answer_rx) = tokio::sync::oneshot::channel();
7269
con.send_high(controller::ControllerMessage::Allocate(
73-
room_id,
74-
participant_id,
70+
room_id.clone(),
71+
participant_id.clone(),
7572
raw_offer,
7673
answer_tx,
7774
))
7875
.await
7976
.map_err(|_| controller::ControllerError::ServiceUnavailable)?;
80-
let answer = answer_rx
77+
let answer_sdp = answer_rx
8178
.await
8279
.map_err(|_| controller::ControllerError::ServiceUnavailable)??;
8380

84-
let mut headers = HeaderMap::new();
85-
headers.insert(LOCATION, location_url.parse().unwrap());
86-
let resp = (StatusCode::CREATED, headers, answer);
87-
Ok(resp)
81+
let location_url = format!(
82+
"/api/v1/rooms/{}/participants/{}",
83+
&room_id, &participant_id
84+
);
85+
let mut response_headers = HeaderMap::new();
86+
response_headers.insert(LOCATION, location_url.parse().unwrap());
87+
Ok((StatusCode::CREATED, response_headers, answer_sdp))
8888
}
8989

90+
/// Leave a room / delete participant
91+
/// DELETE /api/v1/rooms/{roomId}/participants/{participantId}
9092
#[axum::debug_handler]
91-
async fn delete_participant(
92-
State(_state): State<controller::ControllerHandle>,
93+
async fn leave_room(
94+
Path((room_id, participant_id)): Path<(ExternalRoomId, ParticipantId)>,
95+
State(_controller): State<controller::ControllerHandle>,
9396
) -> Result<impl IntoResponse, SignalingError> {
94-
// TODO: delete participant from the room
95-
Ok(StatusCode::OK)
97+
// TODO: terminate participant session
98+
Ok(StatusCode::NO_CONTENT)
99+
}
100+
101+
/// Healthcheck endpoint for Kubernetes
102+
/// GET /healthz
103+
async fn healthcheck() -> impl IntoResponse {
104+
StatusCode::OK
96105
}
97106

107+
/// Router setup
98108
pub fn router(controller: controller::ControllerHandle) -> Router {
99109
Router::new()
100-
.route("/", post(spawn_participant))
101-
.route("/", delete(delete_participant))
110+
.route("/api/v1/rooms/{external_room_id}", post(join_room))
111+
.route(
112+
"/api/v1/rooms/{external_room_id}/participants/{participant_id}",
113+
delete(leave_room),
114+
)
115+
.route("/healthz", get(healthcheck))
102116
.with_state(controller)
103117
}

0 commit comments

Comments
 (0)