Skip to content

Commit 632b489

Browse files
authored
fix: move rate limit message to happen inside the RateCounter (#1487)
1 parent c84aedc commit 632b489

File tree

6 files changed

+142
-21
lines changed

6 files changed

+142
-21
lines changed

lib/realtime/rate_counter/rate_counter.ex

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ defmodule Realtime.RateCounter do
3434
tick_ref: nil,
3535
idle_shutdown: @idle_shutdown,
3636
idle_shutdown_ref: nil,
37+
limit: %{log: false},
3738
telemetry: %{
3839
event_name: [@app_name] ++ [:rate_counter],
3940
measurements: %{sum: 0},
@@ -49,6 +50,12 @@ defmodule Realtime.RateCounter do
4950
tick_ref: reference(),
5051
idle_shutdown: integer() | :infinity,
5152
idle_shutdown_ref: reference(),
53+
limit: %{
54+
log: boolean(),
55+
value: integer(),
56+
triggered: boolean(),
57+
log_fn: (-> term())
58+
},
5259
telemetry: %{
5360
emit: false,
5461
event_name: :telemetry.event_name(),
@@ -133,6 +140,8 @@ defmodule Realtime.RateCounter do
133140
every = Keyword.get(args, :tick, @tick)
134141
max_bucket_len = Keyword.get(args, :max_bucket_len, @max_bucket_len)
135142
idle_shutdown_ms = Keyword.get(args, :idle_shutdown, @idle_shutdown)
143+
limit_opts = Keyword.get(args, :limit)
144+
136145
Logger.info("Starting #{__MODULE__} for: #{inspect(id)}")
137146

138147
# Always reset the counter in case the counter had already accumulated without
@@ -153,6 +162,18 @@ defmodule Realtime.RateCounter do
153162
%{emit: false}
154163
end
155164

165+
limit =
166+
if limit_opts do
167+
%{
168+
log: true,
169+
value: limit_opts[:value],
170+
log_fn: limit_opts[:log_fn],
171+
triggered: false
172+
}
173+
else
174+
%{log: false}
175+
end
176+
156177
ticker = tick(0)
157178

158179
idle_shutdown_ref =
@@ -165,7 +186,8 @@ defmodule Realtime.RateCounter do
165186
max_bucket_len: max_bucket_len,
166187
idle_shutdown: idle_shutdown_ms,
167188
idle_shutdown_ref: idle_shutdown_ref,
168-
telemetry: telemetry
189+
telemetry: telemetry,
190+
limit: limit
169191
}
170192

171193
Cachex.put!(@cache, id, state)
@@ -195,6 +217,8 @@ defmodule Realtime.RateCounter do
195217
|> Kernel./(bucket_len)
196218

197219
state = %{state | bucket: bucket, avg: avg}
220+
221+
state = maybe_trigger_limit(state)
198222
tick(state.tick)
199223

200224
Cachex.put!(@cache, state.id, state)
@@ -226,6 +250,28 @@ defmodule Realtime.RateCounter do
226250
end
227251
end
228252

253+
defp maybe_trigger_limit(%{limit: %{log: false}} = state), do: state
254+
255+
defp maybe_trigger_limit(%{limit: %{triggered: true}} = state) do
256+
# Limit was already triggered; check whether the average is still at or above the limit
257+
if state.avg < state.limit.value do
258+
%{state | limit: %{state.limit | triggered: false}}
259+
else
260+
# Average is still at or above the limit, so we keep the state as is
261+
state
262+
end
263+
end
264+
265+
defp maybe_trigger_limit(state) do
266+
if state.avg >= state.limit.value do
267+
state.limit.log_fn.()
268+
269+
%{state | limit: %{state.limit | triggered: true}}
270+
else
271+
state
272+
end
273+
end
274+
229275
defp tick(every) do
230276
Process.send_after(self(), :tick, every)
231277
end

lib/realtime/tenants.ex

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,16 @@ defmodule Realtime.Tenants do
151151
event_name: [:channel, :joins],
152152
measurements: %{limit: max_joins_per_second},
153153
metadata: %{tenant: tenant_id}
154-
}
154+
},
155+
limit: [
156+
value: max_joins_per_second,
157+
log_fn: fn ->
158+
Logger.critical("ClientJoinRateLimitReached: Too many joins per second",
159+
external_id: tenant_id,
160+
project: tenant_id
161+
)
162+
end
163+
]
155164
]
156165

157166
%RateCounter.Args{id: joins_per_second_key(tenant_id), opts: opts}

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ defmodule RealtimeWeb.RealtimeChannel do
137137

138138
{:error, :too_many_joins} ->
139139
msg = "Too many joins per second"
140-
Logging.log_error_message(:error, "ClientJoinRateLimitReached", msg)
140+
{:error, %{reason: msg}}
141141

142142
{:error, :increase_connection_pool} ->
143143
msg = "Please increase your connection pool size"
@@ -504,10 +504,10 @@ defmodule RealtimeWeb.RealtimeChannel do
504504
rate_args = Tenants.joins_per_second_rate(tenant, limits.max_joins_per_second)
505505

506506
RateCounter.new(rate_args)
507-
GenCounter.add(rate_args.id)
508507

509508
case RateCounter.get(rate_args) do
510509
{:ok, %{avg: avg}} when avg < limits.max_joins_per_second ->
510+
GenCounter.add(rate_args.id)
511511
:ok
512512

513513
{:ok, %{avg: _}} ->

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.17",
7+
version: "2.41.18",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1812,21 +1812,31 @@ defmodule Realtime.Integration.RtChannelTest do
18121812
config = %{broadcast: %{self: true}, private: false}
18131813
realtime_topic = "realtime:#{random_string()}"
18141814

1815-
for _ <- 1..1000 do
1816-
WebsocketClient.join(socket, realtime_topic, %{config: config})
1817-
1..5 |> Enum.random() |> Process.sleep()
1818-
end
1815+
log =
1816+
capture_log(fn ->
1817+
for _ <- 1..1000 do
1818+
WebsocketClient.join(socket, realtime_topic, %{config: config})
1819+
1..5 |> Enum.random() |> Process.sleep()
1820+
end
18191821

1820-
assert_receive %Message{
1821-
event: "phx_reply",
1822-
payload: %{
1823-
"response" => %{"reason" => "Too many joins per second"},
1824-
"status" => "error"
1825-
}
1826-
},
1827-
2000
1822+
assert_receive %Message{
1823+
event: "phx_reply",
1824+
payload: %{
1825+
"response" => %{"reason" => "Too many joins per second"},
1826+
"status" => "error"
1827+
}
1828+
},
1829+
2000
1830+
1831+
change_tenant_configuration(tenant, :max_joins_per_second, max_joins_per_second)
1832+
end)
1833+
1834+
assert log =~
1835+
"project=#{tenant.external_id} external_id=#{tenant.external_id} [critical] ClientJoinRateLimitReached: Too many joins per second"
18281836

1829-
change_tenant_configuration(tenant, :max_joins_per_second, max_joins_per_second)
1837+
# Only one log message should be emitted
1838+
# Splitting on the error code yields exactly two parts (the text before and after it) when it appears once
1839+
assert length(String.split(log, "ClientJoinRateLimitReached")) == 2
18301840
end
18311841
end
18321842

test/realtime/rate_counter/rate_counter_test.exs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
defmodule Realtime.RateCounterTest do
22
use Realtime.DataCase, async: true
33

4+
require Logger
5+
46
alias Realtime.RateCounter
57
alias Realtime.RateCounter.Args
68
alias Realtime.GenCounter
79

10+
import ExUnit.CaptureLog
11+
812
describe "new/2" do
913
test "starts a new rate counter without telemetry" do
1014
id = {:domain, :metric, Ecto.UUID.generate()}
@@ -14,13 +18,14 @@ defmodule Realtime.RateCounterTest do
1418
assert %Realtime.RateCounter{
1519
id: ^id,
1620
avg: +0.0,
17-
bucket: [],
21+
bucket: _,
1822
max_bucket_len: 60,
1923
tick: 1000,
2024
tick_ref: _,
2125
idle_shutdown: 900_000,
2226
idle_shutdown_ref: _,
23-
telemetry: %{emit: false}
27+
telemetry: %{emit: false},
28+
limit: %{log: false}
2429
} = :sys.get_state(pid)
2530
end
2631

@@ -39,6 +44,7 @@ defmodule Realtime.RateCounterTest do
3944
args = %Args{
4045
id: id,
4146
opts: [
47+
tick: 10,
4248
telemetry: %{
4349
event_name: [:custom, :new_event],
4450
measurements: %{limit: 123},
@@ -54,7 +60,7 @@ defmodule Realtime.RateCounterTest do
5460
avg: +0.0,
5561
bucket: _,
5662
max_bucket_len: 60,
57-
tick: 1000,
63+
tick: 10,
5864
tick_ref: _,
5965
idle_shutdown: 900_000,
6066
idle_shutdown_ref: _,
@@ -75,6 +81,56 @@ defmodule Realtime.RateCounterTest do
7581
}
7682
end
7783

84+
test "starts a new rate counter with limit to log" do
85+
id = {:domain, :metric, Ecto.UUID.generate()}
86+
87+
args = %Args{
88+
id: id,
89+
opts: [
90+
tick: 100,
91+
max_bucket_len: 10,
92+
limit: [
93+
value: 10,
94+
log_fn: fn ->
95+
Logger.error("ErrorMessage: Reason", external_id: "tenant123", project: "tenant123")
96+
end
97+
]
98+
]
99+
}
100+
101+
assert {:ok, pid} = RateCounter.new(args)
102+
103+
assert %RateCounter{
104+
id: ^id,
105+
avg: +0.0,
106+
bucket: _,
107+
max_bucket_len: 10,
108+
telemetry: %{emit: false},
109+
limit: %{
110+
log: true,
111+
value: 10,
112+
triggered: false
113+
}
114+
} = :sys.get_state(pid)
115+
116+
log =
117+
capture_log(fn ->
118+
GenCounter.add(args.id, 50)
119+
Process.sleep(300)
120+
end)
121+
122+
assert {:ok, %RateCounter{limit: %{triggered: true}}} = RateCounter.get(args)
123+
assert log =~ "project=tenant123 external_id=tenant123 [error] ErrorMessage: Reason"
124+
125+
# Only one log message should be emitted
126+
# Splitting on the error message yields exactly two parts (the text before and after it) when it appears once
127+
assert length(String.split(log, "ErrorMessage: Reason")) == 2
128+
129+
Process.sleep(300)
130+
131+
assert {:ok, %RateCounter{limit: %{triggered: false}}} = RateCounter.get(args)
132+
end
133+
78134
test "reset counter if GenCounter already had something" do
79135
args = %Args{id: {:domain, :metric, Ecto.UUID.generate()}}
80136
GenCounter.add(args.id, 100)

0 commit comments

Comments
 (0)