Skip to content

Commit 38a1fb5

Browse files
authored
use disconnect_and_retry (#292)
1 parent 53bfcea commit 38a1fb5

File tree

5 files changed

+84
-154
lines changed

5 files changed

+84
-154
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
- use `disconnect_and_retry` (added in DBConnection v2.9.0) instead of `disconnect` for connection errors https://github.com/plausible/ch/pull/292
6+
37
## 0.6.2 (2026-01-03)
48

59
- added support for `multipart/form-data` in queries: https://github.com/plausible/ch/pull/290 -- which allows bypassing URL length limits sometimes imposed by reverse proxies when sending queries with many parameters.

lib/ch/connection.ex

Lines changed: 30 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,20 @@ defmodule Ch.Connection do
1212
@impl true
1313
@spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()}
1414
def connect(opts) do
15-
with {:ok, conn} <- do_connect(opts) do
15+
scheme = String.to_existing_atom(opts[:scheme] || "http")
16+
address = opts[:hostname] || "localhost"
17+
port = opts[:port] || 8123
18+
mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])
19+
20+
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
21+
conn =
22+
conn
23+
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
24+
|> maybe_put_private(:database, opts[:database])
25+
|> maybe_put_private(:username, opts[:username])
26+
|> maybe_put_private(:password, opts[:password])
27+
|> maybe_put_private(:settings, opts[:settings])
28+
1629
handshake = Query.build("select 1, version()")
1730
params = DBConnection.Query.encode(handshake, _params = [], _opts = [])
1831

@@ -44,11 +57,13 @@ defmodule Ch.Connection do
4457
{:ok, _conn} = HTTP.close(conn)
4558
{:error, reason}
4659

47-
{:disconnect, reason, conn} ->
60+
{disconnect, reason, conn} when disconnect in [:disconnect, :disconnect_and_retry] ->
4861
{:ok, _conn} = HTTP.close(conn)
4962
{:error, reason}
5063
end
5164
end
65+
catch
66+
_kind, reason -> {:error, reason}
5267
end
5368

5469
defp parse_version(version) do
@@ -65,7 +80,6 @@ defmodule Ch.Connection do
6580
@impl true
6681
@spec ping(conn) :: {:ok, conn} | {:disconnect, Mint.Types.error() | Error.t(), conn}
6782
def ping(conn) do
68-
conn = maybe_reconnect(conn)
6983
headers = [{"user-agent", @user_agent}]
7084

7185
case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
@@ -103,7 +117,6 @@ defmodule Ch.Connection do
103117

104118
@impl true
105119
def handle_declare(query, params, opts, conn) do
106-
conn = maybe_reconnect(conn)
107120
%Query{command: command, decode: decode} = query
108121
{query_params, extra_headers, body} = params
109122

@@ -123,6 +136,9 @@ defmodule Ch.Connection do
123136
}
124137

125138
{:ok, query, result, {conn, reader}}
139+
else
140+
{:error, _reason, _conn} = client_error -> client_error
141+
{:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn}
126142
end
127143
end
128144

@@ -267,7 +283,6 @@ defmodule Ch.Connection do
267283

268284
@impl true
269285
def handle_execute(%Query{} = query, {:stream, params}, opts, conn) do
270-
conn = maybe_reconnect(conn)
271286
{query_params, extra_headers, body} = params
272287

273288
path = path(conn, query_params, opts)
@@ -276,7 +291,7 @@ defmodule Ch.Connection do
276291
with {:ok, conn, ref} <- send_request(conn, "POST", path, headers, :stream) do
277292
case HTTP.stream_request_body(conn, ref, body) do
278293
{:ok, conn} -> {:ok, query, ref, conn}
279-
{:error, conn, reason} -> {:disconnect, reason, conn}
294+
{:error, conn, reason} -> {:disconnect_and_retry, reason, conn}
280295
end
281296
end
282297
end
@@ -295,12 +310,11 @@ defmodule Ch.Connection do
295310
end
296311

297312
{:error, conn, reason} ->
298-
{:disconnect, reason, conn}
313+
{:disconnect_and_retry, reason, conn}
299314
end
300315
end
301316

302317
def handle_execute(%Query{command: :insert} = query, params, opts, conn) do
303-
conn = maybe_reconnect(conn)
304318
{query_params, extra_headers, body} = params
305319

306320
path = path(conn, query_params, opts)
@@ -313,20 +327,23 @@ defmodule Ch.Connection do
313327
request(conn, "POST", path, headers, body, opts)
314328
end
315329

316-
with {:ok, conn, responses} <- result do
317-
{:ok, query, responses, conn}
330+
case result do
331+
{:ok, conn, responses} -> {:ok, query, responses, conn}
332+
{:error, _reason, _conn} = client_error -> client_error
333+
{:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn}
318334
end
319335
end
320336

321337
def handle_execute(query, params, opts, conn) do
322-
conn = maybe_reconnect(conn)
323338
{query_params, extra_headers, body} = params
324339

325340
path = path(conn, query_params, opts)
326341
headers = headers(conn, extra_headers, opts)
327342

328-
with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
329-
{:ok, query, responses, conn}
343+
case request(conn, "POST", path, headers, body, opts) do
344+
{:ok, conn, responses} -> {:ok, query, responses, conn}
345+
{:error, _reason, _conn} = client_error -> client_error
346+
{:disconnect, reason, conn} -> {:disconnect_and_retry, reason, conn}
330347
end
331348
end
332349

@@ -482,50 +499,6 @@ defmodule Ch.Connection do
482499
"/?" <> URI.encode_query(settings ++ query_params)
483500
end
484501

485-
# If the http connection was closed by the server, attempt to
486-
# reconnect once. If the re-connect failed, return the old
487-
# connection and let the error bubble up to the caller.
488-
defp maybe_reconnect(conn) do
489-
if HTTP.open?(conn) do
490-
conn
491-
else
492-
opts = HTTP.get_private(conn, :connect_options)
493-
494-
with {:ok, new_conn} <- do_connect(opts) do
495-
Logger.warning(
496-
"The connection was closed by the server; a new connection has been successfully reestablished."
497-
)
498-
499-
# copy settings that are set dynamically (e.g. json as text) over to the new connection
500-
maybe_put_private(new_conn, :settings, HTTP.get_private(conn, :settings))
501-
else
502-
_ -> conn
503-
end
504-
end
505-
end
506-
507-
defp do_connect(opts) do
508-
scheme = String.to_existing_atom(opts[:scheme] || "http")
509-
address = opts[:hostname] || "localhost"
510-
port = opts[:port] || 8123
511-
mint_opts = [mode: :passive] ++ Keyword.take(opts, [:hostname, :transport_opts])
512-
513-
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
514-
conn =
515-
conn
516-
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
517-
|> maybe_put_private(:database, opts[:database])
518-
|> maybe_put_private(:username, opts[:username])
519-
|> maybe_put_private(:password, opts[:password])
520-
|> maybe_put_private(:settings, opts[:settings])
521-
|> maybe_put_private(:connect_options, opts)
522-
523-
{:ok, conn}
524-
end
525-
catch
526-
_kind, reason -> {:error, reason}
527-
end
528-
529502
@server_display_name_key :server_display_name
530503

531504
@spec ensure_same_server(conn, Mint.Types.headers()) :: conn

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ defmodule Ch.MixProject do
3939
defp deps do
4040
[
4141
{:mint, "~> 1.0"},
42-
{:db_connection, "~> 2.0"},
42+
{:db_connection, "~> 2.9.0"},
4343
{:jason, "~> 1.0"},
4444
{:decimal, "~> 2.0"},
4545
{:ecto, "~> 3.13.0", optional: true},

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
%{
22
"benchee": {:hex, :benchee, "1.5.0", "4d812c31d54b0ec0167e91278e7de3f596324a78a096fd3d0bea68bb0c513b10", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.1", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "5b075393aea81b8ae74eadd1c28b1d87e8a63696c649d8293db7c4df3eb67535"},
3-
"db_connection": {:hex, :db_connection, "2.8.1", "9abdc1e68c34c6163f6fb96a96532272d13ad7ca45262156ae8b7ec6d9dc4bec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a61a3d489b239d76f326e03b98794fb8e45168396c925ef25feb405ed09da8fd"},
3+
"db_connection": {:hex, :db_connection, "2.9.0", "a6a97c5c958a2d7091a58a9be40caf41ab496b0701d21e1d1abff3fa27a7f371", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "17d502eacaf61829db98facf6f20808ed33da6ccf495354a41e64fe42f9c509c"},
44
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
55
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
66
"dialyxir": {:hex, :dialyxir, "1.4.7", "dda948fcee52962e4b6c5b4b16b2d8fa7d50d8645bbae8b8685c3f9ecb7f5f4d", [:mix], [{:erlex, ">= 0.2.8", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b34527202e6eb8cee198efec110996c25c5898f43a4094df157f8d28f27d9efe"},

0 commit comments

Comments
 (0)