@@ -13,7 +13,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
1313 @Injected ( \. streamVideo) var streamVideo
1414 @Injected ( \. callCache) var callCache
1515
16- private lazy var stateMachine : StreamCallStateMachine = . init( self )
16+ private lazy var stateMachine : StateMachine = . init( self )
1717
1818 @MainActor
1919 public internal( set) var state = CallState ( )
@@ -53,6 +53,9 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
5353 internal let coordinatorClient : DefaultAPI
5454 private var cancellables = DisposableBag ( )
5555
56+ /// A serialQueueActor ensuring that call operations (e.g. join) will happen in a serial manner.
57+ private let callOperationSerialQueue = SerialActorQueue ( )
58+
5659 /// This adapter is used to manage closed captions for the
5760 /// call.
5861 private lazy var closedCaptionsAdapter = ClosedCaptionsAdapter ( self )
@@ -144,50 +147,42 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
144147 notify: Bool = false ,
145148 callSettings: CallSettings ? = nil
146149 ) async throws -> JoinCallResponse {
147- let currentStage = stateMachine. currentStage
148- switch currentStage. id {
149- case . joining:
150- break
151- case . joined where currentStage is StreamCallStateMachine . Stage. JoinedStage:
152- let stage = currentStage as! StreamCallStateMachine . Stage . JoinedStage
153- return stage. response
154- default :
155- stateMachine. transition (
156- . joining(
157- self ,
158- actionBlock: { [ weak self] in
159- guard let self else { throw ClientError . Unexpected ( ) }
160- return try await executeTask ( retryPolicy: . fastAndSimple, task: { [ weak self] in
161- guard let self else { throw ClientError . Unexpected ( ) }
162- let response = try await callController. joinCall (
150+ try await callOperationSerialQueue. sync { [ weak self] in
151+ guard let self else {
152+ throw ClientError ( )
153+ }
154+ let currentStage = stateMachine. currentStage
155+
156+ if
157+ currentStage. id == . joined,
158+ case let . joined( joinResponse) = currentStage. context. output {
159+ return joinResponse
160+ } else if
161+ currentStage. id == . joining,
162+ case let . join( input) = currentStage. context. input {
163+ return try await input
164+ . deliverySubject
165+ . nextValue ( timeout: CallConfiguration . timeout. join)
166+ } else {
167+ let deliverySubject = PassthroughSubject < JoinCallResponse , Error > ( )
168+ stateMachine. transition (
169+ . joining(
170+ self ,
171+ input: . join(
172+ . init(
163173 create: create,
164174 callSettings: callSettings,
165175 options: options,
166176 ring: ring,
167- notify: notify
177+ notify: notify,
178+ deliverySubject: deliverySubject
168179 )
169- if let callSettings {
170- await state. update ( callSettings: callSettings)
171- }
172- await state. update ( from: response)
173- let updated = await state. callSettings
174- updateCallSettingsManagers ( with: updated)
175- Task { @MainActor [ weak self] in
176- self ? . streamVideo. state. activeCall = self
177- }
178- return response
179- } )
180- }
180+ )
181+ )
181182 )
182- )
183+ return try await deliverySubject. nextValue ( timeout: CallConfiguration . timeout. join)
184+ }
183185 }
184-
185- return try await stateMachine
186- . nextStageShouldBe (
187- StreamCallStateMachine . Stage. JoinedStage. self,
188- dropFirst: 1
189- )
190- . response
191186 }
192187
193188 /// Gets the call on the backend with the given parameters.
@@ -350,24 +345,25 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
350345 @discardableResult
351346 public func accept( ) async throws -> AcceptCallResponse {
352347 let currentStage = stateMachine. currentStage
353- switch currentStage. id {
354- case . accepting:
355- break
356- case . accepted where currentStage is StreamCallStateMachine . Stage. AcceptedStage:
357- let stage = currentStage as! StreamCallStateMachine . Stage . AcceptedStage
358- return stage. response
359- default :
360- stateMachine. transition ( . accepting( self , actionBlock: { [ coordinatorClient, callType, callId] in
361- try await coordinatorClient. acceptCall ( type: callType, id: callId)
362- } ) )
363- }
364348
365- return try await stateMachine
366- . nextStageShouldBe (
367- StreamCallStateMachine . Stage. AcceptedStage. self,
368- dropFirst: 1
349+ if
350+ currentStage. id == . accepted,
351+ case let . accepted( response) = currentStage. context. output {
352+ return response
353+ } else if
354+ currentStage. id == . accepting,
355+ case let . accepting( deliverySubject) = currentStage. context. input {
356+ return try await deliverySubject. nextValue ( timeout: CallConfiguration . timeout. accept)
357+ } else {
358+ let deliverySubject = PassthroughSubject < AcceptCallResponse , Error > ( )
359+ stateMachine. transition (
360+ . accepting(
361+ self ,
362+ input: . accepting( deliverySubject: deliverySubject)
363+ )
369364 )
370- . response
365+ return try await deliverySubject. nextValue ( timeout: CallConfiguration . timeout. accept)
366+ }
371367 }
372368
373369 /// Rejects a call with an optional reason.
@@ -378,34 +374,27 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
378374 @discardableResult
379375 public func reject( reason: String ? = nil ) async throws -> RejectCallResponse {
380376 let currentStage = stateMachine. currentStage
381- switch currentStage. id {
382- case . rejecting:
383- break
384- case . rejected where currentStage is StreamCallStateMachine . Stage. RejectedStage:
385- let stage = currentStage as! StreamCallStateMachine . Stage . RejectedStage
386- return stage. response
387- default :
388- stateMachine. transition ( . rejecting( self , actionBlock: { [ coordinatorClient, callType, callId, streamVideo, cId] in
389- let response = try await coordinatorClient. rejectCall (
390- type: callType,
391- id: callId,
392- rejectCallRequest: . init( reason: reason)
393- )
394- if streamVideo. state. ringingCall? . cId == cId {
395- Task { @MainActor in
396- streamVideo. state. ringingCall = nil
397- }
398- }
399- return response
400- } ) )
401- }
402377
403- return try await stateMachine
404- . nextStageShouldBe (
405- StreamCallStateMachine . Stage. RejectedStage. self,
406- dropFirst: 1
378+ if
379+ currentStage. id == . rejected,
380+ case let . rejected( response) = currentStage. context. output {
381+ return response
382+ } else if
383+ currentStage. id == . rejecting,
384+ case let . rejecting( input) = currentStage. context. input {
385+ return try await input
386+ . deliverySubject
387+ . nextValue ( timeout: CallConfiguration . timeout. reject)
388+ } else {
389+ let deliverySubject = PassthroughSubject < RejectCallResponse , Error > ( )
390+ stateMachine. transition (
391+ . rejecting(
392+ self ,
393+ input: . rejecting( . init( deliverySubject: deliverySubject) )
394+ )
407395 )
408- . response
396+ return try await deliverySubject. nextValue ( timeout: CallConfiguration . timeout. reject)
397+ }
409398 }
410399
411400 /// Adds the given user to the list of blocked users for the call.
@@ -521,7 +510,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
521510 cancellables. removeAll ( )
522511 callController. leave ( )
523512 closedCaptionsAdapter. stop ( )
524- stateMachine. transition ( . idle( self ) )
513+ stateMachine. transition ( . idle( . init ( call : self ) ) )
525514 /// Upon `Call.leave` we remove the call from the cache. Any further actions that are required
526515 /// to happen on the call object (e.g. rejoin) will need to fetch a new instance from `StreamVideo`
527516 /// client.
@@ -1391,7 +1380,12 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
13911380 if stateMachine. currentStage. id == . joined {
13921381 state. disconnectionError = error
13931382 }
1394- stateMachine. transition ( . error( self , error: error) )
1383+ stateMachine. transition (
1384+ . error(
1385+ . init( call: self ) ,
1386+ error: error
1387+ )
1388+ )
13951389 }
13961390
13971391 // MARK: - private
@@ -1531,7 +1525,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
15311525 }
15321526 }
15331527
1534- private func updateCallSettingsManagers( with callSettings: CallSettings ) {
1528+ func updateCallSettingsManagers( with callSettings: CallSettings ) {
15351529 microphone. status = callSettings. audioOn ? . enabled : . disabled
15361530 camera. status = callSettings. videoOn ? . enabled : . disabled
15371531 camera. direction = callSettings. cameraPosition
0 commit comments