@@ -4,60 +4,74 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
44 """
55
66 require Logger
7+ alias Phoenix.Socket.Broadcast
8+ alias RealtimeWeb.Socket.UserBroadcast
79
810 def fastlane_metadata ( fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids \\ MapSet . new ( ) ) do
911 { :rc_fastlane , fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids }
1012 end
1113
14+ @ presence_diff "presence_diff"
15+
1216 @ doc """
1317 This dispatch function caches encoded messages if fastlane is used
1418 It also sends an :update_rate_counter to the subscriber and it can conditionally log
15- """
16- @ spec dispatch ( list , pid , Phoenix.Socket.Broadcast . t ( ) ) :: :ok
17- def dispatch ( subscribers , from , % Phoenix.Socket.Broadcast { event: event } = msg ) do
18- # fastlane_pid is the actual socket transport pid
19- # This reduce caches the serialization and bypasses the channel process going straight to the
20- # transport process
21-
22- message_id = message_id ( msg . payload )
2319
20+ fastlane_pid is the actual socket transport pid
21+ """
22+ @ spec dispatch ( list , pid , Broadcast . t ( ) | UserBroadcast . t ( ) ) :: :ok
23+ def dispatch ( subscribers , from , % Broadcast { event: @ presence_diff } = msg ) do
2424 { _cache , count } =
2525 Enum . reduce ( subscribers , { % { } , 0 } , fn
2626 { pid , _ } , { cache , count } when pid == from ->
2727 { cache , count }
2828
29- { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
29+ { _pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , _replayed_message_ids } } ,
3030 { cache , count } ->
31- if already_replayed? ( message_id , replayed_message_ids ) do
32- # skip already replayed message
33- { cache , count }
34- else
35- if event != "presence_diff" , do: send ( pid , :update_rate_counter )
31+ maybe_log ( log_level , join_topic , msg , tenant_id )
3632
37- maybe_log ( log_level , join_topic , msg , tenant_id )
38-
39- cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache )
40- { cache , count + 1 }
41- end
33+ cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level )
34+ { cache , count + 1 }
4235
4336 { pid , _ } , { cache , count } ->
4437 send ( pid , msg )
4538 { cache , count }
4639 end )
4740
4841 tenant_id = tenant_id ( subscribers )
49- increment_presence_counter ( tenant_id , event , count )
42+ increment_presence_counter ( tenant_id , msg . event , count )
5043
5144 :ok
5245 end
5346
54- defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
55- tenant_id
56- |> Realtime.Tenants . presence_events_per_second_key ( )
57- |> Realtime.GenCounter . add ( count )
58- end
47+ def dispatch ( subscribers , from , msg ) do
48+ message_id = message_id ( msg )
5949
60- defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
50+ _ =
51+ Enum . reduce ( subscribers , % { } , fn
52+ { pid , _ } , cache when pid == from ->
53+ cache
54+
55+ { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
56+ cache ->
57+ if already_replayed? ( message_id , replayed_message_ids ) do
58+ # skip already replayed message
59+ cache
60+ else
61+ send ( pid , :update_rate_counter )
62+
63+ maybe_log ( log_level , join_topic , msg , tenant_id )
64+
65+ do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level )
66+ end
67+
68+ { pid , _ } , cache ->
69+ send ( pid , msg )
70+ cache
71+ end )
72+
73+ :ok
74+ end
6175
6276 defp maybe_log ( :info , join_topic , msg , tenant_id ) do
6377 log = "Received message on #{ join_topic } with payload: #{ inspect ( msg , pretty: true ) } "
@@ -66,30 +80,57 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
6680
6781 defp maybe_log ( _level , _join_topic , _msg , _tenant_id ) , do: :ok
6882
69- defp message_id ( % { "meta" => % { "id" => id } } ) , do: id
70- defp message_id ( _ ) , do: nil
71-
72- defp already_replayed? ( nil , _replayed_message_ids ) , do: false
73- defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
74-
75- defp do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache ) do
83+ defp do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level ) do
7684 case cache do
77- % { ^ serializer => encoded_msg } ->
85+ % { ^ serializer => { :ok , encoded_msg } } ->
7886 send ( fastlane_pid , encoded_msg )
7987 cache
8088
89+ % { ^ serializer => { :error , reason } } ->
90+ # We do nothing at this stage. It has been already logged depending on the log level
91+ cache
92+
8193 % { } ->
8294 # Use the original topic that was joined without the external_id
8395 msg = % { msg | topic: join_topic }
84- encoded_msg = serializer . fastlane! ( msg )
85- send ( fastlane_pid , encoded_msg )
86- Map . put ( cache , serializer , encoded_msg )
96+
97+ result =
98+ case fastlane! ( serializer , msg ) do
99+ { :ok , encoded_msg } ->
100+ send ( fastlane_pid , encoded_msg )
101+ { :ok , encoded_msg }
102+
103+ { :error , reason } ->
104+ maybe_log ( log_level , join_topic , reason , tenant_id )
105+ end
106+
107+ Map . put ( cache , serializer , result )
87108 end
88109 end
89110
90- defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) do
91- tenant_id
111+ # We have to convert because V1 does not know how to process UserBroadcast
112+ defp fastlane! ( Phoenix.Socket.V1.JSONSerializer = serializer , % UserBroadcast { } = msg ) do
113+ with { :ok , msg } <- UserBroadcast . convert_to_json_broadcast ( msg ) do
114+ { :ok , serializer . fastlane! ( msg ) }
115+ end
92116 end
93117
118+ defp fastlane! ( serializer , msg ) , do: { :ok , serializer . fastlane! ( msg ) }
119+
120+ defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) , do: tenant_id
94121 defp tenant_id ( _ ) , do: nil
122+
123+ defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
124+ tenant_id
125+ |> Realtime.Tenants . presence_events_per_second_key ( )
126+ |> Realtime.GenCounter . add ( count )
127+ end
128+
129+ defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
130+
131+ defp message_id ( % Broadcast { payload: % { "meta" => % { "id" => id } } } ) , do: id
132+ defp message_id ( _ ) , do: nil
133+
134+ defp already_replayed? ( nil , _replayed_message_ids ) , do: false
135+ defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
95136end
0 commit comments