-
Notifications
You must be signed in to change notification settings - Fork 39
allow client to reconnect if other side closes connection #478
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 11 commits
0dfad21
d3b0edc
ea2e246
6f97f56
a0be2d9
719ad3f
29bca94
3120978
2db60eb
0827acf
64bd3ad
46ce001
a1c23e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,7 +21,7 @@ defmodule Thrift.Binary.Framed.Client do | |
| alias Thrift.TApplicationException | ||
| alias Thrift.Transport.SSL | ||
|
|
||
| @immutable_tcp_opts [active: false, packet: 4, mode: :binary] | ||
| @immutable_tcp_opts [active: true, packet: 4, mode: :binary] | ||
|
|
||
| @type error :: {:error, atom} | {:error, {:exception, struct}} | ||
| @type success :: {:ok, binary} | ||
|
|
@@ -42,6 +42,7 @@ defmodule Thrift.Binary.Framed.Client do | |
| {:tcp_opts, [tcp_option]} | ||
| | {:ssl_opts, [SSL.option()]} | ||
| | {:gen_server_opts, [genserver_call_option]} | ||
| | {:reconnect, boolean} | ||
|
|
||
| @type options :: [option] | ||
|
|
||
|
|
@@ -55,7 +56,8 @@ defmodule Thrift.Binary.Framed.Client do | |
| ssl_opts: [SSL.option()], | ||
| timeout: integer, | ||
| sock: {:gen_tcp, :gen_tcp.socket()} | {:ssl, :ssl.sslsocket()}, | ||
| seq_id: integer | ||
| seq_id: integer, | ||
| reconnect: boolean | ||
| } | ||
| defstruct host: nil, | ||
| port: nil, | ||
|
|
@@ -64,7 +66,8 @@ defmodule Thrift.Binary.Framed.Client do | |
| ssl_opts: nil, | ||
| timeout: 5000, | ||
| sock: nil, | ||
| seq_id: 0 | ||
| seq_id: 0, | ||
| reconnect: false | ||
| end | ||
|
|
||
| require Logger | ||
|
|
@@ -74,6 +77,7 @@ defmodule Thrift.Binary.Framed.Client do | |
| def init({host, port, opts}) do | ||
| tcp_opts = Keyword.get(opts, :tcp_opts, []) | ||
| ssl_opts = Keyword.get(opts, :ssl_opts, []) | ||
| reconnect = Keyword.get(opts, :reconnect, false) | ||
|
|
||
| {timeout, tcp_opts} = Keyword.pop(tcp_opts, :timeout, 5000) | ||
|
|
||
|
|
@@ -82,7 +86,8 @@ defmodule Thrift.Binary.Framed.Client do | |
| port: port, | ||
| tcp_opts: tcp_opts, | ||
| ssl_opts: ssl_opts, | ||
| timeout: timeout | ||
| timeout: timeout, | ||
| reconnect: reconnect | ||
| } | ||
|
|
||
| {:connect, :init, s} | ||
|
|
@@ -124,6 +129,9 @@ defmodule Thrift.Binary.Framed.Client do | |
| Additionally, the options `:name`, `:debug`, and `:spawn_opt`, if specified, | ||
| will be passed to the underlying `GenServer`. See `GenServer.start_link/3` | ||
| for details on these options. | ||
|
|
||
| The `:reconnect` option if setted to `true` forces client to reopen tcp connection whenever | ||
| it closed. | ||
| """ | ||
| @spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start() | ||
| def start_link(host, port, opts) do | ||
|
|
@@ -143,6 +151,9 @@ defmodule Thrift.Binary.Framed.Client do | |
| |> Keyword.merge(@immutable_tcp_opts) | ||
| |> Keyword.put_new(:send_timeout, 1000) | ||
|
|
||
| # reset sequence id for newly created connection | ||
| s = %{s | seq_id: 0} | ||
|
|
||
| case :gen_tcp.connect(host, port, opts, timeout) do | ||
| {:ok, sock} -> | ||
| maybe_ssl_handshake(sock, host, port, s) | ||
|
|
@@ -158,10 +169,13 @@ defmodule Thrift.Binary.Framed.Client do | |
| end | ||
|
|
||
| @impl Connection | ||
| def disconnect(info, %{sock: {transport, sock}}) do | ||
| def disconnect(info, %{sock: {transport, sock}} = s) do | ||
| :ok = transport.close(sock) | ||
|
|
||
| case info do | ||
| :reconnect -> | ||
| {:connect, info, %{s | sock: nil}} | ||
|
|
||
| {:close, from} -> | ||
| Connection.reply(from, :ok) | ||
| {:stop, :normal, nil} | ||
|
|
@@ -245,15 +259,15 @@ defmodule Thrift.Binary.Framed.Client do | |
|
|
||
| def handle_call( | ||
| {:call, rpc_name, serialized_args, tcp_opts}, | ||
| _, | ||
| _from, | ||
| %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout} = s | ||
| ) do | ||
| s = %{s | seq_id: seq_id + 1} | ||
| message = Binary.serialize(:message_begin, {:call, seq_id, rpc_name}) | ||
| timeout = Keyword.get(tcp_opts, :timeout, default_timeout) | ||
|
|
||
| with :ok <- transport.send(sock, [message | serialized_args]), | ||
| {:ok, message} <- transport.recv(sock, 0, timeout) do | ||
| {:ok, message} <- receive_message(transport, sock, timeout) do | ||
| reply = deserialize_message_reply(message, rpc_name, seq_id) | ||
| {:reply, reply, s} | ||
| else | ||
|
|
@@ -291,10 +305,59 @@ defmodule Thrift.Binary.Framed.Client do | |
| end | ||
| end | ||
|
|
||
| @impl Connection | ||
| def handle_info({:tcp, sock, _data}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
| end | ||
|
|
||
| def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
| end | ||
|
|
||
| def handle_info({:tcp_error, sock, _error}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
| end | ||
|
|
||
| def handle_info({:ssl, sock, _data}, %{reconnect: true, sock: {:ssl, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
| end | ||
|
|
||
| def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
| end | ||
|
|
||
| def handle_info({:ssl_error, sock, _error}, %{reconnect: true, sock: {:ssl, sock}} = s) do | ||
| {:disconnect, :reconnect, s} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should always disconnect and make the decision on reconnect there. If
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In this case many tests fails. If I disconnect on any TCP or SSL message with
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure - should I correct these test cases?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi, any comments about my last question?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to think about we handle these cases, and handle them. For example we could do try do
GenServer.call(...)
catch
:exit, {{:error, _} = error, _} ->
error
:exit, {:noproc, _} ->
{:error, :closed}
end
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My opinion is not to handle these cases and let users of library to handle it. But in this case some tests should be rewritten |
||
| end | ||
|
|
||
| def handle_info(_, s) do | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should handle the |
||
| {:noreply, s} | ||
| end | ||
|
|
||
| def deserialize_message_reply(message, rpc_name, seq_id) do | ||
| handle_message(Binary.deserialize(:message_begin, message), seq_id, rpc_name) | ||
| end | ||
|
|
||
| defp receive_message(:gen_tcp, sock, timeout) do | ||
| receive do | ||
| {:tcp, ^sock, data} -> {:ok, data} | ||
| {:tcp_closed, ^sock} -> {:error, :closed} | ||
| {:tcp_error, ^sock, error} -> {:error, error} | ||
| after | ||
| timeout -> {:error, :timeout} | ||
| end | ||
| end | ||
|
|
||
| defp receive_message(:ssl, sock, timeout) do | ||
| receive do | ||
| {:ssl, ^sock, data} -> {:ok, data} | ||
| {:ssl_closed, ^sock} -> {:error, :closed} | ||
| {:ssl_error, ^sock, error} -> {:error, error} | ||
| after | ||
| timeout -> {:error, :timeout} | ||
| end | ||
| end | ||
cybernetlab marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| defp handle_message({:ok, {:reply, seq_id, rpc_name, serialized_response}}, seq_id, rpc_name) do | ||
| {:ok, serialized_response} | ||
| end | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.