@@ -4,92 +4,138 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
44 """
55
66 require Logger
7+ alias Phoenix.Socket.Broadcast
8+ alias RealtimeWeb.Socket.UserBroadcast
79
810 def fastlane_metadata ( fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids \\ MapSet . new ( ) ) do
911 { :rc_fastlane , fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids }
1012 end
1113
14+ @ presence_diff "presence_diff"
15+
1216 @ doc """
1317 This dispatch function caches encoded messages if fastlane is used
1418 It also sends an :update_rate_counter to the subscriber and it can conditionally log
15- """
16- @ spec dispatch ( list , pid , Phoenix.Socket.Broadcast . t ( ) ) :: :ok
17- def dispatch ( subscribers , from , % Phoenix.Socket.Broadcast { event: event } = msg ) do
18- # fastlane_pid is the actual socket transport pid
19- # This reduce caches the serialization and bypasses the channel process going straight to the
20- # transport process
21-
22- message_id = message_id ( msg . payload )
2319
20+ fastlane_pid is the actual socket transport pid
21+ """
22+ @ spec dispatch ( list , pid , Broadcast . t ( ) | UserBroadcast . t ( ) ) :: :ok
23+ def dispatch ( subscribers , from , % Broadcast { event: @ presence_diff } = msg ) do
2424 { _cache , count } =
2525 Enum . reduce ( subscribers , { % { } , 0 } , fn
2626 { pid , _ } , { cache , count } when pid == from ->
2727 { cache , count }
2828
29- { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
29+ { _pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , _replayed_message_ids } } ,
3030 { cache , count } ->
31+ maybe_log ( log_level , join_topic , msg , tenant_id )
32+
33+ cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level )
34+ { cache , count + 1 }
35+
36+ { pid , _ } , { cache , count } ->
37+ send ( pid , msg )
38+ { cache , count }
39+ end )
40+
41+ tenant_id = tenant_id ( subscribers )
42+ increment_presence_counter ( tenant_id , msg . event , count )
43+
44+ :ok
45+ end
46+
47+ def dispatch ( subscribers , from , msg ) do
48+ message_id = message_id ( msg )
49+
50+ _ =
51+ Enum . reduce ( subscribers , % { } , fn
52+ { pid , _ } , cache when pid == from ->
53+ cache
54+
55+ { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
56+ cache ->
3157 if already_replayed? ( message_id , replayed_message_ids ) do
3258 # skip already replayed message
33- { cache , count }
59+ cache
3460 else
35- if event != "presence_diff" , do: send ( pid , :update_rate_counter )
61+ send ( pid , :update_rate_counter )
3662
3763 maybe_log ( log_level , join_topic , msg , tenant_id )
3864
39- cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache )
40- { cache , count + 1 }
65+ do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level )
4166 end
4267
43- { pid , _ } , { cache , count } ->
68+ { pid , _ } , cache ->
4469 send ( pid , msg )
45- { cache , count }
70+ cache
4671 end )
4772
48- tenant_id = tenant_id ( subscribers )
49- increment_presence_counter ( tenant_id , event , count )
50-
5173 :ok
5274 end
5375
54- defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
55- tenant_id
56- |> Realtime.Tenants . presence_events_per_second_key ( )
57- |> Realtime.GenCounter . add ( count )
76+ defp maybe_log ( :info , join_topic , msg , tenant_id ) when is_struct ( msg ) do
77+ log = "Received message on #{ join_topic } with payload: #{ inspect ( msg , pretty: true ) } "
78+ Logger . info ( log , external_id: tenant_id , project: tenant_id )
5879 end
5980
60- defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
61-
62- defp maybe_log ( :info , join_topic , msg , tenant_id ) do
63- log = "Received message on #{ join_topic } with payload: #{ inspect ( msg , pretty: true ) } "
81+ defp maybe_log ( :info , join_topic , msg , tenant_id ) when is_binary ( msg ) do
82+ log = "Received message on #{ join_topic } . #{ msg } "
6483 Logger . info ( log , external_id: tenant_id , project: tenant_id )
6584 end
6685
6786 defp maybe_log ( _level , _join_topic , _msg , _tenant_id ) , do: :ok
6887
69- defp message_id ( % { "meta" => % { "id" => id } } ) , do: id
70- defp message_id ( _ ) , do: nil
71-
72- defp already_replayed? ( nil , _replayed_message_ids ) , do: false
73- defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
74-
75- defp do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache ) do
88+ defp do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache , tenant_id , log_level ) do
7689 case cache do
77- % { ^ serializer => encoded_msg } ->
90+ % { ^ serializer => { :ok , encoded_msg } } ->
7891 send ( fastlane_pid , encoded_msg )
7992 cache
8093
94+ % { ^ serializer => { :error , _reason } } ->
95+ # We do nothing at this stage. It has been already logged depending on the log level
96+ cache
97+
8198 % { } ->
8299 # Use the original topic that was joined without the external_id
83100 msg = % { msg | topic: join_topic }
84- encoded_msg = serializer . fastlane! ( msg )
85- send ( fastlane_pid , encoded_msg )
86- Map . put ( cache , serializer , encoded_msg )
101+
102+ result =
103+ case fastlane! ( serializer , msg ) do
104+ { :ok , encoded_msg } ->
105+ send ( fastlane_pid , encoded_msg )
106+ { :ok , encoded_msg }
107+
108+ { :error , reason } ->
109+ maybe_log ( log_level , join_topic , reason , tenant_id )
110+ end
111+
112+ Map . put ( cache , serializer , result )
87113 end
88114 end
89115
90- defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) do
91- tenant_id
116+ # We have to convert because V1 does not know how to process UserBroadcast
117+ defp fastlane! ( Phoenix.Socket.V1.JSONSerializer = serializer , % UserBroadcast { } = msg ) do
118+ with { :ok , msg } <- UserBroadcast . convert_to_json_broadcast ( msg ) do
119+ { :ok , serializer . fastlane! ( msg ) }
120+ end
92121 end
93122
123+ defp fastlane! ( serializer , msg ) , do: { :ok , serializer . fastlane! ( msg ) }
124+
125+ defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) , do: tenant_id
94126 defp tenant_id ( _ ) , do: nil
127+
128+ defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
129+ tenant_id
130+ |> Realtime.Tenants . presence_events_per_second_key ( )
131+ |> Realtime.GenCounter . add ( count )
132+ end
133+
134+ defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
135+
136+ defp message_id ( % Broadcast { payload: % { "meta" => % { "id" => id } } } ) , do: id
137+ defp message_id ( _ ) , do: nil
138+
139+ defp already_replayed? ( nil , _replayed_message_ids ) , do: false
140+ defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
95141end
0 commit comments