@@ -43,16 +43,11 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
4343 PCTransport ? get primary => _subscriberPrimary ? subscriber : publisher;
4444
4545 // data channels for packets
46- rtc.RTCDataChannel ? _reliableDC ;
47- rtc.RTCDataChannel ? _lossyDC ;
46+ rtc.RTCDataChannel ? _reliableDCPub ;
47+ rtc.RTCDataChannel ? _lossyDCPub ;
4848 rtc.RTCDataChannel ? _reliableDCSub;
4949 rtc.RTCDataChannel ? _lossyDCSub;
5050
51- rtc.RTCDataChannelState get reliableDataChannelState =>
52- _reliableDC? .state ?? rtc.RTCDataChannelState .RTCDataChannelClosed ;
53-
54- rtc.RTCDataChannelState get lossyDataChannelState =>
55- _lossyDC? .state ?? rtc.RTCDataChannelState .RTCDataChannelClosed ;
5651 bool _iceConnected = false ;
5752
5853 ConnectionState _connectionState = ConnectionState .disconnected;
@@ -192,49 +187,61 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
192187 Future <void > sendDataPacket (
193188 lk_models.DataPacket packet,
194189 ) async {
195- // make sure we do have a data connection
196- await _ensurePublisherConnected ();
190+ //
191+ rtc.RTCDataChannel ? publisherDataChannel (Reliability reliability) =>
192+ reliability == Reliability .reliable ? _reliableDCPub : _lossyDCPub;
193+
194+ rtc.RTCDataChannelState publisherDataChannelState (
195+ Reliability reliability) =>
196+ publisherDataChannel (reliability)? .state ??
197+ rtc.RTCDataChannelState .RTCDataChannelClosed ;
198+
199+ final reliability = packet.kind.toSDKType ();
197200
198201 // construct the data channel message
199202 final message =
200203 rtc.RTCDataChannelMessage .fromBinary (packet.writeToBuffer ());
201204
202- // chose data channel
203- final rtc.RTCDataChannel ? channel =
204- packet.kind == lk_models.DataPacket_Kind .LOSSY ? _lossyDC : _reliableDC;
205+ if (_subscriberPrimary) {
206+ // make sure publisher transport is connected
205207
206- // send if channel exists
207- if (channel == null ) {
208- throw UnexpectedStateException ('Data channel is not ready' );
209- }
208+ if (publisher? .pc.iceConnectionState? .isConnected () != true ) {
209+ logger.fine ('Publisher is not connected...' );
210210
211- logger.fine ('sendDataPacket(label:${channel .label })' );
212- await channel.send (message);
213- }
211+ // start negotiation
212+ if (publisher? .pc.iceConnectionState !=
213+ rtc.RTCIceConnectionState .RTCIceConnectionStateChecking ) {
214+ await negotiate ();
215+ }
214216
215- Future <void > _ensurePublisherConnected () async {
216- logger.fine ('ensurePublisherConnected()' );
217- if (! _subscriberPrimary) {
218- return ;
219- }
217+ logger.fine ('Waiting for publisher to ice-connect...' );
218+ await events.waitFor <EnginePublisherIceStateUpdatedEvent >(
219+ filter: (event) => event.iceState.isConnected (),
220+ duration: Timeouts .iceConnection,
221+ );
222+ }
220223
221- if (publisher? .pc.iceConnectionState? .isConnected () == true ) {
222- logger.warning ('[$objectId ] publisher is already connected' );
223- return ;
224+ // wait for data channel to open (if not already)
225+ if (publisherDataChannelState (packet.kind.toSDKType ()) !=
226+ rtc.RTCDataChannelState .RTCDataChannelOpen ) {
227+ logger.fine ('Waiting for data channel ${reliability } to open...' );
228+ await events.waitFor <PublisherDataChannelStateUpdatedEvent >(
229+ filter: (event) => event.type == reliability,
230+ duration: Timeouts .connection,
231+ );
232+ }
224233 }
225234
226- // start negotiation
227- await negotiate ();
228-
229- logger.fine ('[PUBLISHER] waiting for to ice-connect '
230- '(current: ${publisher ?.pc .iceConnectionState })' );
235+ // chose data channel
236+ final rtc.RTCDataChannel ? channel = publisherDataChannel (reliability);
231237
232- await events. waitFor < EnginePublisherIceStateUpdatedEvent >(
233- filter : (event) => event.iceState. isConnected (),
234- duration : Timeouts .iceConnection,
235- );
238+ if (channel == null ) {
239+ throw UnexpectedStateException (
240+ 'Data channel for ${ packet . kind . toSDKType ()} is null' );
241+ }
236242
237- logger.fine ('[PUBLISHER] connected' );
243+ logger.fine ('sendDataPacket(label:${channel .label })' );
244+ await channel.send (message);
238245 }
239246
240247 @internal
@@ -421,11 +428,16 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
421428 ..binaryType = 'binary'
422429 ..ordered = true
423430 ..maxRetransmits = 0 ;
424- _lossyDC =
431+ _lossyDCPub =
425432 await publisher? .pc.createDataChannel (_lossyDCLabel, lossyInit);
426- _lossyDC? .onMessage = _onDCMessage;
427- _lossyDC? .stateChangeStream
428- .listen ((state) => _onDCStateUpdated (Reliability .lossy, state));
433+ _lossyDCPub? .onMessage = _onDCMessage;
434+ _lossyDCPub? .stateChangeStream
435+ .listen ((state) => events.emit (PublisherDataChannelStateUpdatedEvent (
436+ isPrimary: ! _subscriberPrimary,
437+ state: state,
438+ type: Reliability .lossy,
439+ )));
440+ // _onDCStateUpdated(Reliability.lossy, state)
429441 } catch (_) {
430442 logger.severe ('[$objectId ] createDataChannel() did throw $_ ' );
431443 }
@@ -434,11 +446,15 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
434446 final reliableInit = rtc.RTCDataChannelInit ()
435447 ..binaryType = 'binary'
436448 ..ordered = true ;
437- _reliableDC =
449+ _reliableDCPub =
438450 await publisher? .pc.createDataChannel (_reliableDCLabel, reliableInit);
439- _reliableDC? .onMessage = _onDCMessage;
440- _reliableDC? .stateChangeStream
441- .listen ((state) => _onDCStateUpdated (Reliability .reliable, state));
451+ _reliableDCPub? .onMessage = _onDCMessage;
452+ _reliableDCPub? .stateChangeStream
453+ .listen ((state) => events.emit (PublisherDataChannelStateUpdatedEvent (
454+ isPrimary: ! _subscriberPrimary,
455+ state: state,
456+ type: Reliability .reliable,
457+ )));
442458 } catch (_) {
443459 logger.severe ('[$objectId ] createDataChannel() did throw $_ ' );
444460 }
@@ -450,29 +466,32 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
450466 logger.fine ('Server opened DC label: ${dc .label }' );
451467 _reliableDCSub = dc;
452468 _reliableDCSub? .onMessage = _onDCMessage;
453- _reliableDCSub? .stateChangeStream
454- .listen ((state) => _onDCStateUpdated (Reliability .reliable, state));
469+ _reliableDCSub? .stateChangeStream.listen ((state) =>
470+ _reliableDCPub? .stateChangeStream.listen (
471+ (state) => events.emit (SubscriberDataChannelStateUpdatedEvent (
472+ isPrimary: _subscriberPrimary,
473+ state: state,
474+ type: Reliability .reliable,
475+ ))));
455476 break ;
456477 case _lossyDCLabel:
457478 logger.fine ('Server opened DC label: ${dc .label }' );
458479 _lossyDCSub = dc;
459480 _lossyDCSub? .onMessage = _onDCMessage;
460- _lossyDCSub? .stateChangeStream
461- .listen ((event) => _onDCStateUpdated (Reliability .lossy, event));
481+ _lossyDCSub? .stateChangeStream.listen ((event) =>
482+ _reliableDCPub? .stateChangeStream.listen (
483+ (state) => events.emit (SubscriberDataChannelStateUpdatedEvent (
484+ isPrimary: _subscriberPrimary,
485+ state: state,
486+ type: Reliability .lossy,
487+ ))));
462488 break ;
463489 default :
464490 logger.warning ('Unknown DC label: ${dc .label }' );
465491 break ;
466492 }
467493 }
468494
469- void _onDCStateUpdated (
470- Reliability channel,
471- rtc.RTCDataChannelState state,
472- ) {
473- logger.fine ('Data channel state updated ${channel } ${state }' );
474- }
475-
476495 void _onDCMessage (rtc.RTCDataChannelMessage message) {
477496 // always expect binary
478497 if (! message.isBinary) {
0 commit comments