Skip to content

Commit b8faf4f

Browse files
authored
fix: check buffer for all queues, not only first over limit (#3062)
* fix: check buffer for all queues, not only first over limit * perf: accomodate for spikes * chore: fix failing test
1 parent 8b9b86f commit b8faf4f

File tree

4 files changed

+12
-17
lines changed

4 files changed

+12
-17
lines changed

lib/logflare/backends.ex

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -816,15 +816,7 @@ defmodule Logflare.Backends do
816816
end
817817

818818
def cached_local_pending_buffer_full?(%Source{id: source_id}) do
819-
PubSubRates.Cache.get_local_buffer(source_id, nil)
820-
|> Map.get(:queues, [])
821-
|> case do
822-
[] ->
823-
false
824-
825-
queues ->
826-
Enum.all?(queues, fn {_key, v} -> v > @max_pending_buffer_len_per_queue end)
827-
end
819+
buffer_full_for_backend?(source_id, nil)
828820
end
829821

830822
@spec buffer_full_for_backend?(
@@ -835,7 +827,7 @@ defmodule Logflare.Backends do
835827
defp buffer_full_for_backend?(source_id, backend_id) do
836828
case PubSubRates.Cache.get_local_buffer(source_id, backend_id) do
837829
%{queues: [_ | _] = queues} ->
838-
Enum.any?(queues, fn {_key, count} ->
830+
Enum.all?(queues, fn {_key, count} ->
839831
count > @max_pending_buffer_len_per_queue
840832
end)
841833

lib/logflare/backends/ingest_event_queue.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ defmodule Logflare.Backends.IngestEventQueue do
1414

1515
@ets_table_mapper :ingest_event_queue_mapping
1616
@ets_table :source_ingest_events
17-
@max_queue_size 15_000
17+
@max_queue_size 30_000
1818
@type source_backend_pid ::
1919
{Source.t() | pos_integer(), Backend.t() | pos_integer() | nil, pid() | nil}
2020
@type table_key :: {pos_integer(), pos_integer() | nil, pid() | nil}

lib/logflare/backends/ingest_event_queue/queue_janitor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ defmodule Logflare.Backends.IngestEventQueue.QueueJanitor do
1414
require Logger
1515
@default_interval 1_000
1616
@default_remainder 100
17-
@default_max Logflare.Backends.max_buffer_queue_len()
1817
@default_purge_ratio 0.05
18+
@default_max round(Logflare.Backends.max_buffer_queue_len() * 1.2)
1919

2020
def start_link(opts) do
2121
GenServer.start_link(__MODULE__, opts)

test/logflare_web/plugs/buffer_limiter_test.exs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,12 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
263263
assert json_response(conn, 429)
264264
end
265265

266-
test "returns 429 when user default queue is full even if system default is not", %{
267-
conn: conn,
268-
source: source
269-
} do
266+
test "returns 429 when user's backend default ingest queue is full even if system queue is not",
267+
%{
268+
conn: conn,
269+
source: source,
270+
backend1: backend1
271+
} do
270272
system_queue_key = {source.id, nil, spawn(fn -> :ok end)}
271273
IngestEventQueue.upsert_tid(system_queue_key)
272274

@@ -275,7 +277,7 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
275277
IngestEventQueue.add_to_table(system_queue_key, [le])
276278
end
277279

278-
user_queue_key = {source.id, nil, self()}
280+
user_queue_key = {source.id, backend1.id, spawn(fn -> :ok end)}
279281
IngestEventQueue.upsert_tid(user_queue_key)
280282

281283
for _ <- 1..(Backends.max_buffer_queue_len() + 500) do
@@ -284,6 +286,7 @@ defmodule LogflareWeb.Plugs.BufferLimiterTest do
284286
end
285287

286288
Backends.cache_local_buffer_lens(source.id, nil)
289+
Backends.cache_local_buffer_lens(source.id, backend1.id)
287290

288291
conn =
289292
conn

0 commit comments

Comments
 (0)