Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Packages/StreamVideo/Runtime/Core/CoordinatorEventType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ internal static class CoordinatorEventType
public const string CallRejected = "call.rejected";
public const string CallRing = "call.ring";
public const string CallSessionEnded = "call.session_ended";
public const string CallSessionParticipantCountsUpdated = "call.session_participant_count_updated";
public const string CallSessionParticipantJoined = "call.session_participant_joined";
public const string CallSessionParticipantLeft = "call.session_participant_left";
public const string CallSessionStarted = "call.session_started";
Expand Down
10 changes: 9 additions & 1 deletion Packages/StreamVideo/Runtime/Core/LowLevelClient/RtcSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ private async Task SubscribeToTracksAsync()
if (ActiveCall.Participants == null || !ActiveCall.Participants.Any())
{
#if STREAM_DEBUG_ENABLED
_logs.Info($"{nameof(SubscribeToTracksAsync)} Ignored - No participants in the call to subscribe tracks for");
_logs.Error($"{nameof(SubscribeToTracksAsync)} Ignored - No participants in the call to subscribe tracks for");
#endif

return;
Expand Down Expand Up @@ -1077,6 +1077,12 @@ private void OnSfuPinsUpdated(PinsChanged pinsChanged)
ActiveCall.UpdateFromSfu(pinsChanged, _cache);
}

private void OnSfuHealthCheck(HealthCheckResponse healthCheckResponse)
{
_sfuTracer?.Trace("healthCheck", healthCheckResponse);
ActiveCall.UpdateFromSfu(healthCheckResponse, _cache);
}

private void OnSfuIceRestart(ICERestart iceRestart)
{
_sfuTracer?.Trace("iceRestart", iceRestart);
Expand Down Expand Up @@ -1669,6 +1675,7 @@ private void SubscribeToSfuEvents()
_sfuWebSocket.ParticipantLeft += OnSfuParticipantLeft;
_sfuWebSocket.DominantSpeakerChanged += OnSfuDominantSpeakerChanged;
_sfuWebSocket.JoinResponse += OnSfuJoinResponse;
_sfuWebSocket.HealthCheck += OnSfuHealthCheck;
_sfuWebSocket.TrackPublished += OnSfuTrackPublished;
_sfuWebSocket.TrackUnpublished += OnSfuTrackUnpublished;
_sfuWebSocket.Error += OnSfuWebSocketOnError;
Expand Down Expand Up @@ -1698,6 +1705,7 @@ private void UnsubscribeFromSfuEvents()
_sfuWebSocket.ParticipantLeft -= OnSfuParticipantLeft;
_sfuWebSocket.DominantSpeakerChanged -= OnSfuDominantSpeakerChanged;
_sfuWebSocket.JoinResponse -= OnSfuJoinResponse;
_sfuWebSocket.HealthCheck -= OnSfuHealthCheck;
_sfuWebSocket.TrackPublished -= OnSfuTrackPublished;
_sfuWebSocket.TrackUnpublished -= OnSfuTrackUnpublished;
_sfuWebSocket.Error -= OnSfuWebSocketOnError;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ public void Dispose()
internal event Action<CallRingEventInternalDTO> InternalCallRingEvent;
internal event Action<CallSessionEndedEventInternalDTO> InternalCallSessionEndedEvent;
internal event Action<CallSessionStartedEventInternalDTO> InternalCallSessionStartedEvent;
internal event Action<CallSessionParticipantJoinedEventInternalDTO> InternalCallSessionParticipantJoinedEvent;
internal event Action<CallSessionParticipantLeftEventInternalDTO> InternalCallSessionParticipantLeftEvent;
internal event Action<CallSessionParticipantCountsUpdatedEventInternalDTO> InternalCallSessionParticipantCountsUpdatedEvent;
internal event Action<BlockedUserEventInternalDTO> InternalCallUnblockedUserEvent;
internal event Action<ConnectionErrorEventInternalDTO> InternalConnectionErrorEvent;
internal event Action<CustomVideoEventInternalDTO> InternalCustomVideoEvent;
Expand Down Expand Up @@ -479,6 +482,18 @@ private void RegisterCoordinatorEventHandlers()
CoordinatorEventType.CallSessionStarted,
e => InternalCallSessionStartedEvent?.Invoke(e));

_coordinatorWS.RegisterEventType<CallSessionParticipantJoinedEventInternalDTO>(
CoordinatorEventType.CallSessionParticipantJoined,
e => InternalCallSessionParticipantJoinedEvent?.Invoke(e));

_coordinatorWS.RegisterEventType<CallSessionParticipantLeftEventInternalDTO>(
CoordinatorEventType.CallSessionParticipantLeft,
e => InternalCallSessionParticipantLeftEvent?.Invoke(e));

_coordinatorWS.RegisterEventType<CallSessionParticipantCountsUpdatedEventInternalDTO>(
CoordinatorEventType.CallSessionParticipantCountsUpdated,
e => InternalCallSessionParticipantCountsUpdatedEvent?.Invoke(e));

_coordinatorWS.RegisterEventType<BlockedUserEventInternalDTO>(CoordinatorEventType.CallUnblockedUser,
e => InternalCallUnblockedUserEvent?.Invoke(e));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,7 @@ private void HandleNewWebsocketMessage(string msg)
if (!EventHandlers.TryGetValue(type, out var handler))
{
#if STREAM_DEBUG_ENABLED
var ignoreTypes = new[] { "call.session_participant_count_updated" };
if (!ignoreTypes.Any(type.Contains))
{
Logs.Warning($"No message handler registered for `{type}`. Message not handled: " + msg);
}
Logs.Warning($"No message handler registered for `{type}`. Message not handled: " + msg);
#endif
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class SfuWebSocket : BasePersistentWebSocket
public event Action<ParticipantLeft> ParticipantLeft;
public event Action<DominantSpeakerChanged> DominantSpeakerChanged;
public event Action<JoinResponse> JoinResponse;
public event Action<HealthCheckResponse> HealthCheck;
public event Action<TrackPublished> TrackPublished;
public event Action<TrackUnpublished> TrackUnpublished;
public event Action<Error> Error;
Expand Down Expand Up @@ -232,7 +233,7 @@ protected override void ProcessMessages()
OnHandleJoinResponse(sfuEvent.JoinResponse);
break;
case SfuEvent.EventPayloadOneofCase.HealthCheckResponse:
//StreamTodo: healthCheck contains participantsCount, we should probably sync our state with this info
HealthCheck?.Invoke(sfuEvent.HealthCheckResponse);
OnHealthCheckReceived();
break;
case SfuEvent.EventPayloadOneofCase.TrackPublished:
Expand Down
109 changes: 107 additions & 2 deletions Packages/StreamVideo/Runtime/Core/Models/CallSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ void IStateLoadableFrom<CallSessionResponseInternalDTO, CallSession>.LoadFromDto
StartedAt = dto.StartedAt;
LiveEndedAt = dto.LiveEndedAt;
LiveStartedAt = dto.LiveStartedAt;

UpdateParticipantCountFromSessionInternal(dto.AnonymousParticipantCount, dto.ParticipantsCountByRole);
}

void IStateLoadableFrom<SfuCallState, CallSession>.LoadFromDto(SfuCallState dto, ICache cache)
Expand All @@ -68,7 +70,7 @@ void IStateLoadableFrom<SfuCallState, CallSession>.LoadFromDto(SfuCallState dto,
StartedAt = dto.StartedAt.ToDateTimeOffset();
}

// dto.CallState.Participants may not contain all of the participants
// dto.CallState.Participants may not contain all participants
UpdateExtensions<StreamVideoCallParticipant, SfuParticipant>.TryAddUniqueTrackedObjects(_participants,
dto.Participants, cache.CallParticipants);

Expand All @@ -87,6 +89,12 @@ internal IStreamVideoCallParticipant UpdateFromSfu(ParticipantJoined participant

return participant;
}

internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cache)
{
((IStateLoadableFrom<SfuParticipantCount, ParticipantCount>)ParticipantCount).LoadFromDto(
healthCheckResponse.ParticipantCount, cache);
}

internal (string sessionId, string userId) UpdateFromSfu(ParticipantLeft participantLeft, ICache cache)
{
Expand All @@ -95,10 +103,107 @@ internal IStreamVideoCallParticipant UpdateFromSfu(ParticipantJoined participant

return (participantLeft.Participant.SessionId, participantLeft.Participant.UserId);
}


internal void UpdateFromCoordinator(
InternalDTO.Events.CallSessionParticipantCountsUpdatedEventInternalDTO participantCountsUpdated,
LowLevelClient.CallingState callingState)
{
_participantsCountByRole.TryReplaceValuesFromDto(participantCountsUpdated.ParticipantsCountByRole);
UpdateParticipantCountFromCoordinator(participantCountsUpdated.AnonymousParticipantCount,
participantCountsUpdated.ParticipantsCountByRole, callingState);
}

internal void UpdateFromCoordinator(
InternalDTO.Events.CallSessionParticipantJoinedEventInternalDTO participantJoined, ICache cache,
LowLevelClient.CallingState callingState)
{
var participant = cache.TryCreateOrUpdate(participantJoined.Participant);

if (!_participants.Contains(participant))
{
_participants.Add(participant);
}

var role = participantJoined.Participant.Role;
if (_participantsCountByRole.ContainsKey(role))
{
_participantsCountByRole[role]++;
}
else
{
_participantsCountByRole[role] = 1;
}

var anonymousCount = ParticipantCount != null ? (int)ParticipantCount.Anonymous : 0;
UpdateParticipantCountFromCoordinator(anonymousCount, _participantsCountByRole, callingState);
}

//StreamTODO: double-check this logic
internal void UpdateFromCoordinator(
InternalDTO.Events.CallSessionParticipantLeftEventInternalDTO participantLeft, ICache cache,
LowLevelClient.CallingState callingState)
{
var participant = cache.TryCreateOrUpdate(participantLeft.Participant);
_participants.Remove(participant);

var role = participantLeft.Participant.Role;
if (_participantsCountByRole.ContainsKey(role))
{
_participantsCountByRole[role] = Math.Max(0, _participantsCountByRole[role] - 1);

if (_participantsCountByRole[role] == 0)
{
_participantsCountByRole.Remove(role);
}
}

var anonymousCount = ParticipantCount != null ? (int)ParticipantCount.Anonymous : 0;
UpdateParticipantCountFromCoordinator(anonymousCount, _participantsCountByRole, callingState);
}

private readonly Dictionary<string, DateTimeOffset> _acceptedBy = new Dictionary<string, DateTimeOffset>();
private readonly List<StreamVideoCallParticipant> _participants = new List<StreamVideoCallParticipant>();
private readonly Dictionary<string, int> _participantsCountByRole = new Dictionary<string, int>();
private readonly Dictionary<string, DateTimeOffset> _rejectedBy = new Dictionary<string, DateTimeOffset>();

/// <summary>
/// Updates the ParticipantCount based on session data (used when NOT connected to SFU)
/// </summary>
private void UpdateParticipantCountFromCoordinator(int anonymousParticipantCount,
IReadOnlyDictionary<string, int> participantsCountByRole, LowLevelClient.CallingState callingState)
{
// When in JOINED state, we should use the participant count coming through
// the SFU healthcheck event, as it's more accurate.
if (callingState == LowLevelClient.CallingState.Joined)
{
return;
}

UpdateParticipantCountFromSessionInternal(anonymousParticipantCount, participantsCountByRole);
}

/// <summary>
/// Updates the ParticipantCount based on session data
/// </summary>
private void UpdateParticipantCountFromSessionInternal(int anonymousParticipantCount,
IReadOnlyDictionary<string, int> participantsCountByRole)
{
var byRoleCount = 0;
foreach (var count in participantsCountByRole.Values)
{
byRoleCount += count;
}

var total = Math.Max(byRoleCount, _participants.Count);

var dto = new SfuParticipantCount
{
Total = (uint)total,
Anonymous = (uint)anonymousParticipantCount
};

((IStateLoadableFrom<SfuParticipantCount, ParticipantCount>)ParticipantCount)
.LoadFromDto(dto, null);
}
}
}
3 changes: 3 additions & 0 deletions Packages/StreamVideo/Runtime/Core/State/Caches/ICacheExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ public static StreamVideoUser TryCreateOrUpdate(this ICache cache, OwnUserRespon

public static StreamVideoCallParticipant TryCreateOrUpdate(this ICache cache, Participant dto)
=> dto == null ? null : cache.CallParticipants.CreateOrUpdate<StreamVideoCallParticipant, Participant>(dto, out _);

public static StreamVideoCallParticipant TryCreateOrUpdate(this ICache cache, CallParticipantResponseInternalDTO dto)
=> dto == null ? null : cache.CallParticipants.CreateOrUpdate<StreamVideoCallParticipant, CallParticipantResponseInternalDTO>(dto, out _);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading.Tasks;
using StreamVideo.Core.LowLevelClient;
using StreamVideo.Core.Models;
using StreamVideo.Core.Models.Sfu;
using StreamVideo.Core.QueryBuilders.Filters;
using StreamVideo.Core.QueryBuilders.Sort;
using StreamVideo.Core.State;
Expand Down Expand Up @@ -93,6 +94,8 @@ public interface IStreamCall : IStreamStatefulModel, IHasCustomData
/// Any update to this collection will trigger the <see cref="SortedParticipantsUpdated"/> event.
/// </summary>
IEnumerable<IStreamVideoCallParticipant> SortedParticipants { get; }

ParticipantCount ParticipantCount { get; }

/// <summary>
/// Participant that is currently the most actively speaking.
Expand Down
33 changes: 29 additions & 4 deletions Packages/StreamVideo/Runtime/Core/StatefulModels/StreamCall.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
using StreamVideo.Core.State.Caches;
using StreamVideo.Core.StatefulModels.Tracks;
using StreamVideo.Core.Utils;
using UnityEngine;
using TrackType = StreamVideo.Core.Models.Sfu.TrackType;
using ParticipantCount = StreamVideo.Core.Models.Sfu.ParticipantCount;

namespace StreamVideo.Core.StatefulModels
{
Expand Down Expand Up @@ -62,6 +62,8 @@ internal sealed class StreamCall : StreamStatefulModelBase<StreamCall>,

//StreamTodo: Maybe add OtherParticipants -> All participants except for the local participant?
public IReadOnlyList<IStreamVideoCallParticipant> Participants => Session?.Participants;

public ParticipantCount ParticipantCount => Session.ParticipantCount;

public bool IsLocalUserOwner
{
Expand Down Expand Up @@ -574,6 +576,30 @@ internal void UpdateFromSfu(PinsChanged pinsChanged, ICache cache)
UpdatePinnedParticipants(out _);
}

internal void UpdateFromSfu(HealthCheckResponse healthCheckResponse, ICache cache)
{
Session?.UpdateFromSfu(healthCheckResponse, cache);
}

internal void UpdateFromCoordinator(CallSessionParticipantCountsUpdatedEventInternalDTO eventData)
{
Session?.UpdateFromCoordinator(eventData, Client.InternalLowLevelClient.RtcSession.CallState);
}

internal void UpdateFromCoordinator(CallSessionParticipantJoinedEventInternalDTO eventData, ICache cache)
{
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);

//StreamTodo: we should extract AddParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
}

internal void UpdateFromCoordinator(CallSessionParticipantLeftEventInternalDTO eventData, ICache cache)
{
Session?.UpdateFromCoordinator(eventData, cache, Client.InternalLowLevelClient.RtcSession.CallState);

//StreamTodo: we should extract RemoveParticipant logic from SFU and whatever is received first (SFU or Coordinator) should handle it
}

//StreamTodo: missing TrackRemoved or perhaps we should not care whether a track was added/removed but only published/unpublished -> enabled/disabled
internal void NotifyTrackAdded(IStreamVideoCallParticipant participant, IStreamTrack track)
=> ParticipantTrackAdded?.Invoke(participant, track);
Expand Down Expand Up @@ -647,12 +673,12 @@ internal void InternalHandleCallRecordingStartedEvent(CallReactionEventInternalD

//StreamTodo: NullReferenceException here because _client is never set
var participant
= _client.RtcSession.ActiveCall.Participants.FirstOrDefault(p => p.UserId == reaction.User.Id);
= Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.FirstOrDefault(p => p.UserId == reaction.User.Id);
if (participant == null)
{
Logs.ErrorIfDebug(
$"Failed to find participant for reaction. UserId: {reaction.User.Id}, Participants: " +
string.Join(", ", _client.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
string.Join(", ", Client.InternalLowLevelClient.RtcSession.ActiveCall.Participants.Select(p => p.UserId)));
return;
}

Expand Down Expand Up @@ -726,7 +752,6 @@ private readonly List<IStreamVideoCallParticipant>

#endregion

private readonly StreamVideoLowLevelClient _client;
private readonly StreamCallType _type;

private string _id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace StreamVideo.Core.StatefulModels.Tracks
/// </summary>
public abstract class BaseStreamTrack : IStreamTrack
{
private bool _isEnabled;
public event StreamTrackStateChangeHandler EnabledChanged;

public bool IsEnabled
Expand Down Expand Up @@ -59,6 +58,8 @@ protected virtual void OnDisposing()
{

}

private bool _isEnabled;
}

public abstract class BaseStreamTrack<TTrack> : BaseStreamTrack
Expand Down
Loading
Loading