@@ -4,60 +4,74 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
44 """
55
66 require Logger
7+ alias Phoenix.Socket.Broadcast
8+ alias RealtimeWeb.Socket.UserBroadcast
79
810 def fastlane_metadata ( fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids \\ MapSet . new ( ) ) do
911 { :rc_fastlane , fastlane_pid , serializer , topic , log_level , tenant_id , replayed_message_ids }
1012 end
1113
14+ @ presence_diff "presence_diff"
15+
1216 @ doc """
1317 This dispatch function caches encoded messages if fastlane is used
1418 It also sends an :update_rate_counter to the subscriber and it can conditionally log
15- """
16- @ spec dispatch ( list , pid , Phoenix.Socket.Broadcast . t ( ) ) :: :ok
17- def dispatch ( subscribers , from , % Phoenix.Socket.Broadcast { event: event } = msg ) do
18- # fastlane_pid is the actual socket transport pid
19- # This reduce caches the serialization and bypasses the channel process going straight to the
20- # transport process
21-
22- message_id = message_id ( msg . payload )
2319
20+ fastlane_pid is the actual socket transport pid
21+ """
22+ @ spec dispatch ( list , pid , Broadcast . t ( ) | UserBroadcast . t ( ) ) :: :ok
23+ def dispatch ( subscribers , from , % Broadcast { event: @ presence_diff } = msg ) do
2424 { _cache , count } =
2525 Enum . reduce ( subscribers , { % { } , 0 } , fn
2626 { pid , _ } , { cache , count } when pid == from ->
2727 { cache , count }
2828
29- { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
29+ { _pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , _replayed_message_ids } } ,
3030 { cache , count } ->
31- if already_replayed? ( message_id , replayed_message_ids ) do
32- # skip already replayed message
33- { cache , count }
34- else
35- if event != "presence_diff" , do: send ( pid , :update_rate_counter )
31+ maybe_log ( log_level , join_topic , msg , tenant_id )
3632
37- maybe_log ( log_level , join_topic , msg , tenant_id )
38-
39- cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache )
40- { cache , count + 1 }
41- end
33+ cache = do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache )
34+ { cache , count + 1 }
4235
4336 { pid , _ } , { cache , count } ->
4437 send ( pid , msg )
4538 { cache , count }
4639 end )
4740
4841 tenant_id = tenant_id ( subscribers )
49- increment_presence_counter ( tenant_id , event , count )
42+ increment_presence_counter ( tenant_id , msg . event , count )
5043
5144 :ok
5245 end
5346
54- defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
55- tenant_id
56- |> Realtime.Tenants . presence_events_per_second_key ( )
57- |> Realtime.GenCounter . add ( count )
58- end
47+ def dispatch ( subscribers , from , msg ) do
48+ message_id = message_id ( msg )
5949
60- defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
50+ _ =
51+ Enum . reduce ( subscribers , % { } , fn
52+ { pid , _ } , cache when pid == from ->
53+ cache
54+
55+ { pid , { :rc_fastlane , fastlane_pid , serializer , join_topic , log_level , tenant_id , replayed_message_ids } } ,
56+ cache ->
57+ if already_replayed? ( message_id , replayed_message_ids ) do
58+ # skip already replayed message
59+ cache
60+ else
61+ send ( pid , :update_rate_counter )
62+
63+ maybe_log ( log_level , join_topic , msg , tenant_id )
64+
65+ do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache )
66+ end
67+
68+ { pid , _ } , cache ->
69+ send ( pid , msg )
70+ cache
71+ end )
72+
73+ :ok
74+ end
6175
6276 defp maybe_log ( :info , join_topic , msg , tenant_id ) do
6377 log = "Received message on #{ join_topic } with payload: #{ inspect ( msg , pretty: true ) } "
@@ -66,30 +80,53 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
6680
6781 defp maybe_log ( _level , _join_topic , _msg , _tenant_id ) , do: :ok
6882
69- defp message_id ( % { "meta" => % { "id" => id } } ) , do: id
70- defp message_id ( _ ) , do: nil
71-
72- defp already_replayed? ( nil , _replayed_message_ids ) , do: false
73- defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
74-
7583 defp do_dispatch ( msg , fastlane_pid , serializer , join_topic , cache ) do
7684 case cache do
77- % { ^ serializer => encoded_msg } ->
85+ % { ^ serializer => { :ok , encoded_msg } } ->
7886 send ( fastlane_pid , encoded_msg )
7987 cache
8088
89+ % { ^ serializer => { :error , reason } } ->
90+ # FIXME: log once when we try to serialize!
91+ raise reason
92+ cache
93+
8194 % { } ->
8295 # Use the original topic that was joined without the external_id
8396 msg = % { msg | topic: join_topic }
84- encoded_msg = serializer . fastlane! ( msg )
85- send ( fastlane_pid , encoded_msg )
86- Map . put ( cache , serializer , encoded_msg )
97+
98+ result =
99+ with { :ok , encoded_msg } <- fastlane! ( serializer , msg ) do
100+ send ( fastlane_pid , encoded_msg )
101+ { :ok , encoded_msg }
102+ end
103+
104+ Map . put ( cache , serializer , result )
87105 end
88106 end
89107
90- defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) do
91- tenant_id
108+ defp fastlane! ( Phoenix.Socket.V1.JSONSerializer = serializer , % UserBroadcast { } = msg ) do
109+ with { :ok , msg } <- UserBroadcast . convert_to_json_broadcast ( msg ) do
110+ { :ok , serializer . fastlane! ( msg ) }
111+ end
92112 end
93113
114+ defp fastlane! ( serializer , msg ) , do: { :ok , serializer . fastlane! ( msg ) }
115+
116+ defp tenant_id ( [ { _pid , { :rc_fastlane , _ , _ , _ , _ , tenant_id , _ } } | _ ] ) , do: tenant_id
94117 defp tenant_id ( _ ) , do: nil
118+
119+ defp increment_presence_counter ( tenant_id , "presence_diff" , count ) when is_binary ( tenant_id ) do
120+ tenant_id
121+ |> Realtime.Tenants . presence_events_per_second_key ( )
122+ |> Realtime.GenCounter . add ( count )
123+ end
124+
125+ defp increment_presence_counter ( _tenant_id , _event , _count ) , do: :ok
126+
127+ defp message_id ( % Broadcast { payload: % { "meta" => % { "id" => id } } } ) , do: id
128+ defp message_id ( _ ) , do: nil
129+
130+ defp already_replayed? ( nil , _replayed_message_ids ) , do: false
131+ defp already_replayed? ( message_id , replayed_message_ids ) , do: MapSet . member? ( replayed_message_ids , message_id )
95132end
0 commit comments