diff --git a/CHANGELOG.md b/CHANGELOG.md index 7082d7c..3ac1fb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Unreleased + +- use `disconnect_and_retry` (added in DBConnection v2.9.0) instead of `disconnect` for connection errors https://github.com/plausible/ch/pull/292 + ## 0.6.2 (2026-01-03) - added support for `multipart/form-data` in queries: https://github.com/plausible/ch/pull/290 -- which allows bypassing URL length limits sometimes imposed by reverse proxies when sending queries with many parameters. diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 9ff5074..b53394a 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -12,7 +12,20 @@ defmodule Ch.Connection do @impl true @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do - with {:ok, conn} <- do_connect(opts) do + scheme = String.to_existing_atom(opts[:scheme] || "http") + address = opts[:hostname] || "localhost" + port = opts[:port] || 8123 + mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) + + with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do + conn = + conn + |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) + |> maybe_put_private(:database, opts[:database]) + |> maybe_put_private(:username, opts[:username]) + |> maybe_put_private(:password, opts[:password]) + |> maybe_put_private(:settings, opts[:settings]) + handshake = Query.build("select 1, version()") params = DBConnection.Query.encode(handshake, _params = [], _opts = []) @@ -44,11 +57,13 @@ defmodule Ch.Connection do {:ok, _conn} = HTTP.close(conn) {:error, reason} - {:disconnect, reason, conn} -> + {disconnect, reason, conn} when disconnect in [:disconnect, :disconnect_and_retry] -> {:ok, _conn} = HTTP.close(conn) {:error, reason} end end + catch + _kind, reason -> {:error, reason} end defp parse_version(version) do @@ -65,7 +80,6 @@ defmodule Ch.Connection do @impl true @spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn} def ping(conn) do - conn = maybe_reconnect(conn) headers = [{"user-agent", @user_agent}] case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do @@ -103,7 +117,6 @@ defmodule Ch.Connection do @impl true def handle_declare(query, params, opts, conn) do - conn = maybe_reconnect(conn) %Query{command: command, decode: decode} = query {query_params, extra_headers, body} = params @@ -123,6 +136,9 @@ defmodule Ch.Connection do } {:ok, query, result, {conn, reader}} + else + {:error, _reason, _conn} = client_error -> client_error + {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} end end @@ -267,7 +283,6 @@ defmodule Ch.Connection do @impl true def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -276,7 +291,7 @@ defmodule Ch.Connection do with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do case HTTP.stream_request_body(conn, ref, body) do {:ok, conn} -> {:ok, query, ref, conn} - {:error, conn, reason} -> {:disconnect, reason, conn} + {:error, conn, reason} -> {:disconnect_and_retry, reason, conn} end end end @@ -295,12 +310,11 @@ defmodule Ch.Connection do end {:error, conn, reason} -> - {:disconnect, reason, conn} + {:disconnect_and_retry, reason, conn} end end def handle_execute(%Query{command: :insert} = query, params, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) @@ -313,20 +327,23 @@ defmodule Ch.Connection do request(conn, "POST", path, headers, body, opts) end - with {:ok, conn, responses} <- result do - {:ok, query, responses, conn} + case result do + {:ok, conn, responses} -> {:ok, query, responses, conn} + {:error, _reason, _conn} = client_error -> client_error + {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} end end def handle_execute(query, params, opts, conn) do - conn = maybe_reconnect(conn) {query_params, extra_headers, body} = params path = path(conn, query_params, opts) headers = headers(conn, extra_headers, opts) - with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do - {:ok, query, responses, conn} + case request(conn, "POST", path, headers, body, opts) do + {:ok, conn, responses} -> {:ok, query, responses, conn} + {:error, _reason, _conn} = client_error -> client_error + {:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn} end end @@ -482,50 +499,6 @@ defmodule Ch.Connection do "/?" <> URI.encode_query(settings ++ query_params) end - # If the http connection was closed by the server, attempt to - # reconnect once. If the re-connect failed, return the old - # connection and let the error bubble up to the caller. - defp maybe_reconnect(conn) do - if HTTP.open?(conn) do - conn - else - opts = HTTP.get_private(conn, :connect_options) - - with {:ok, new_conn} <- do_connect(opts) do - Logger.warning( - "The connection was closed by the server; a new connection has been successfully reestablished." - ) - - # copy settings that are set dynamically (e.g. json as text) over to the new connection - maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings)) - else - _ -> conn - end - end - end - - defp do_connect(opts) do - scheme = String.to_existing_atom(opts[:scheme] || "http") - address = opts[:hostname] || "localhost" - port = opts[:port] || 8123 - mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts]) - - with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do - conn = - conn - |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15)) - |> maybe_put_private(:database, opts[:database]) - |> maybe_put_private(:username, opts[:username]) - |> maybe_put_private(:password, opts[:password]) - |> maybe_put_private(:settings, opts[:settings]) - |> maybe_put_private(:connect_options, opts) - - {:ok, conn} - end - catch - _kind, reason -> {:error, reason} - end - @server_display_name_key :server_display_name @spec ensure_same_server(conn, Mint.Types.headers()) :: conn diff --git a/mix.exs b/mix.exs index 2924944..a8e6684 100644 --- a/mix.exs +++ b/mix.exs @@ -39,7 +39,7 @@ defmodule Ch.MixProject do defp deps do [ {:mint, "~> 1.0"}, - {:db_connection, "~> 2.0"}, + {:db_connection, "~> 2.9.0"}, {:jason, "~> 1.0"}, {:decimal, "~> 2.0"}, {:ecto, "~> 3.13.0", optional: true}, diff --git a/mix.lock b/mix.lock index d09cc78..1cf2d83 100644 --- a/mix.lock +++ b/mix.lock @@ -1,6 +1,6 @@ %{ "benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"}, - "db_connection": {:hex, :db_connection, "2.8.1", "9abdc1e68c34c6163f6fb96a96532272d13ad7ca45262156ae8b7ec6d9dc4bec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a61a3d489b239d76f326e03b98794fb8e45168396c925ef25feb405ed09da8fd"}, + "db_connection": {:hex, :db_connection, "2.9.0", "a6a97c5c958a2d7091a58a9be40caf41ab496b0701d21e1d1abff3fa27a7f371", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "17d502eacaf61829db98facf6f20808ed33da6ccf495354a41e64fe42f9c509c"}, "decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, "dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"}, diff --git a/test/ch/faults_test.exs b/test/ch/faults_test.exs index dae3b7d..bcc7457 100644 --- a/test/ch/faults_test.exs +++ b/test/ch/faults_test.exs @@ -264,8 +264,6 @@ defmodule Ch.FaultsTest do clickhouse: clickhouse, query_options: query_options } do - test = self() - log = capture_async_log(fn -> {:ok, conn} = Ch.start_link(port: port, timeout: 100) @@ -277,10 +275,10 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :timeout}} = - Ch.query(conn, "select 1 + 1", [], query_options) - end) + select = + Task.async(fn -> + Ch.query(conn, "select 1 + 1", [], query_options) + end) # failed select 1 + 1 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) @@ -293,18 +291,11 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1", [], query_options) - - send(test, :done) - end) - # select 1 + 1 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done + assert {:ok, %Ch.Result{rows: [[2]]}} = Task.await(select) end) assert log =~ "disconnected: ** (Mint.TransportError) timeout" @@ -312,7 +303,6 @@ defmodule Ch.FaultsTest do test "reconnects after closed on response", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - test = self() log = capture_async_log(fn -> @@ -325,10 +315,10 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query(conn, "select 1 + 1", [], query_options) - end) + select = + Task.async(fn -> + Ch.query(conn, "select 1 + 1", [], query_options) + end) # failed select 1 + 1 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) @@ -342,17 +332,11 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1", [], query_options) - - send(test, :done) - end) - # select 1 + 1 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done + + assert {:ok, %{rows: [[2]]}} = Task.await(select) end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" @@ -360,7 +344,6 @@ defmodule Ch.FaultsTest do test "reconnects after Connection: close response from server", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - test = self() log = capture_async_log(fn -> @@ -373,12 +356,10 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1", [], query_options) - - send(test, :done) - end) + select = + Task.async(fn -> + Ch.query(conn, "select 1 + 1", [], query_options) + end) # first select 1 + 1 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) @@ -390,9 +371,12 @@ defmodule Ch.FaultsTest do "Connection: Close" ) + assert response =~ "Connection: Close" + :ok = :gen_tcp.send(mint, response) :ok = :gen_tcp.close(mint) - assert_receive :done + + assert {:ok, %Ch.Result{rows: [[2]]}} = Task.await(select) # reconnect {:ok, mint} = :gen_tcp.accept(listen) @@ -401,22 +385,19 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:ok, %{num_rows: 1, rows: [[2]]}} = - Ch.query(conn, "select 1 + 1", [], query_options) - - send(test, :done) - end) + select = + Task.async(fn -> + Ch.query(conn, "select 2 + 2", [], query_options) + end) - # select 1 + 1 + # select 2 + 2 :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done + assert {:ok, %Ch.Result{rows: [[4]]}} = Task.await(select) end) - refute log =~ "disconnected: **" - assert log =~ "connection was closed by the server" + assert log =~ "disconnected: ** (Mint.HTTPError) the connection is closed" end # TODO non-chunked request @@ -424,7 +405,6 @@ defmodule Ch.FaultsTest do test "reconnects after closed before streaming request", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - test = self() rows = [[1, 2], [3, 4]] stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) @@ -442,15 +422,15 @@ defmodule Ch.FaultsTest do # disconnect before insert :ok = :gen_tcp.close(mint) - spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) - end) + insert = + Task.async(fn -> + Ch.query( + conn, + "insert into unknown_table(a,b) format RowBinary", + stream, + Keyword.merge(query_options, encode: false) + ) + end) # reconnect {:ok, mint} = :gen_tcp.accept(listen) @@ -459,25 +439,12 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ - - send(test, :done) - end) - # insert :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done + assert {:error, %Ch.Error{code: 60, message: message}} = Task.await(insert) + assert message =~ ~r/UNKNOWN_TABLE/ end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed" @@ -486,7 +453,6 @@ defmodule Ch.FaultsTest do test "reconnects after closed while streaming request", ctx do %{port: port, listen: listen, clickhouse: clickhouse, query_options: query_options} = ctx - test = self() rows = [[1, 2], [3, 4]] stream = Stream.map(rows, fn row -> Ch.RowBinary.encode_row(row, [:u8, :u8]) end) @@ -501,15 +467,15 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:error, %Mint.TransportError{reason: :closed}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) - end) + insert = + Task.async(fn -> + Ch.query( + conn, + "insert into unknown_table(a,b) format RowBinary", + stream, + Keyword.merge(query_options, encode: false) + ) + end) # close after first packet from mint arrives assert_receive {:tcp, ^mint, _packet} @@ -522,25 +488,12 @@ defmodule Ch.FaultsTest do :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - spawn_link(fn -> - assert {:error, %Ch.Error{code: 60, message: message}} = - Ch.query( - conn, - "insert into unknown_table(a,b) format RowBinary", - stream, - Keyword.merge(query_options, encode: false) - ) - - assert message =~ ~r/UNKNOWN_TABLE/ - - send(test, :done) - end) - # insert :ok = :gen_tcp.send(clickhouse, intercept_packets(mint)) :ok = :gen_tcp.send(mint, intercept_packets(clickhouse)) - assert_receive :done + assert {:error, %Ch.Error{code: 60, message: message}} = Task.await(insert) + assert message =~ ~r/UNKNOWN_TABLE/ end) assert log =~ "disconnected: ** (Mint.TransportError) socket closed"