Skip to content

Commit 4762c8e

Browse files
authored
Merge pull request #168 from GetStream/feature/uni-124-expose-participant-count
Feature/uni 124 expose participant count
2 parents 691a4e6 + 8f39701 commit 4762c8e

File tree

11 files changed

+208
-14
lines changed

11 files changed

+208
-14
lines changed

Packages/StreamVideo/Runtime/Core/CoordinatorEventType.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ internal static class CoordinatorEventType
2222
public const string CallRejected = "call.rejected";
2323
public const string CallRing = "call.ring";
2424
public const string CallSessionEnded = "call.session_ended";
25+
public const string CallSessionParticipantCountsUpdated = "call.session_participant_count_updated";
2526
public const string CallSessionParticipantJoined = "call.session_participant_joined";
2627
public const string CallSessionParticipantLeft = "call.session_participant_left";
2728
public const string CallSessionStarted = "call.session_started";

Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,7 @@ private async Task SubscribeToTracksAsync()
604604
if (ActiveCall.Participants == null || !ActiveCall.Participants.Any())
605605
{
606606
#if STREAM_DEBUG_ENABLED
607-
_logs.Info($"{nameof(SubscribeToTracksAsync)} Ignored - No participants in the call to subscribe tracks for");
607+
_logs.Error($"{nameof(SubscribeToTracksAsync)} Ignored - No participants in the call to subscribe tracks for");
608608
#endif
609609

610610
return;
@@ -1077,6 +1077,12 @@ private void OnSfuPinsUpdated(PinsChanged pinsChanged)
10771077
ActiveCall.UpdateFromSfu(pinsChanged, _cache);
10781078
}
10791079

1080+
private void OnSfuHealthCheck(HealthCheckResponse healthCheckResponse)
1081+
{
1082+
_sfuTracer?.Trace("healthCheck", healthCheckResponse);
1083+
ActiveCall.UpdateFromSfu(healthCheckResponse, _cache);
1084+
}
1085+
10801086
private void OnSfuIceRestart(ICERestart iceRestart)
10811087
{
10821088
_sfuTracer?.Trace("iceRestart", iceRestart);
@@ -1669,6 +1675,7 @@ private void SubscribeToSfuEvents()
16691675
_sfuWebSocket.ParticipantLeft += OnSfuParticipantLeft;
16701676
_sfuWebSocket.DominantSpeakerChanged += OnSfuDominantSpeakerChanged;
16711677
_sfuWebSocket.JoinResponse += OnSfuJoinResponse;
1678+
_sfuWebSocket.HealthCheck += OnSfuHealthCheck;
16721679
_sfuWebSocket.TrackPublished += OnSfuTrackPublished;
16731680
_sfuWebSocket.TrackUnpublished += OnSfuTrackUnpublished;
16741681
_sfuWebSocket.Error += OnSfuWebSocketOnError;
@@ -1698,6 +1705,7 @@ private void UnsubscribeFromSfuEvents()
16981705
_sfuWebSocket.ParticipantLeft -= OnSfuParticipantLeft;
16991706
_sfuWebSocket.DominantSpeakerChanged -= OnSfuDominantSpeakerChanged;
17001707
_sfuWebSocket.JoinResponse -= OnSfuJoinResponse;
1708+
_sfuWebSocket.HealthCheck -= OnSfuHealthCheck;
17011709
_sfuWebSocket.TrackPublished -= OnSfuTrackPublished;
17021710
_sfuWebSocket.TrackUnpublished -= OnSfuTrackUnpublished;
17031711
_sfuWebSocket.Error -= OnSfuWebSocketOnError;

Packages/StreamVideo/Runtime/Core/LowLevelClient/StreamVideoLowLevelClient.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ public void Dispose()
287287
internal event Action<CallRingEventInternalDTO> InternalCallRingEvent;
288288
internal event Action<CallSessionEndedEventInternalDTO> InternalCallSessionEndedEvent;
289289
internal event Action<CallSessionStartedEventInternalDTO> InternalCallSessionStartedEvent;
290+
internal event Action<CallSessionParticipantJoinedEventInternalDTO> InternalCallSessionParticipantJoinedEvent;
291+
internal event Action<CallSessionParticipantLeftEventInternalDTO> InternalCallSessionParticipantLeftEvent;
292+
internal event Action<CallSessionParticipantCountsUpdatedEventInternalDTO> InternalCallSessionParticipantCountsUpdatedEvent;
290293
internal event Action<BlockedUserEventInternalDTO> InternalCallUnblockedUserEvent;
291294
internal event Action<ConnectionErrorEventInternalDTO> InternalConnectionErrorEvent;
292295
internal event Action<CustomVideoEventInternalDTO> InternalCustomVideoEvent;
@@ -479,6 +482,18 @@ private void RegisterCoordinatorEventHandlers()
479482
CoordinatorEventType.CallSessionStarted,
480483
e => InternalCallSessionStartedEvent?.Invoke(e));
481484

485+
_coordinatorWS.RegisterEventType<CallSessionParticipantJoinedEventInternalDTO>(
486+
CoordinatorEventType.CallSessionParticipantJoined,
487+
e => InternalCallSessionParticipantJoinedEvent?.Invoke(e));
488+
489+
_coordinatorWS.RegisterEventType<CallSessionParticipantLeftEventInternalDTO>(
490+
CoordinatorEventType.CallSessionParticipantLeft,
491+
e => InternalCallSessionParticipantLeftEvent?.Invoke(e));
492+
493+
_coordinatorWS.RegisterEventType<CallSessionParticipantCountsUpdatedEventInternalDTO>(
494+
CoordinatorEventType.CallSessionParticipantCountsUpdated,
495+
e => InternalCallSessionParticipantCountsUpdatedEvent?.Invoke(e));
496+
482497
_coordinatorWS.RegisterEventType<BlockedUserEventInternalDTO>(CoordinatorEventType.CallUnblockedUser,
483498
e => InternalCallUnblockedUserEvent?.Invoke(e));
484499

Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/CoordinatorWebSocket.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,7 @@ private void HandleNewWebsocketMessage(string msg)
166166
if (!EventHandlers.TryGetValue(type, out var handler))
167167
{
168168
#if STREAM_DEBUG_ENABLED
169-
var ignoreTypes = new[] { "call.session_participant_count_updated" };
170-
if (!ignoreTypes.Any(type.Contains))
171-
{
172-
Logs.Warning($"No message handler registered for `{type}`. Message not handled: " + msg);
173-
}
169+
Logs.Warning($"No message handler registered for `{type}`. Message not handled: " + msg);
174170
#endif
175171
return;
176172
}

Packages/StreamVideo/Runtime/Core/LowLevelClient/WebSockets/SfuWebSocket.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ internal class SfuWebSocket : BasePersistentWebSocket
2929
public event Action<ParticipantLeft> ParticipantLeft;
3030
public event Action<DominantSpeakerChanged> DominantSpeakerChanged;
3131
public event Action<JoinResponse> JoinResponse;
32+
public event Action<HealthCheckResponse> HealthCheck;
3233
public event Action<TrackPublished> TrackPublished;
3334
public event Action<TrackUnpublished> TrackUnpublished;
3435
public event Action<Error> Error;
@@ -232,7 +233,7 @@ protected override void ProcessMessages()
232233
OnHandleJoinResponse(sfuEvent.JoinResponse);
233234
break;
234235
case SfuEvent.EventPayloadOneofCase.HealthCheckResponse:
235-
//StreamTodo: healthCheck contains participantsCount, we should probably sync our state with this info
236+
HealthCheck?.Invoke(sfuEvent.HealthCheckResponse);
236237
OnHealthCheckReceived();
237238
break;
238239
case SfuEvent.EventPayloadOneofCase.TrackPublished:

Packages/StreamVideo/Runtime/Core/Models/CallSession.cs

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ void IStateLoadableFrom<CallSessionResponseInternalDTO, CallSession>.LoadFromDto
5959
StartedAt = dto.StartedAt;
6060
LiveEndedAt = dto.LiveEndedAt;
6161
LiveStartedAt = dto.LiveStartedAt;
62+
63+
UpdateParticipantCountFromSessionInternal(dto.AnonymousParticipantCount, dto.ParticipantsCountByRole);
6264
}
6365

6466
void IStateLoadableFrom<SfuCallState, CallSession>.LoadFromDto(SfuCallState dto, ICache cache)
@@ -68,7 +70,7 @@ void IStateLoadableFrom<SfuCallState, CallSession>.LoadFromDto(SfuCallState dto,
6870
StartedAt = dto.StartedAt.ToDateTimeOffset();
6971
}
7072

71-
// dto.CallState.Participants may not contain all of the participants
73+
// dto.CallState.Participants may not contain all participants
7274
UpdateExtensions<StreamVideoCallParticipant, SfuParticipant>.TryAddUniqueTrackedObjects(_participants,
7375
dto.Participants, cache.CallParticipants);
7476

@@ -87,6 +89,12 @@ internal IStreamVideoCallParticipant UpdateFromSfu(ParticipantJoined participant
8789

8890
return participant;
8991
}
92+
93+
internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cache)
94+
{
95+
((IStateLoadableFrom<SfuParticipantCount, ParticipantCount>)ParticipantCount).LoadFromDto(
96+
healthCheckResponse.ParticipantCount, cache);
97+
}
9098

9199
internal (string sessionId, string userId) UpdateFromSfu(ParticipantLeft participantLeft, ICache cache)
92100
{
@@ -95,10 +103,107 @@ internal IStreamVideoCallParticipant UpdateFromSfu(ParticipantJoined participant
95103

96104
return (participantLeft.Participant.SessionId, participantLeft.Participant.UserId);
97105
}
98-
106+
107+
internal void UpdateFromCoordinator(
108+
InternalDTO.Events.CallSessionParticipantCountsUpdatedEventInternalDTO participantCountsUpdated,
109+
LowLevelClient.CallingState callingState)
110+
{
111+
_participantsCountByRole.TryReplaceValuesFromDto(participantCountsUpdated.ParticipantsCountByRole);
112+
UpdateParticipantCountFromCoordinator(participantCountsUpdated.AnonymousParticipantCount,
113+
participantCountsUpdated.ParticipantsCountByRole, callingState);
114+
}
115+
116+
internal void UpdateFromCoordinator(
117+
InternalDTO.Events.CallSessionParticipantJoinedEventInternalDTO participantJoined, ICache cache,
118+
LowLevelClient.CallingState callingState)
119+
{
120+
var participant = cache.TryCreateOrUpdate(participantJoined.Participant);
121+
122+
if (!_participants.Contains(participant))
123+
{
124+
_participants.Add(participant);
125+
}
126+
127+
var role = participantJoined.Participant.Role;
128+
if (_participantsCountByRole.ContainsKey(role))
129+
{
130+
_participantsCountByRole[role]++;
131+
}
132+
else
133+
{
134+
_participantsCountByRole[role] = 1;
135+
}
136+
137+
var anonymousCount = ParticipantCount != null ? (int)ParticipantCount.Anonymous : 0;
138+
UpdateParticipantCountFromCoordinator(anonymousCount, _participantsCountByRole, callingState);
139+
}
140+
141+
//StreamTODO: double-check this logic
142+
internal void UpdateFromCoordinator(
143+
InternalDTO.Events.CallSessionParticipantLeftEventInternalDTO participantLeft, ICache cache,
144+
LowLevelClient.CallingState callingState)
145+
{
146+
var participant = cache.TryCreateOrUpdate(participantLeft.Participant);
147+
_participants.Remove(participant);
148+
149+
var role = participantLeft.Participant.Role;
150+
if (_participantsCountByRole.ContainsKey(role))
151+
{
152+
_participantsCountByRole[role] = Math.Max(0, _participantsCountByRole[role] - 1);
153+
154+
if (_participantsCountByRole[role] == 0)
155+
{
156+
_participantsCountByRole.Remove(role);
157+
}
158+
}
159+
160+
var anonymousCount = ParticipantCount != null ? (int)ParticipantCount.Anonymous : 0;
161+
UpdateParticipantCountFromCoordinator(anonymousCount, _participantsCountByRole, callingState);
162+
}
163+
99164
private readonly Dictionary<string, DateTimeOffset> _acceptedBy = new Dictionary<string, DateTimeOffset>();
100165
private readonly List<StreamVideoCallParticipant> _participants = new List<StreamVideoCallParticipant>();
101166
private readonly Dictionary<string, int> _participantsCountByRole = new Dictionary<string, int>();
102167
private readonly Dictionary<string, DateTimeOffset> _rejectedBy = new Dictionary<string, DateTimeOffset>();
168+
169+
/// <summary>
170+
/// Updates the ParticipantCount based on session data (used when NOT connected to SFU)
171+
/// </summary>
172+
private void UpdateParticipantCountFromCoordinator(int anonymousParticipantCount,
173+
IReadOnlyDictionary<string, int> participantsCountByRole, LowLevelClient.CallingState callingState)
174+
{
175+
// When in JOINED state, we should use the participant count coming through
176+
// the SFU healthcheck event, as it's more accurate.
177+
if (callingState == LowLevelClient.CallingState.Joined)
178+
{
179+
return;
180+
}
181+
182+
UpdateParticipantCountFromSessionInternal(anonymousParticipantCount, participantsCountByRole);
183+
}
184+
185+
/// <summary>
186+
/// Updates the ParticipantCount based on session data
187+
/// </summary>
188+
private void UpdateParticipantCountFromSessionInternal(int anonymousParticipantCount,
189+
IReadOnlyDictionary<string, int> participantsCountByRole)
190+
{
191+
var byRoleCount = 0;
192+
foreach (var count in participantsCountByRole.Values)
193+
{
194+
byRoleCount += count;
195+
}
196+
197+
var total = Math.Max(byRoleCount, _participants.Count);
198+
199+
var dto = new SfuParticipantCount
200+
{
201+
Total = (uint)total,
202+
Anonymous = (uint)anonymousParticipantCount
203+
};
204+
205+
((IStateLoadableFrom<SfuParticipantCount, ParticipantCount>)ParticipantCount)
206+
.LoadFromDto(dto, null);
207+
}
103208
}
104209
}

Packages/StreamVideo/Runtime/Core/State/Caches/ICacheExt.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,8 @@ public static StreamVideoUser TryCreateOrUpdate(this ICache cache, OwnUserRespon
3030

3131
public static StreamVideoCallParticipant TryCreateOrUpdate(this ICache cache, Participant dto)
3232
=> dto == null ? null : cache.CallParticipants.CreateOrUpdate<StreamVideoCallParticipant, Participant>(dto, out _);
33+
34+
public static StreamVideoCallParticipant TryCreateOrUpdate(this ICache cache, CallParticipantResponseInternalDTO dto)
35+
=> dto == null ? null : cache.CallParticipants.CreateOrUpdate<StreamVideoCallParticipant, CallParticipantResponseInternalDTO>(dto, out _);
3336
}
3437
}

Packages/StreamVideo/Runtime/Core/StatefulModels/IStreamCall.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading.Tasks;
44
using StreamVideo.Core.LowLevelClient;
55
using StreamVideo.Core.Models;
6+
using StreamVideo.Core.Models.Sfu;
67
using StreamVideo.Core.QueryBuilders.Filters;
78
using StreamVideo.Core.QueryBuilders.Sort;
89
using StreamVideo.Core.State;
@@ -93,6 +94,8 @@ public interface IStreamCall : IStreamStatefulModel, IHasCustomData
9394
/// Any update to this collection will trigger the <see cref="SortedParticipantsUpdated"/> event.
9495
/// </summary>
9596
IEnumerable<IStreamVideoCallParticipant> SortedParticipants { get; }
97+
98+
ParticipantCount ParticipantCount { get; }
9699

97100
/// <summary>
98101
/// Participant that is currently the most actively speaking.

Packages/StreamVideo/Runtime/Core/StatefulModels/StreamCall.cs

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
using StreamVideo.Core.State.Caches;
1717
using StreamVideo.Core.StatefulModels.Tracks;
1818
using StreamVideo.Core.Utils;
19-
using UnityEngine;
2019
using TrackType = StreamVideo.Core.Models.Sfu.TrackType;
20+
using ParticipantCount = StreamVideo.Core.Models.Sfu.ParticipantCount;
2121

2222
namespace StreamVideo.Core.StatefulModels
2323
{
@@ -62,6 +62,8 @@ internal sealed class StreamCall : StreamStatefulModelBase<StreamCall>,
6262

6363
//StreamTodo: Maybe add OtherParticipants -> All participants except for the local participant?
6464
public IReadOnlyList<IStreamVideoCallParticipant> Participants => Session?.Participants;
65+
66+
public ParticipantCount ParticipantCount => Session.ParticipantCount;
6567

6668
public bool IsLocalUserOwner
6769
{
@@ -574,6 +576,30 @@ internal void UpdateFromSfu(PinsChanged pinsChanged, ICache cache)
574576
UpdatePinnedParticipants(out _);
575577
}
576578

579+
internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cache)
580+
{
581+
Session?.UpdateFromSfu(healthCheckResponse, cache);
582+
}
583+
584+
internal void UpdateFromCoordinator(CallSessionParticipantCountsUpdatedEventInternalDTO eventData)
585+
{
586+
Session?.UpdateFromCoordinator(eventData, Client.InternalLowLevelClient.RtcSession.CallState);
587+
}
588+
589+
internal void UpdateFromCoordinator(CallSessionParticipantJoinedEventInternalDTO eventData, ICache cache)
590+
{
591+
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);
592+
593+
//StreamTodo: we should extract AddParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
594+
}
595+
596+
internal void UpdateFromCoordinator(CallSessionParticipantLeftEventInternalDTO eventData, ICache cache)
597+
{
598+
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);
599+
600+
//StreamTodo: we should extract RemoveParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
601+
}
602+
577603
//StreamTodo: missing TrackRemoved or perhaps we should not care whether a track was added/removed but only published/unpublished -> enabled/disabled
578604
internal void NotifyTrackAdded(IStreamVideoCallParticipant participant, IStreamTrack track)
579605
=> ParticipantTrackAdded?.Invoke(participant, track);
@@ -647,12 +673,12 @@ internal void InternalHandleCallRecordingStartedEvent(CallReactionEventInternalD
647673

648674
//StreamTodo: NullReferenceException here because _client is never set
649675
var participant
650-
= _client.RtcSession.ActiveCall.Participants.FirstOrDefault(p => p.UserId == reaction.User.Id);
676+
= Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.FirstOrDefault(p => p.UserId == reaction.User.Id);
651677
if (participant == null)
652678
{
653679
Logs.ErrorIfDebug(
654680
$"Failed to find participant for reaction. UserId: {reaction.User.Id}, Participants: " +
655-
string.Join(", ", _client.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
681+
string.Join(", ", Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
656682
return;
657683
}
658684

@@ -726,7 +752,6 @@ private readonly List<IStreamVideoCallParticipant>
726752

727753
#endregion
728754

729-
private readonly StreamVideoLowLevelClient _client;
730755
private readonly StreamCallType _type;
731756

732757
private string _id;

Packages/StreamVideo/Runtime/Core/StatefulModels/Tracks/BaseStreamTrack.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ namespace StreamVideo.Core.StatefulModels.Tracks
1010
/// </summary>
1111
public abstract class BaseStreamTrack : IStreamTrack
1212
{
13-
private bool _isEnabled;
1413
public event StreamTrackStateChangeHandler EnabledChanged;
1514

1615
public bool IsEnabled
@@ -59,6 +58,8 @@ protected virtual void OnDisposing()
5958
{
6059

6160
}
61+
62+
private bool _isEnabled;
6263
}
6364

6465
public abstract class BaseStreamTrack<TTrack> : BaseStreamTrack

0 commit comments

Comments
 (0)