Skip to content

Commit f3a3389

Browse files
committed
WIP Stream forwarding + didRemoveMediaStream
1 parent 75b4a05 commit f3a3389

File tree

9 files changed

+158
-24
lines changed

9 files changed

+158
-24
lines changed

Sources/alloclient/UIWebRTCTransport.swift

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,8 @@ class UIWebRTCTransport: NSObject, Transport, LKRTCPeerConnectionDelegate, LKRTC
280280

281281
public func peerConnection(_ peerConnection: LKRTCPeerConnection, didRemove stream: LKRTCMediaStream)
282282
{
283-
283+
print("Lost stream: \(clientId!): \(stream)")
284+
delegate?.transport(self, didRemoveMediaStream: stream.wrapper)
284285
}
285286

286287
var renegotiationNeeded = false
@@ -457,6 +458,11 @@ class UIWebRTCTransport: NSObject, Transport, LKRTCPeerConnectionDelegate, LKRTC
457458
micTrack?.isEnabled = newValue
458459
}
459460
}
461+
462+
static func forward(mediaStream: any MediaStream, to: any Transport) throws -> any MediaStreamForwarder
463+
{
464+
fatalError("Not implemented")
465+
}
460466
}
461467

462468
// MARK: - Wrapper classes

Sources/alloheadless/HeadlessWebRTCTransport.swift

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class HeadlessWebRTCTransport: Transport
3939
peer.$tracks.sinkChanges(added: { track in
4040
self.delegate?.transport(self, didReceiveMediaStream: track)
4141
}, removed: { track in
42-
// TODO: notify track removals too
42+
self.delegate?.transport(self, didRemoveMediaStream: track)
4343
}).store(in: &cancellables)
4444

4545
// TODO: subscribe to more callbacks and match UIWebRTCTransport's behavior
@@ -145,15 +145,14 @@ public class HeadlessWebRTCTransport: Transport
145145
fatalError("Not available server-side")
146146
}
147147

148-
public func addOutgoingStream(_ stream: MediaStream)
148+
public static func forward(mediaStream: MediaStream, to transport: any Transport) throws -> MediaStreamForwarder
149149
{
150-
// TODO: Implement stream forwarding when you have multiple clients
151-
// This would involve forwarding streams between HeadlessWebRTCTransport instances
152-
}
153-
154-
public func forwardStream(from otherTransport: HeadlessWebRTCTransport, streamId: String) -> Bool
155-
{
156-
return false
150+
print("Forwarding media stream \(mediaStream.mediaId) to \(transport.clientId)")
151+
let track = mediaStream as! AlloWebRTCPeer.Track
152+
let peer = (transport as! HeadlessWebRTCTransport).peer
153+
let sfu = try MediaForwardingUnit(forwarding: track, to: peer)
154+
try peer.lockLocalDescription(type: .unspecified)
155+
return sfu
157156
}
158157
}
159158

@@ -173,6 +172,10 @@ extension AlloWebRTCPeer.Track : MediaStream
173172
}
174173
}
175174

175+
extension MediaForwardingUnit : MediaStreamForwarder
176+
{
177+
}
178+
176179

177180
extension SignallingPayload
178181
{

Sources/allonet2/AlloClient.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,12 @@ open class AlloClient : AlloSessionDelegate, ObservableObject, Identifiable
262262
// TODO: Playback
263263
}
264264

265+
nonisolated public func session(_: AlloSession, didRemoveMediaStream: MediaStream)
266+
{
267+
print("Client lost audio track")
268+
// TODO: Stop playback
269+
}
270+
265271
// MARK: - Interactions, intent and place state
266272

267273
public struct InteractionHandler<T>

Sources/allonet2/AlloSession.swift

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ public protocol AlloSessionDelegate: AnyObject
2020
func session(_: AlloSession, didReceiveIntent intent: Intent)
2121

2222
func session(_: AlloSession, didReceiveMediaStream: MediaStream)
23-
// TODO: handle losing a media stream too
24-
//func session(_: AlloSession, didRemoveMediaStream: MediaStream)
23+
func session(_: AlloSession, didRemoveMediaStream: MediaStream)
2524
}
2625

2726
/// Wrapper of Transport, adding Alloverse-specific channels and data types
@@ -34,7 +33,7 @@ public class AlloSession : NSObject, TransportDelegate
3433
private var interactionChannel: DataChannel!
3534
private var worldstateChannel: DataChannel!
3635

37-
private var incomingStreams: [String/*StreamID*/: MediaStream] = [:]
36+
public private(set) var incomingStreams: [String/*StreamID*/: MediaStream] = [:]
3837

3938
private var outstandingInteractions: [Interaction.RequestID: CheckedContinuation<Interaction, Never>] = [:]
4039

@@ -238,4 +237,11 @@ public class AlloSession : NSObject, TransportDelegate
238237
incomingStreams[stream.mediaId] = stream
239238
delegate?.session(self, didReceiveMediaStream: stream)
240239
}
240+
241+
public func transport(_ transport: Transport, didRemoveMediaStream stream: MediaStream)
242+
{
243+
incomingStreams[stream.mediaId] = nil
244+
delegate?.session(self, didRemoveMediaStream: stream)
245+
}
246+
241247
}

Sources/allonet2/PlaceServer/PlaceServer+Interaction.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ extension PlaceServer
178178
print("Accepted client \(client.cid) with email \(identity.emailAddress), display name \(identity.displayName), assigned avatar id \(ent.id)")
179179
await heartbeat.awaitNextSync() // make it exist before we tell client about it
180180

181+
self.start(forwardingTo: client)
182+
181183
client.session.send(interaction: inter.makeResponse(with: .announceResponse(avatarId: ent.id, placeName: name)))
182184
case .createEntity(let description):
183185
let ent = await self.createEntity(from: description, for: client)
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//
2+
// PlaceServer+Media.swift
3+
// allonet2
4+
//
5+
// Created by Nevyn Bengtsson on 2025-08-28.
6+
//
7+
8+
import Foundation
9+
10+
extension PlaceServer
11+
{
12+
// TODO: Forward audio based on interactions instead of doing everyone-to-everyone
13+
/// A new incoming stream arrived. Forward it to everyone.
14+
internal func handle(incoming stream: MediaStream, from sender: ConnectedClient)
15+
{
16+
for (cid, receiver) in self.clients
17+
{
18+
do {
19+
try start(forwarding: stream, from: sender, to: receiver)
20+
} catch let e {
21+
print("FAILED to forward media stream \(sender.cid).\(stream.mediaId) -> \(receiver.cid): \(e)")
22+
}
23+
}
24+
}
25+
26+
/// A new client connected. Forward every existing stream to it.
27+
internal func start(forwardingTo receiver: ConnectedClient)
28+
{
29+
for sender in clients.values where sender !== receiver
30+
{
31+
for stream in sender.session.incomingStreams.values
32+
{
33+
do {
34+
try self.start(forwarding: stream, from: sender, to: receiver)
35+
} catch let e {
36+
print("FAILED to do initial forwarding of media stream \(sender.cid).\(stream.mediaId) -> \(receiver.cid): \(e)")
37+
}
38+
}
39+
}
40+
}
41+
42+
/// Perform the
43+
internal func start(forwarding stream: MediaStream, from sender: ConnectedClient, to receiver: ConnectedClient) throws
44+
{
45+
// Don't forward the stream back to its source
46+
if sender.session.clientId == receiver.session.clientId { return }
47+
48+
let transport = receiver.session.transport
49+
50+
let id = SFUIdentifier(fromMediaId: stream.mediaId, toClient: receiver.cid)
51+
if let existingSfu = sfus[id]
52+
{
53+
return
54+
}
55+
56+
let sfu = try transportClass.forward(mediaStream: stream, to: transport)
57+
sfus[id] = sfu
58+
}
59+
60+
internal func stop(forwarding stream: MediaStream, to client: ConnectedClient)
61+
{
62+
stop(forwarding: stream, toClientId: client.cid)
63+
}
64+
internal func stop(forwarding stream: MediaStream, toClientId: ClientId)
65+
{
66+
let id = SFUIdentifier(fromMediaId: stream.mediaId, toClient: toClientId)
67+
guard let sfu = sfus[id] else { return }
68+
sfu.stop()
69+
sfus[id] = nil
70+
}
71+
72+
public func stop(forwarding stream: MediaStream)
73+
{
74+
for id in sfus.keys
75+
{
76+
if id.fromMediaId == stream.mediaId
77+
{
78+
self.stop(forwarding: stream, toClientId: id.toClient)
79+
}
80+
}
81+
}
82+
83+
internal func stop(forwarding client: ConnectedClient)
84+
{
85+
for stream in client.session.incomingStreams.values
86+
{
87+
self.stop(forwarding: stream)
88+
}
89+
}
90+
91+
internal struct SFUIdentifier: Equatable, Hashable
92+
{
93+
let fromMediaId: String
94+
let toClient: ClientId
95+
}
96+
}

Sources/allonet2/PlaceServer/PlaceServer.swift

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class PlaceServer : AlloSessionDelegate
2222
let options: TransportConnectionOptions
2323

2424
var outstandingClientToClientInteractions: [Interaction.RequestID: ClientId] = [:]
25-
25+
internal var sfus: [SFUIdentifier: MediaStreamForwarder] = [:]
2626
internal var authenticationProvider: ConnectedClient?
2727

2828
let connectionStatus = ConnectionStatus()
@@ -68,8 +68,9 @@ public class PlaceServer : AlloSessionDelegate
6868
}
6969
print("Lost session for client \(cid)")
7070
Task { @MainActor in
71-
if let _ = self.clients.removeValue(forKey: cid)
71+
if let client = self.clients.removeValue(forKey: cid)
7272
{
73+
self.stop(forwarding: client)
7374
await self.removeEntites(ownedBy: cid)
7475
}
7576
self.unannouncedClients[cid] = nil
@@ -78,6 +79,8 @@ public class PlaceServer : AlloSessionDelegate
7879
print("Lost client was our authentication provider, removing it")
7980
authenticationProvider = nil
8081
}
82+
83+
8184
}
8285
}
8386

@@ -102,16 +105,19 @@ public class PlaceServer : AlloSessionDelegate
102105
}
103106
}
104107

105-
nonisolated public func session(_ sess: AlloSession, didReceiveMediaStream stream: MediaStream)
108+
nonisolated public func session(_ sess: AlloSession, didReceiveMediaStream stream: any MediaStream)
106109
{
110+
let cid = sess.clientId!
107111
Task { @MainActor in
108-
for (cid, client) in self.clients
109-
{
110-
if cid == sess.clientId! { continue }
111-
// TODO: Stream forwarding!
112-
//client.session.addOutgoing(stream: stream)
113-
}
114-
// TODO: also attach to new clients that connect after this stream comes in -- or maybe not, it's up to them to subscribe with interactions, right?
112+
guard let client = self.clients[cid] else { return }
113+
self.handle(incoming: stream, from: client)
114+
}
115+
}
116+
117+
nonisolated public func session(_: AlloSession, didRemoveMediaStream stream: any MediaStream)
118+
{
119+
Task { @MainActor in
120+
self.stop(forwarding: stream)
115121
}
116122
}
117123
}

Sources/allonet2/TransportProtocol.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public protocol TransportDelegate: AnyObject {
1414
func transport(didDisconnect transport: Transport)
1515
func transport(_ transport: Transport, didReceiveData data: Data, on channel: DataChannel)
1616
func transport(_ transport: Transport, didReceiveMediaStream stream: MediaStream)
17+
func transport(_ transport: Transport, didRemoveMediaStream stream: MediaStream)
1718
func transport(requestsRenegotiation transport: Transport)
1819
}
1920

@@ -34,6 +35,9 @@ public protocol Transport: AnyObject
3435
// Data channels
3536
func createDataChannel(label: DataChannelLabel, reliable: Bool) -> DataChannel?
3637
func send(data: Data, on channel: DataChannelLabel)
38+
39+
// Media channels
40+
static func forward(mediaStream: MediaStream, to: any Transport) throws -> MediaStreamForwarder
3741
}
3842

3943
public struct TransportConnectionOptions: Sendable
@@ -98,3 +102,8 @@ public protocol MediaStream {
98102
public protocol AudioTrack {
99103
var isEnabled: Bool { get set }
100104
}
105+
106+
public protocol MediaStreamForwarder
107+
{
108+
func stop()
109+
}

0 commit comments

Comments
 (0)