Skip to content

Commit 1d140af

Browse files
committed
only log in case of error
1 parent cfb3167 commit 1d140af

File tree

3 files changed

+93
-113
lines changed

3 files changed

+93
-113
lines changed

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 71 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel do
55
use RealtimeWeb, :channel
66
use RealtimeWeb.RealtimeChannel.Logging
77

8+
alias RealtimeWeb.SocketDisconnect
89
alias DBConnection.Backoff
9-
alias Phoenix.Socket
1010

1111
alias Realtime.Crypto
1212
alias Realtime.GenCounter
@@ -22,14 +22,11 @@ defmodule RealtimeWeb.RealtimeChannel do
2222
alias Realtime.Tenants.Connect
2323

2424
alias RealtimeWeb.Channels.Payloads.Join
25-
alias RealtimeWeb.Channels.Payloads.Config
26-
alias RealtimeWeb.Channels.Payloads.PostgresChange
2725
alias RealtimeWeb.ChannelsAuthorization
2826
alias RealtimeWeb.RealtimeChannel.BroadcastHandler
2927
alias RealtimeWeb.RealtimeChannel.MessageDispatcher
3028
alias RealtimeWeb.RealtimeChannel.PresenceHandler
3129
alias RealtimeWeb.RealtimeChannel.Tracker
32-
alias RealtimeWeb.SocketDisconnect
3330

3431
@confirm_token_ms_interval :timer.minutes(5)
3532

@@ -50,11 +47,20 @@ defmodule RealtimeWeb.RealtimeChannel do
5047
Logger.metadata(external_id: tenant_id, project: tenant_id)
5148
Logger.put_process_level(self(), log_level)
5249

53-
# We always need to assign the access token so we can get the logs metadata working as expected
54-
socket = assign_access_token(socket, params)
50+
socket =
51+
socket
52+
|> assign_access_token(params)
53+
|> assign_counter()
54+
|> assign_presence_counter()
55+
|> assign(:private?, !!params["config"]["private"])
56+
|> assign(:policies, nil)
57+
58+
case Join.validate(params) do
59+
{:ok, _join} -> nil
60+
{:error, :invalid_join_payload, errors} -> log_error(socket, "InvalidJoinPayload", errors)
61+
end
5562

56-
with {:ok, %Socket{} = socket, %Join{} = configuration} <- configure_socket(socket, params),
57-
:ok <- SignalHandler.shutdown_in_progress?(),
63+
with :ok <- SignalHandler.shutdown_in_progress?(),
5864
:ok <- only_private?(tenant_id, socket),
5965
:ok <- limit_joins(socket),
6066
:ok <- limit_channels(socket),
@@ -64,6 +70,7 @@ defmodule RealtimeWeb.RealtimeChannel do
6470
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
6571
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
6672
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)
73+
6774
# fastlane subscription
6875
metadata =
6976
MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
@@ -72,11 +79,15 @@ defmodule RealtimeWeb.RealtimeChannel do
7279

7380
Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id)
7481

75-
is_new_api = new_api?(configuration)
76-
77-
presence_enabled? = Join.presence_enabled?(configuration)
82+
is_new_api = new_api?(params)
83+
# TODO: Default will be moved to false in the future
84+
presence_enabled? =
85+
case get_in(params, ["config", "presence", "enabled"]) do
86+
enabled when is_boolean(enabled) -> enabled
87+
_ -> true
88+
end
7889

79-
pg_change_params = pg_change_params(is_new_api, configuration, channel_pid, claims, sub_topic)
90+
pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
8091

8192
opts = %{
8293
is_new_api: is_new_api,
@@ -93,13 +104,13 @@ defmodule RealtimeWeb.RealtimeChannel do
93104
state = %{postgres_changes: add_id_to_postgres_changes(pg_change_params)}
94105

95106
assigns = %{
96-
ack_broadcast: Join.ack_broadcast?(configuration),
107+
ack_broadcast: !!params["config"]["broadcast"]["ack"],
97108
confirm_token_ref: confirm_token_ref,
98109
is_new_api: is_new_api,
99110
pg_sub_ref: nil,
100111
pg_change_params: pg_change_params,
101-
presence_key: Join.presence_key(configuration),
102-
self_broadcast: Join.self_broadcast?(configuration),
112+
presence_key: presence_key(params),
113+
self_broadcast: !!params["config"]["broadcast"]["self"],
103114
tenant_topic: tenant_topic,
104115
channel_name: sub_topic,
105116
presence_enabled?: presence_enabled?
@@ -113,9 +124,6 @@ defmodule RealtimeWeb.RealtimeChannel do
113124

114125
{:ok, state, assign(socket, assigns)}
115126
else
116-
{:error, :invalid_join_payload, errors, socket} ->
117-
log_error(socket, "InvalidJoinPayload", errors)
118-
119127
{:error, :expired_token, msg} ->
120128
maybe_log_warning(socket, "InvalidJWTToken", msg)
121129

@@ -192,23 +200,6 @@ defmodule RealtimeWeb.RealtimeChannel do
192200
end
193201
end
194202

195-
defp configure_socket(socket, params) do
196-
case Join.validate(params) do
197-
{:ok, configuration} ->
198-
socket =
199-
socket
200-
|> assign_counter()
201-
|> assign_presence_counter()
202-
|> assign(:private?, Join.private?(configuration))
203-
|> assign(:policies, nil)
204-
205-
{:ok, socket, configuration}
206-
207-
{:error, :invalid_join_payload, errors} ->
208-
{:error, :invalid_join_payload, errors, socket}
209-
end
210-
end
211-
212203
@impl true
213204
def handle_info(:update_rate_counter, %{assigns: %{limits: %{max_events_per_second: max}}} = socket) do
214205
count(socket)
@@ -546,24 +537,40 @@ defmodule RealtimeWeb.RealtimeChannel do
546537

547538
defp count(%{assigns: %{rate_counter: counter}}), do: GenCounter.add(counter.id)
548539

549-
defp assign_access_token(socket, params) do
550-
%{assigns: %{tenant_token: tenant_token, headers: headers}} = socket
540+
defp presence_key(params) do
541+
case params["config"]["presence"]["key"] do
542+
key when is_binary(key) and key != "" -> key
543+
_ -> UUID.uuid1()
544+
end
545+
end
546+
547+
defp assign_access_token(%{assigns: %{headers: headers}} = socket, params) do
548+
access_token = Map.get(params, "access_token") || Map.get(params, "user_token")
551549
{_, header} = Enum.find(headers, {nil, nil}, fn {k, _} -> k == "x-api-key" end)
552550

553-
access_token = Map.get(params, "access_token")
554-
user_token = Map.get(params, "user_token")
551+
case access_token do
552+
nil -> assign(socket, :access_token, header)
553+
"sb_" <> _ -> assign(socket, :access_token, header)
554+
_ -> handle_access_token(socket, params)
555+
end
556+
end
555557

556-
access_token =
557-
cond do
558-
is_binary(access_token) and !String.starts_with?(access_token, "sb_") -> access_token
559-
is_binary(user_token) and !String.starts_with?(user_token, "sb_") -> user_token
560-
is_binary(tenant_token) and !String.starts_with?(tenant_token, "sb_") -> tenant_token
561-
true -> header
562-
end
558+
defp assign_access_token(socket, params), do: handle_access_token(socket, params)
559+
560+
defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"user_token" => user_token})
561+
when is_binary(user_token) do
562+
assign(socket, :access_token, user_token)
563+
end
563564

565+
defp handle_access_token(%{assigns: %{tenant_token: _tenant_token}} = socket, %{"access_token" => access_token})
566+
when is_binary(access_token) do
564567
assign(socket, :access_token, access_token)
565568
end
566569

570+
defp handle_access_token(%{assigns: %{tenant_token: tenant_token}} = socket, _params) when is_binary(tenant_token) do
571+
assign(socket, :access_token, tenant_token)
572+
end
573+
567574
defp confirm_token(%{assigns: assigns}) do
568575
%{jwt_secret: jwt_secret, access_token: access_token} = assigns
569576

@@ -630,30 +637,28 @@ defmodule RealtimeWeb.RealtimeChannel do
630637
})
631638
end
632639

633-
defp new_api?(%Join{config: config}) when not is_nil(config), do: true
640+
defp new_api?(%{"config" => _}), do: true
634641
defp new_api?(_), do: false
635642

636-
defp pg_change_params(true, %Join{config: %Config{postgres_changes: postgres_changes}}, channel_pid, claims, _)
637-
when not is_nil(postgres_changes) do
638-
postgres_changes
639-
|> Enum.reject(&is_nil/1)
640-
|> Enum.map(fn %PostgresChange{table: table, event: event, schema: schema, filter: filter} ->
641-
params =
642-
%{"table" => table, "filter" => filter, "schema" => schema, "event" => event}
643-
|> Enum.reject(fn {_, v} -> is_nil(v) end)
644-
|> Map.new()
645-
646-
%{
647-
id: UUID.uuid1(),
648-
channel_pid: channel_pid,
649-
claims: claims,
650-
params: params
651-
}
652-
end)
643+
defp pg_change_params(true, params, channel_pid, claims, _) do
644+
case get_in(params, ["config", "postgres_changes"]) do
645+
[_ | _] = params_list ->
646+
params_list
647+
|> Enum.reject(&is_nil/1)
648+
|> Enum.map(fn params ->
649+
%{
650+
id: UUID.uuid1(),
651+
channel_pid: channel_pid,
652+
claims: claims,
653+
params: params
654+
}
655+
end)
656+
657+
_ ->
658+
[]
659+
end
653660
end
654661

655-
defp pg_change_params(true, _, _, _, _), do: []
656-
657662
defp pg_change_params(false, _, channel_pid, claims, sub_topic) do
658663
params =
659664
case String.split(sub_topic, ":", parts: 3) do

test/integration/rt_channel_test.exs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,18 +420,28 @@ defmodule Realtime.Integration.RtChannelTest do
420420
500
421421
end
422422

423-
test "nil postgres changes params identified as error", %{tenant: tenant} do
423+
test "handle nil postgres changes params as empty param changes", %{tenant: tenant} do
424424
{socket, _} = get_connection(tenant)
425425
topic = "realtime:any"
426426
config = %{postgres_changes: [nil]}
427427

428-
log =
429-
capture_log(fn ->
430-
WebsocketClient.join(socket, topic, %{config: config})
431-
Process.sleep(500)
432-
end)
428+
WebsocketClient.join(socket, topic, %{config: config})
433429

434-
assert log =~ "InvalidJoinPayload"
430+
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^topic}, 200
431+
assert_receive %Phoenix.Socket.Message{event: "presence_state", payload: %{}, topic: ^topic}, 500
432+
433+
refute_receive %Message{
434+
event: "system",
435+
payload: %{
436+
"channel" => "any",
437+
"extension" => "postgres_changes",
438+
"message" => "Subscribed to PostgreSQL",
439+
"status" => "ok"
440+
},
441+
ref: nil,
442+
topic: ^topic
443+
},
444+
1000
435445
end
436446
end
437447

test/realtime_web/channels/realtime_channel_test.exs

Lines changed: 5 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,12 @@ defmodule RealtimeWeb.RealtimeChannelTest do
503503

504504
test "expired jwt returns a error with sub data if available log_level=warning", %{tenant: tenant} do
505505
sub = random_string()
506+
506507
api_key = Generators.generate_jwt_token(tenant)
507-
claims = %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub}
508-
jwt = Generators.generate_jwt_token(tenant, claims)
508+
509+
jwt =
510+
Generators.generate_jwt_token(tenant, %{role: "authenticated", exp: System.system_time(:second) - 1, sub: sub})
511+
509512
assert {:ok, socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, api_key))
510513

511514
log =
@@ -672,44 +675,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
672675
end
673676
end
674677

675-
describe "join payload validations" do
676-
test "valid payload allows join", %{tenant: tenant} do
677-
jwt = Generators.generate_jwt_token(tenant)
678-
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
679-
680-
config = %{
681-
"config" => %{
682-
"private" => false,
683-
"broadcast" => %{"ack" => false, "self" => false},
684-
"presence" => %{"enabled" => true, "key" => "potato"},
685-
"postgres_changes" => [
686-
%{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"},
687-
%{"event" => "DELETE", "schema" => "public", "table" => "users", "filter" => "id=eq.2"},
688-
%{"event" => "UPDATE", "schema" => "public", "table" => "users", "filter" => "id=eq.3"}
689-
]
690-
},
691-
"access_token" => jwt
692-
}
693-
694-
assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", config)
695-
end
696-
697-
test "invalid payload returns error", %{tenant: tenant} do
698-
jwt = Generators.generate_jwt_token(tenant)
699-
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
700-
701-
log =
702-
capture_log(fn ->
703-
assert {:error, %{reason: reason}} =
704-
subscribe_and_join(socket, "realtime:test", %{"config" => "potato"})
705-
706-
assert reason =~ "unable to parse, expected a map"
707-
end)
708-
709-
assert log =~ "InvalidJoinPayload"
710-
end
711-
end
712-
713678
test "registers transport pid and channel pid per tenant", %{tenant: tenant} do
714679
jwt = Generators.generate_jwt_token(tenant)
715680
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))

0 commit comments

Comments
 (0)