Skip to content

Commit 5c97c68

Browse files
[#397]: Preserve the order of the response messages when the stream response process handles ":consume_response" (#396)
* Simple bug demonstration and bugfix * Migrate responses from list to erlang queue * Use queue.to_list instead of queue.peek in tests * Use factory to build message for ordering test * Fix bye luis test and encoding * Fix mint adapter connection process test after responses migration to erlang queue * Update test/grpc/client/adapters/mint/connection_process_test.exs --------- Co-authored-by: Paulo Valente <[email protected]>
1 parent c26acb9 commit 5c97c68

File tree

4 files changed

+67
-45
lines changed

4 files changed

+67
-45
lines changed

lib/grpc/client/adapters/mint/stream_response_process.ex

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
7575
grpc_stream: stream,
7676
send_headers_or_trailers: send_headers_or_trailers?,
7777
buffer: <<>>,
78-
responses: [],
78+
responses: :queue.new(),
7979
done: false,
8080
from: nil,
8181
compressor: nil
@@ -100,7 +100,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
100100
{{_, message}, rest} ->
101101
# TODO add code here to handle compressor headers
102102
response = codec.decode(message, res_mod)
103-
new_responses = [{:ok, response} | responses]
103+
new_responses = :queue.in({:ok, response}, responses)
104104
new_state = %{state | buffer: rest, responses: new_responses}
105105
{:reply, :ok, new_state, {:continue, :produce_response}}
106106

@@ -117,7 +117,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
117117
)
118118
when type in @header_types do
119119
state = update_compressor({type, headers}, state)
120-
new_responses = [get_headers_response(headers, type) | responses]
120+
new_responses = :queue.in(get_headers_response(headers, type), responses)
121121
{:reply, :ok, %{state | responses: new_responses}, {:continue, :produce_response}}
122122
end
123123

@@ -131,7 +131,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
131131

132132
case get_headers_response(headers, type) do
133133
{:error, _rpc_error} = error ->
134-
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
134+
{:reply, :ok, %{state | responses: :queue.in(error, responses)},
135+
{:continue, :produce_response}}
135136

136137
_any ->
137138
{:reply, :ok, state, {:continue, :produce_response}}
@@ -143,7 +144,8 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
143144
_from,
144145
%{responses: responses} = state
145146
) do
146-
{:reply, :ok, %{state | responses: [error | responses]}, {:continue, :produce_response}}
147+
{:reply, :ok, %{state | responses: :queue.in(error, responses)},
148+
{:continue, :produce_response}}
147149
end
148150

149151
def handle_call({:consume_response, :done}, _from, state) do
@@ -152,19 +154,23 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcess do
152154

153155
@impl true
154156
def handle_continue(:produce_response, state) do
155-
case state do
156-
%{from: nil} ->
157+
no_responses? = :queue.is_empty(state.responses)
158+
without_from? = is_nil(state.from)
159+
160+
cond do
161+
without_from? ->
157162
{:noreply, state}
158163

159-
%{from: from, responses: [], done: true} ->
160-
GenServer.reply(from, nil)
164+
no_responses? and state.done ->
165+
GenServer.reply(state.from, nil)
161166
{:stop, :normal, state}
162167

163-
%{responses: []} ->
168+
no_responses? ->
164169
{:noreply, state}
165170

166-
%{responses: [response | rest], from: from} ->
167-
GenServer.reply(from, response)
171+
true ->
172+
{{:value, response}, rest} = :queue.out(state.responses)
173+
GenServer.reply(state.from, response)
168174
{:noreply, %{state | responses: rest, from: nil}}
169175
end
170176
end

test/grpc/client/adapters/mint/connection_process_test.exs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
208208
assert {:reply, :ok, new_state} = response
209209
assert %{} == new_state.requests
210210
response_state = :sys.get_state(response_pid)
211-
assert [] == response_state.responses
211+
assert :queue.is_empty(response_state.responses)
212212
assert true == response_state.done
213213
end
214214
end
@@ -342,8 +342,9 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
342342

343343
response_state = :sys.get_state(response_pid)
344344

345-
assert [error: %Mint.TransportError{reason: :closed, __exception__: true}] ==
346-
response_state.responses
345+
assert :queue.to_list(response_state.responses) == [
346+
error: %Mint.TransportError{reason: :closed, __exception__: true}
347+
]
347348
end
348349
end
349350

@@ -382,7 +383,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
382383
assert new_state.conn.state == :closed
383384
assert_receive {:elixir_grpc, :connection_down, pid}, 500
384385
response_state = :sys.get_state(response_pid)
385-
assert [error: "the connection is closed"] == response_state.responses
386+
assert :queue.to_list(response_state.responses) == [error: "the connection is closed"]
386387
assert true == response_state.done
387388
assert pid == self()
388389
end
@@ -410,7 +411,7 @@ defmodule GRPC.Client.Adapters.Mint.ConnectionProcessTest do
410411
assert new_state.conn.state == :closed
411412
assert_receive {:elixir_grpc, :connection_down, pid}, 500
412413
response_state = :sys.get_state(response_pid)
413-
assert [error: "the connection is closed"] == response_state.responses
414+
assert :queue.to_list(response_state.responses) == [error: "the connection is closed"]
414415
assert true == response_state.done
415416
assert pid == self()
416417
end

test/grpc/client/adapters/mint/stream_response_process_test.exs

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
1010
done: false,
1111
from: nil,
1212
grpc_stream: build(:client_stream),
13-
responses: [],
13+
responses: :queue.new(),
1414
compressor: nil,
1515
send_headers_or_trailers: false
1616
}
@@ -41,7 +41,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
4141
state: state,
4242
data: {_, _, full_message}
4343
} do
44-
expected_response_message = build(:hello_reply_rpc)
44+
expected_response_message = {:ok, build(:hello_reply_rpc)}
4545

4646
response =
4747
StreamResponseProcess.handle_call(
@@ -52,35 +52,32 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
5252

5353
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
5454
assert new_state.buffer == <<>>
55-
assert [{:ok, response_message}] = new_state.responses
56-
assert expected_response_message == response_message
55+
assert :queue.to_list(new_state.responses) == [expected_response_message]
5756
end
5857

5958
test "append incoming message to existing buffer", %{state: state, data: {part1, part2, _}} do
6059
state = %{state | buffer: part1}
61-
expected_response_message = build(:hello_reply_rpc)
60+
expected_response_message = {:ok, build(:hello_reply_rpc)}
6261

6362
response =
6463
StreamResponseProcess.handle_call({:consume_response, {:data, part2}}, self(), state)
6564

6665
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
6766
assert new_state.buffer == <<>>
68-
assert [{:ok, response_message}] = new_state.responses
69-
assert expected_response_message == response_message
67+
assert :queue.to_list(new_state.responses) == [expected_response_message]
7068
end
7169

7270
test "decode message and put rest on buffer", %{state: state, data: {_, _, full}} do
7371
extra_data = <<0, 1, 2>>
7472
data = full <> extra_data
75-
expected_response_message = build(:hello_reply_rpc)
73+
expected_response_message = {:ok, build(:hello_reply_rpc)}
7674

7775
response =
7876
StreamResponseProcess.handle_call({:consume_response, {:data, data}}, self(), state)
7977

8078
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
8179
assert new_state.buffer == extra_data
82-
assert [{:ok, response_message}] = new_state.responses
83-
assert expected_response_message == response_message
80+
assert :queue.to_list(new_state.responses) == [expected_response_message]
8481
end
8582
end
8683

@@ -106,9 +103,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
106103
state
107104
)
108105

106+
expected_error = {:error, %GRPC.RPCError{message: "Internal Server Error", status: 2}}
107+
109108
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
110-
assert [{:error, error}] = new_state.responses
111-
assert %GRPC.RPCError{message: "Internal Server Error", status: 2} == error
109+
assert :queue.to_list(new_state.responses) == [expected_error]
112110
end,
113111
do: [
114112
{%{type: :headers, is_header_enabled: false}},
@@ -139,17 +137,10 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
139137
state
140138
)
141139

142-
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
143-
assert [{type_response, response_headers}] = new_state.responses
144-
assert type == type_response
140+
expected_response = {type, Map.new(headers)}
145141

146-
assert %{
147-
"content-length" => "0",
148-
"content-type" => "application/grpc+proto",
149-
"grpc-message" => "",
150-
"grpc-status" => "0",
151-
"server" => "Cowboy"
152-
} == response_headers
142+
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
143+
assert :queue.to_list(new_state.responses) == [expected_response]
153144
end,
154145
do: [{:headers}, {:trailers}]
155146
)
@@ -174,7 +165,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
174165
)
175166

176167
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
177-
assert [] == new_state.responses
168+
assert :queue.is_empty(new_state.responses)
178169
end,
179170
do: [{:headers}, {:trailers}]
180171
)
@@ -238,8 +229,7 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
238229
)
239230

240231
assert {:reply, :ok, new_state, {:continue, :produce_response}} = response
241-
assert [response_error] = new_state.responses
242-
assert response_error == error
232+
assert :queue.to_list(new_state.responses) == [error]
243233
end
244234
end
245235

@@ -282,11 +272,11 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
282272
end
283273

284274
test "send response to caller when there are responses in the queue", %{state: state} do
285-
state = %{state | from: {self(), :tag}, done: false, responses: [1, 2]}
275+
state = %{state | from: {self(), :tag}, done: false, responses: :queue.from_list([1, 2])}
286276
{:noreply, new_state} = StreamResponseProcess.handle_continue(:produce_response, state)
287277
%{from: from, responses: responses} = new_state
288-
assert nil == from
289-
assert [2] == responses
278+
assert is_nil(from)
279+
assert :queue.to_list(responses) == [2]
290280
assert_receive {:tag, 1}
291281
end
292282
end
@@ -321,6 +311,27 @@ defmodule GRPC.Client.Adapters.Mint.StreamResponseProcessTest do
321311
assert {:ok, build(:hello_reply_rpc)} == data
322312
end
323313

314+
test "preserves response messages order", %{pid: pid} do
315+
hello_luis =
316+
<<0, 0, 0, 0, 12, 10, 10, 72, 101, 108, 108, 111, 32, 76, 117, 105, 115>>
317+
318+
bye_luis =
319+
<<0, 0, 0, 0, 10, 10, 8, 66, 121, 101, 32, 76, 117, 105, 115>>
320+
321+
stream = StreamResponseProcess.build_stream(pid)
322+
StreamResponseProcess.consume(pid, :data, hello_luis)
323+
StreamResponseProcess.consume(pid, :data, bye_luis)
324+
StreamResponseProcess.done(pid)
325+
326+
expected_elements =
327+
[
328+
ok: build(:hello_reply_rpc),
329+
ok: build(:bye_reply_rpc)
330+
]
331+
332+
assert Enum.to_list(stream) == expected_elements
333+
end
334+
324335
test_with_params(
325336
"emits headers to stream",
326337
%{pid: pid},

test/support/factory.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,8 @@ defmodule GRPC.Factory do
7878
def hello_reply_rpc_factory do
7979
%Helloworld.HelloReply{message: "Hello Luis"}
8080
end
81+
82+
def bye_reply_rpc_factory do
83+
%Helloworld.HelloReply{message: "Bye Luis"}
84+
end
8185
end

0 commit comments

Comments
 (0)