Description
Elixir version
1.18.7-otp-27
Database and Version
PostgreSQL 17.2
Postgrex Version
master branch
Current behavior
Hi,
I think there is a possible bug in the way query cancellations are handled. It can cause the connection pool limit to be exceeded by a wide margin and put additional load on PostgreSQL.
- Run a query that takes a long time to cancel.
- The query times out on the client (Ecto/Postgrex) after whatever timeout the user has set.
- Postgrex initiates a cancel but never waits for confirmation that the query has actually been canceled.
- The slot is returned to the pool, and a new connection can be opened.
- On the PostgreSQL side, however, the old connection is not closed and is still active, because the query has not been canceled yet.
- As a result, the configured pool limit is exceeded: new connections are opened while the old ones have not been terminated.
- This is bad because, in the worst case, a malicious actor who identifies this behaviour could put a huge load on PostgreSQL while bypassing the pool limits.
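To make the effect on the pool limit concrete, here is a toy model. It is hypothetical and not Postgrex or DBConnection code. It assumes every pool slot is reused the instant its client-side timeout fires, and that each server backend keeps running for a fixed `cancel_delay` afterwards. Time is counted in abstract ticks, and the names `CancelRaceModel` and `peak_backends/4` are illustrative only.

```elixir
defmodule CancelRaceModel do
  @moduledoc """
  Toy model of the suspected race (hypothetical; not Postgrex/DBConnection code).

  Assumptions: every pool slot starts a new query the moment its previous
  query hits the client-side `timeout`, and each server backend keeps
  running for another `cancel_delay` ticks before the cancel takes effect.
  """

  @doc "Peak number of server backends busy at the same tick."
  def peak_backends(pool_size, timeout, cancel_delay, horizon)
      when pool_size > 0 and timeout > 0 and cancel_delay >= 0 and horizon > 0 do
    # Start ticks of the queries issued through a single slot: 0, timeout, 2 * timeout, ...
    starts = Enum.to_list(0..(horizon - 1)//timeout)

    # A backend started at tick `s` stays busy during [s, s + timeout + cancel_delay).
    busy_at = fn t ->
      Enum.count(starts, fn s -> s <= t and t < s + timeout + cancel_delay end)
    end

    per_slot = 0..(horizon - 1) |> Enum.map(busy_at) |> Enum.max()

    # All slots behave identically in this model, so their peaks line up.
    pool_size * per_slot
  end
end

CancelRaceModel.peak_backends(10, 2, 0, 20) |> IO.inspect(label: "no cancel delay")
CancelRaceModel.peak_backends(10, 2, 3, 20) |> IO.inspect(label: "3-tick cancel delay")
```

With a pool of 10 and a 2-tick timeout, the model gives 10 busy backends when cancellation is instant. With a 3-tick cancel delay it gives 30, which is roughly `pool_size * ceil((timeout + cancel_delay) / timeout)`. The real ratio depends on how long PostgreSQL actually takes to cancel the query.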
I am having a hard time reproducing this behaviour in a test case, but I think the following test reproduces it.
I create a partitioned table and force parallel execution. Canceling the query then requires canceling all parallel workers, which should increase the time until the query is actually canceled.
test "partitioned table query cancellation", context do
  conn = context[:pid]
  %Postgrex.Result{connection_id: connection_id} = Postgrex.query!(conn, "SELECT 1", [])
  :ok = query("SET max_parallel_workers_per_gather = 8;", [])
  :ok = query("SET parallel_setup_cost = 0;", [])
  :ok = query("SET parallel_tuple_cost = 0;", [])

  Postgrex.query!(
    conn,
    """
    CREATE TABLE IF NOT EXISTS test_partitioned (
      id SERIAL,
      varchar_col VARCHAR(50),
      int_col INTEGER,
      time_col TIMESTAMPTZ
    ) PARTITION BY RANGE (time_col)
    """,
    []
  )

  base_time = ~U[2023-01-01 00:00:00Z]
  partitions_range = 0..350

  IO.inspect("creating partitions")

  for i <- partitions_range do
    start_time = DateTime.add(base_time, i * 86400, :second)
    end_time = DateTime.add(base_time, (i + 1) * 86400, :second)

    Postgrex.query!(
      conn,
      """
      CREATE TABLE IF NOT EXISTS test_partitioned_#{i} PARTITION OF test_partitioned
      FOR VALUES FROM ('#{start_time}') TO ('#{end_time}')
      """,
      []
    )
  end

  IO.inspect("inserting into partitions")

  for i <- partitions_range do
    partition_time = DateTime.add(base_time, i * 86400 + 3600, :second)

    values =
      for j <- 1..20000 do
        "('varchar_#{rem(j, 100)}', #{j}, '#{DateTime.to_iso8601(partition_time)}')"
      end
      |> Enum.join(", ")

    Postgrex.query!(
      conn,
      """
      INSERT INTO test_partitioned (varchar_col, int_col, time_col)
      VALUES #{values}
      """,
      []
    )
  end

  Process.flag(:trap_exit, true)

  IO.inspect("running query then cancel")

  result =
    capture_log(fn ->
      case query(
             """
             SELECT * FROM test_partitioned ORDER BY RANDOM() * int_col, varchar_col
             """,
             [],
             timeout: 2_000
           ) do
        %Postgrex.Error{postgres: %{message: "canceling statement due to user request"}} ->
          :ok

        %DBConnection.ConnectionError{message: "tcp recv: closed" <> _} ->
          :ok

        other ->
          IO.inspect("unexpected result: #{inspect(other)}")
          :ok
      end

      # assert_receive {:EXIT, ^conn, :killed}
    end)

  assert result =~ "disconnected: ** (DBConnection.ConnectionError)"

  {:ok, new_conn} = Postgrex.start_link(context[:options])

  assert %Postgrex.Result{rows: []} =
           Postgrex.query!(
             new_conn,
             "SELECT * from pg_stat_activity where state = 'active' and pid = $1",
             [connection_id]
           )
end

I hope my explanations are clear. If you have any questions, please let me know.
Expected behavior
Postgrex should wait for the cancellation of an active query to complete (or force it) before returning the connection slot to the pool for new connections/queries.
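One way to picture "wait or force" from the outside is the minimal sketch below. It is hypothetical and is neither the Postgrex/DBConnection API nor a proposed patch, since a real fix would live inside the driver. After a client-side timeout, it polls `pg_stat_activity` from a separate connection until the timed-out backend is no longer active. If the cancel has not taken effect after a fixed number of polls, it calls `pg_terminate_backend`. The module name `AwaitCancelSketch`, the function `await_or_terminate/4` and the injected `query_fun` are illustrative assumptions. Terminating a backend also requires permission to signal it.

```elixir
defmodule AwaitCancelSketch do
  @moduledoc """
  Hypothetical sketch (not the Postgrex or DBConnection API): after a
  client-side timeout, poll `pg_stat_activity` from a separate connection
  until the timed-out backend is no longer active, and terminate it if the
  cancel has not taken effect after `attempts` polls.
  """

  @active_sql "SELECT 1 FROM pg_stat_activity WHERE pid = $1 AND state = 'active'"
  @terminate_sql "SELECT pg_terminate_backend($1)"

  # `query_fun` should behave like `&Postgrex.query!(monitor_conn, &1, &2)`
  # and return a value with a `:rows` key (a `%Postgrex.Result{}` qualifies).
  def await_or_terminate(query_fun, backend_pid, 0, _interval_ms) do
    # The cancel did not take effect in time, so force the backend to exit.
    case query_fun.(@terminate_sql, [backend_pid]) do
      %{rows: [[true]]} -> :terminated
      %{rows: _} -> :terminate_failed
    end
  end

  def await_or_terminate(query_fun, backend_pid, attempts, interval_ms) when attempts > 0 do
    case query_fun.(@active_sql, [backend_pid]) do
      # No active row for this pid: the query was canceled or the backend exited.
      %{rows: []} ->
        :idle

      # Still running: wait, then poll again with one fewer attempt left.
      %{rows: _still_active} ->
        Process.sleep(interval_ms)
        await_or_terminate(query_fun, backend_pid, attempts - 1, interval_ms)
    end
  end
end
```

In the test case above, a call such as `AwaitCancelSketch.await_or_terminate(&Postgrex.query!(new_conn, &1, &2), connection_id, 20, 100)` would poll for about two seconds before terminating the backend. Injecting `query_fun` keeps the sketch independent of any particular connection, and lets it be exercised with a fake function instead of a live database.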