Skip to content

Commit 86fae4a

Browse files
committed
Cleanup
1 parent 9105e3c commit 86fae4a

File tree

5 files changed

+17
-22
lines changed

5 files changed

+17
-22
lines changed

Sources/LiveKit/Broadcast/IPC/BroadcastUploader.swift

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ final class BroadcastUploader: Sendable, Loggable {
4545
self.channel = channel
4646

4747
let messageLoopTask = Task.observing(channel.incomingMessages(BroadcastIPCHeader.self), by: self) { observer, message in
48-
let (header, _) = message
49-
observer.processMessage(header)
48+
observer.processMessageHeader(message.0)
5049
} onFailure: { observer, error in
5150
observer.log("IPCChannel returned error: \(error)")
5251
}
@@ -64,9 +63,6 @@ final class BroadcastUploader: Sendable, Loggable {
6463

6564
/// Close the connection to the receiver.
6665
func close() {
67-
state.mutate {
68-
$0.messageLoopTask = nil
69-
}
7066
channel.close()
7167
}
7268

@@ -87,10 +83,10 @@ final class BroadcastUploader: Sendable, Loggable {
8783
let rotation = VideoRotation(sampleBuffer.replayKitOrientation ?? .up)
8884
do {
8985
let (metadata, imageData) = try imageCodec.encode(sampleBuffer)
90-
Task { [weak self, channel] in
86+
Task {
9187
let header = BroadcastIPCHeader.image(metadata, rotation)
9288
try await channel.send(header: header, payload: imageData)
93-
self?.state.mutate { $0.isUploadingImage = false }
89+
state.mutate { $0.isUploadingImage = false }
9490
}
9591
} catch {
9692
state.mutate { $0.isUploadingImage = false }
@@ -99,7 +95,7 @@ final class BroadcastUploader: Sendable, Loggable {
9995
case .audioApp:
10096
guard state.shouldUploadAudio else { return }
10197
let (metadata, audioData) = try audioCodec.encode(sampleBuffer)
102-
Task { [channel] in
98+
Task {
10399
let header = BroadcastIPCHeader.audio(metadata)
104100
try await channel.send(header: header, payload: audioData)
105101
}
@@ -108,7 +104,7 @@ final class BroadcastUploader: Sendable, Loggable {
108104
}
109105
}
110106

111-
private func processMessage(_ header: BroadcastIPCHeader) {
107+
private func processMessageHeader(_ header: BroadcastIPCHeader) {
112108
switch header {
113109
case let .wantsAudio(wantsAudio):
114110
state.mutate { $0.shouldUploadAudio = wantsAudio }

Sources/LiveKit/Core/DataChannelPair.swift

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
5858
}
5959

6060
private let _state: StateSync<State>
61+
6162
private let eventContinuation: AsyncStream<ChannelEvent>.Continuation
6263
private var eventLoopTask: AnyTaskCancellable?
6364

@@ -255,13 +256,13 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable {
255256
_state = StateSync(State(lossy: lossyChannel,
256257
reliable: reliableChannel))
257258

258-
let (eventStream, continuation) = AsyncStream.makeStream(of: ChannelEvent.self)
259-
eventContinuation = continuation
260-
261259
if let delegate {
262260
delegates.add(delegate: delegate)
263261
}
264262

263+
let (eventStream, continuation) = AsyncStream.makeStream(of: ChannelEvent.self)
264+
eventContinuation = continuation
265+
265266
super.init()
266267

267268
eventLoopTask = Task.observing(eventStream, by: self, state: Buffers()) { observer, event, buffers in

Sources/LiveKit/Core/SignalClient.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ actor SignalClient: Loggable {
140140
connectOptions: connectOptions)
141141

142142
let messageLoopTask = Task.observing(socket, by: self) { observer, message in
143-
await observer._onWebSocketMessage(message: message)
143+
await observer.onWebSocketMessage(message)
144144
} onFailure: { observer, error in
145145
await observer.cleanUp(withError: error)
146146
}
@@ -241,7 +241,7 @@ private extension SignalClient {
241241
await _requestQueue.processIfResumed(request, elseEnqueue: request.canBeQueued())
242242
}
243243

244-
func _onWebSocketMessage(message: URLSessionWebSocketTask.Message) async {
244+
func onWebSocketMessage(_ message: URLSessionWebSocketTask.Message) async {
245245
let response: Livekit_SignalResponse? = switch message {
246246
case let .data(data): try? Livekit_SignalResponse(serializedBytes: data)
247247
case let .string(string): try? Livekit_SignalResponse(jsonString: string)

Sources/LiveKit/DataStream/Incoming/IncomingStreamManager.swift

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ actor IncomingStreamManager: Loggable {
2424
let openTime: TimeInterval
2525
let continuation: StreamReaderSource.Continuation
2626
var readLength = 0
27-
var task: AnyTaskCancellable?
27+
var handlerTask: AnyTaskCancellable?
2828
}
2929

3030
/// Mapping between stream ID and descriptor for open streams.
@@ -133,16 +133,13 @@ actor IncomingStreamManager: Loggable {
133133
continuation = $0
134134
}
135135

136-
var descriptor = Descriptor(
136+
let descriptor = Descriptor(
137137
info: info,
138138
openTime: Date.timeIntervalSinceReferenceDate,
139-
continuation: continuation
139+
continuation: continuation,
140+
handlerTask: Task.detached { try await handler(source, identity) }.cancellable()
140141
)
141142

142-
descriptor.task = Task.detached {
143-
try await handler(source, identity)
144-
}.cancellable()
145-
146143
openStreams[info.id] = descriptor
147144
}
148145

Tests/LiveKitCoreTests/TaskObserveTests.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class TaskObserveTests: LKTestCase {
6868

6969
func testStreamBreaksWhenObserverDeallocates() async throws {
7070
var observer: TestObserver? = TestObserver(id: "dealloc-test")
71-
weak let weakObserver = observer
71+
weak var weakObserver = observer
7272

7373
let (stream, continuation) = AsyncStream.makeStream(of: Int.self)
7474

@@ -88,6 +88,7 @@ class TaskObserveTests: LKTestCase {
8888
try await Task.sleep(nanoseconds: 50_000_000)
8989

9090
XCTAssertNil(weakObserver, "Observer should have been deallocated")
91+
weakObserver = nil
9192

9293
continuation.yield(3)
9394
continuation.yield(4)

0 commit comments

Comments
 (0)