Skip to content

Commit 761ed0f

Browse files
committed
connection is active now, tests added
1 parent 2a468b2 commit 761ed0f

File tree

3 files changed

+53
-8
lines changed

3 files changed

+53
-8
lines changed

lib/thrift/binary/framed/client.ex

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ defmodule Thrift.Binary.Framed.Client do
2121
alias Thrift.TApplicationException
2222
alias Thrift.Transport.SSL
2323

24-
@immutable_tcp_opts [active: false, packet: 4, mode: :binary]
24+
@immutable_tcp_opts [active: true, packet: 4, mode: :binary]
2525

2626
@type error :: {:error, atom} | {:error, {:exception, struct}}
2727
@type success :: {:ok, binary}
@@ -168,10 +168,13 @@ defmodule Thrift.Binary.Framed.Client do
168168
end
169169

170170
@impl Connection
171-
def disconnect(info, %{sock: {transport, sock}}) do
171+
def disconnect(info, %{sock: {transport, sock}} = s) do
172172
:ok = transport.close(sock)
173173

174174
case info do
175+
{:reconnect, _} = reconnect ->
176+
{:connect, info, %{s | sock: nil}}
177+
175178
{:close, from} ->
176179
Connection.reply(from, :ok)
177180
{:stop, :normal, nil}
@@ -268,13 +271,13 @@ defmodule Thrift.Binary.Framed.Client do
268271
timeout = Keyword.get(tcp_opts, :timeout, default_timeout)
269272

270273
with :ok <- transport.send(sock, [message | serialized_args]),
271-
{:ok, message} <- transport.recv(sock, 0, timeout) do
274+
{:ok, message} <- receive_message(sock, timeout) do
272275
reply = deserialize_message_reply(message, rpc_name, seq_id)
273276
{:reply, reply, s}
274277
else
275278
{:error, :closed} = error ->
276279
if reconnect do
277-
{:connect, {:reconnect, :call, msg, from}, %{s | sock: nil}}
280+
{:disconnect, {:reconnect, {:call, msg, from}}, s}
278281
else
279282
{:disconnect, error, error, s}
280283
end
@@ -310,7 +313,7 @@ defmodule Thrift.Binary.Framed.Client do
310313

311314
{:error, :closed} = error ->
312315
if reconnect do
313-
{:connect, {:reconnect, :cast, msg}, %{s | sock: nil}}
316+
{:disconnect, {:reconnect, {:cast, msg}}, s}
314317
else
315318
{:disconnect, error, s}
316319
end
@@ -320,10 +323,32 @@ defmodule Thrift.Binary.Framed.Client do
320323
end
321324
end
322325

326+
@impl Connection
327+
def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {_transport, sock}} = s) do
328+
{:disconnect, {:reconnect, nil}, s}
329+
end
330+
331+
def handle_info(_, s) do
332+
{:noreply, s}
333+
end
334+
323335
def deserialize_message_reply(message, rpc_name, seq_id) do
324336
handle_message(Binary.deserialize(:message_begin, message), seq_id, rpc_name)
325337
end
326338

339+
defp receive_message(sock, timeout) do
340+
receive do
341+
{:tcp, ^sock, data} -> {:ok, data}
342+
{:tcp_closed, ^sock} -> {:error, :closed}
343+
{:tcp_error, ^sock, error} -> {:error, error}
344+
{:ssl, ^sock, data} -> {:ok, data}
345+
{:ssl_closed, ^sock} -> {:error, :closed}
346+
{:ssl_error, ^sock, error} -> {:error, error}
347+
after
348+
timeout -> {:error, :timeout}
349+
end
350+
end
351+
327352
defp handle_message({:ok, {:reply, seq_id, rpc_name, serialized_response}}, seq_id, rpc_name) do
328353
{:ok, serialized_response}
329354
end
@@ -402,7 +427,7 @@ defmodule Thrift.Binary.Framed.Client do
402427
end
403428
end
404429

405-
defp maybe_resend_data({:ok, s}, {:reconnect, :call, msg, from}) do
430+
defp maybe_resend_data({:ok, s}, {:reconnect, {:call, msg, from}}) do
406431
case handle_call(msg, from, s) do
407432
{:reply, reply, s} ->
408433
GenServer.reply(from, reply)
@@ -417,7 +442,7 @@ defmodule Thrift.Binary.Framed.Client do
417442
end
418443
end
419444

420-
defp maybe_resend_data({:ok, s}, {:reconnect, :cast, msg}) do
445+
defp maybe_resend_data({:ok, s}, {:reconnect, {:cast, msg}}) do
421446
case handle_cast(msg, s) do
422447
{:noreply, s} ->
423448
{:ok, s}

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
1515
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
1616
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
17-
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [], [], "hexpm"},
17+
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
1818
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
1919
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"},
2020
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"},

test/thrift/binary/framed/server_test.exs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,4 +177,24 @@ defmodule Servers.Binary.Framed.IntegrationTest do
177177
thrift_test "client methods can be called by name instead of pid", %{client_name: name} do
178178
assert {:ok, true} == Client.ping(name)
179179
end
180+
181+
@ping_reply <<128, 1, 0, 2, 0, 0, 0, 4, 112, 105, 110, 103, 0, 0, 0, 0, 2, 0, 0, 1, 0>>
182+
thrift_test "client can reconnect when connection closed by server", ctx do
183+
{:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false])
184+
{:ok, port} = :inet.port(sock)
185+
first_conn = Task.async(fn ->
186+
{:ok, conn} = :gen_tcp.accept(sock)
187+
:ok = :gen_tcp.close(conn)
188+
end)
189+
name = String.to_atom("#{ctx.client_name}_1")
190+
{:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true)
191+
second_conn = Task.async(fn ->
192+
{:ok, conn} = :gen_tcp.accept(sock)
193+
{:ok, _} = :gen_tcp.recv(conn, 0)
194+
:ok = :gen_tcp.send(conn, @ping_reply)
195+
end)
196+
assert {:ok, true} == Client.ping(client)
197+
Task.await(first_conn)
198+
Task.await(second_conn)
199+
end
180200
end

0 commit comments

Comments
 (0)