Skip to content

Commit 1423367

Browse files
authored
Implement auto reconnect in load tests (#44)
1 parent f16a03a commit 1423367

File tree

8 files changed

+124
-72
lines changed

8 files changed

+124
-72
lines changed

load_test/config/runtime.exs

Lines changed: 19 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,38 @@ config :logger, :console,
44
format: "$time $metadata[$level] $message\n",
55
level: String.to_atom(System.get_env("LOG_LEVEL") || "info")
66

7-
config :load_test, port: String.to_integer(System.get_env("PORT") || "2999")
8-
config :load_test, nb_user: String.to_integer(System.get_env("NB_USER") || "1")
9-
10-
config :load_test, sse_user_agent: System.get_env("SSE_USER_AGENT") || "neurow_load_test/1.0"
11-
config :load_test, sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000")
12-
config :load_test, sse_url: System.get_env("SSE_URL") || "http://localhost:4000/v1/subscribe"
13-
config :load_test, sse_jwt_issuer: System.get_env("SSE_JWT_ISSUER") || "test_issuer1"
14-
157
config :load_test,
16-
sse_jwt_expiration: String.to_integer(System.get_env("SSE_JWT_EXPIRATION") || "86400")
17-
18-
config :load_test,
19-
sse_jwt_secret:
20-
System.get_env("SSE_JWT_SECRET") || "966KljJz--KyzyBnMOrFXfAkq9XMqWwPgdBV3cKTxsc"
8+
port: String.to_integer(System.get_env("PORT") || "2999"),
9+
nb_user: String.to_integer(System.get_env("NB_USER") || "1")
2110

2211
config :load_test,
23-
sse_jwt_audience: System.get_env("SSE_JWT_AUDIENCE") || "public_api"
12+
sse_user_agent: System.get_env("SSE_USER_AGENT") || "neurow_load_test/1.0",
13+
sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000")
2414

2515
config :load_test,
26-
publish_url: System.get_env("PUBLISH_URL") || "http://localhost:3000/v1/publish"
27-
28-
config :load_test, publish_timeout: String.to_integer(System.get_env("PUBLISH_TIMEOUT") || "5000")
29-
30-
config :load_test,
31-
publish_http_pool_size: String.to_integer(System.get_env("PUBLISH_HTTP_POOL_SIZE") || "2000")
32-
33-
config :load_test, publish_jwt_issuer: System.get_env("PUBLISH_JWT_ISSUER") || "test_issuer1"
16+
sse_url: System.get_env("SSE_URL") || "http://localhost:4000/v1/subscribe",
17+
sse_jwt_issuer: System.get_env("SSE_JWT_ISSUER") || "test_issuer1",
18+
sse_jwt_expiration: String.to_integer(System.get_env("SSE_JWT_EXPIRATION") || "86400"),
19+
sse_jwt_secret:
20+
System.get_env("SSE_JWT_SECRET") || "966KljJz--KyzyBnMOrFXfAkq9XMqWwPgdBV3cKTxsc",
21+
sse_jwt_audience: System.get_env("SSE_JWT_AUDIENCE") || "public_api",
22+
sse_auto_reconnect: String.to_atom(System.get_env("SSE_AUTO_RECONNECT") || "false")
3423

3524
config :load_test,
25+
publish_url: System.get_env("PUBLISH_URL") || "http://localhost:3000/v1/publish",
26+
publish_timeout: String.to_integer(System.get_env("PUBLISH_TIMEOUT") || "5000"),
27+
publish_http_pool_size: String.to_integer(System.get_env("PUBLISH_HTTP_POOL_SIZE") || "2000"),
28+
publish_jwt_issuer: System.get_env("PUBLISH_JWT_ISSUER") || "test_issuer1",
3629
publish_jwt_secret:
37-
System.get_env("PUBLISH_JWT_SECRET") || "nLjJdNLlpdv3W4Xk7MyVCAZKD-hvza6FQ4yhUUFnjmg"
38-
39-
config :load_test,
30+
System.get_env("PUBLISH_JWT_SECRET") || "nLjJdNLlpdv3W4Xk7MyVCAZKD-hvza6FQ4yhUUFnjmg",
4031
publish_jwt_audience: System.get_env("PUBLISH_JWT_AUDIENCE") || "internal_api"
4132

4233
config :load_test,
4334
delay_between_messages_min:
44-
String.to_integer(System.get_env("DELAY_BETWEEN_MESSAGES_MIN") || "500")
45-
46-
config :load_test,
35+
String.to_integer(System.get_env("DELAY_BETWEEN_MESSAGES_MIN") || "500"),
4736
delay_between_messages_max:
48-
String.to_integer(System.get_env("DELAY_BETWEEN_MESSAGES_MAX") || "5000")
49-
50-
config :load_test,
51-
number_of_messages_min: String.to_integer(System.get_env("NUMBER_OF_MESSAGES_MIN") || "10")
52-
53-
config :load_test,
37+
String.to_integer(System.get_env("DELAY_BETWEEN_MESSAGES_MAX") || "5000"),
38+
number_of_messages_min: String.to_integer(System.get_env("NUMBER_OF_MESSAGES_MIN") || "10"),
5439
number_of_messages_max: String.to_integer(System.get_env("NUMBER_OF_MESSAGES_MAX") || "50")
5540

5641
config :load_test,

load_test/lib/load_test/main.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ defmodule LoadTest.Main do
1111
:sse_jwt_audience,
1212
:sse_jwt_expiration,
1313
:sse_user_agent,
14+
:sse_auto_reconnect,
1415
:publish_url,
1516
:publish_timeout,
1617
:publish_jwt_issuer,
@@ -38,6 +39,7 @@ defmodule LoadTest.Main do
3839
{:ok, sse_jwt_secret} = Application.fetch_env(:load_test, :sse_jwt_secret)
3940
{:ok, sse_jwt_audience} = Application.fetch_env(:load_test, :sse_jwt_audience)
4041
{:ok, sse_jwt_expiration} = Application.fetch_env(:load_test, :sse_jwt_expiration)
42+
{:ok, sse_auto_reconnect} = Application.fetch_env(:load_test, :sse_auto_reconnect)
4143

4244
{:ok, publish_url} = Application.fetch_env(:load_test, :publish_url)
4345
{:ok, publish_timeout} = Application.fetch_env(:load_test, :publish_timeout)
@@ -64,6 +66,7 @@ defmodule LoadTest.Main do
6466
sse_jwt_secret: JOSE.JWK.from_oct(sse_jwt_secret),
6567
sse_jwt_audience: sse_jwt_audience,
6668
sse_jwt_expiration: sse_jwt_expiration,
69+
sse_auto_reconnect: sse_auto_reconnect,
6770
publish_url: publish_url,
6871
publish_timeout: publish_timeout,
6972
publish_jwt_issuer: publish_jwt_issuer,
@@ -76,6 +79,7 @@ defmodule LoadTest.Main do
7679
}
7780

7881
Logger.warning("SSE base url: #{sse_url}")
82+
Logger.warning("SSE Auto reconnect: #{sse_auto_reconnect}")
7983
Logger.warning("Publish base url: #{publish_url}")
8084
Logger.warning("User agent: #{sse_user_agent}")
8185
Logger.warning("Starting load test with #{nb_user} users")

load_test/lib/load_test/user/publisher.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ defmodule LoadTest.User.Publisher do
7575
duration = :os.system_time(:millisecond) - state.start_time
7676

7777
Logger.info(fn ->
78-
"publisher_#{state.user_name}: All messages published (#{state.message_count}) to #{state.publish_url}, duration: #{duration / 1000}"
78+
"publisher_#{state.user_name}: User ok, #{state.message_count} messages published, duration: #{duration / 1000}"
7979
end)
8080
end
8181

load_test/lib/load_test/user/sse.ex

Lines changed: 84 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,19 @@ defmodule SseUser do
99
:current_message,
1010
:url,
1111
:sse_timeout,
12-
:start_publisher_callback
12+
:start_publisher_callback,
13+
:connect_callback,
14+
:conn_pid,
15+
:stream_ref,
16+
:last_event_id,
17+
:reconnect
1318
]
1419
end
1520

16-
defp build_headers(context, topic) do
21+
defp last_event_id_header(nil), do: []
22+
defp last_event_id_header(last_event_id), do: [{"Last-Event-ID", last_event_id}]
23+
24+
defp build_headers(context, state, topic) do
1725
iat = :os.system_time(:second)
1826
exp = iat + context.sse_jwt_expiration
1927

@@ -35,7 +43,7 @@ defmodule SseUser do
3543
[
3644
{["Authorization"], "Bearer #{compact_signed}"},
3745
{["User-Agent"], context.sse_user_agent}
38-
]
46+
] ++ last_event_id_header(state.last_event_id)
3947
end
4048

4149
def run(context, user_name, topic, expected_messages) do
@@ -45,8 +53,6 @@ defmodule SseUser do
4553
"#{user_name}: Starting SSE client on url #{url}, topic #{topic}, expecting #{length(expected_messages)} messages"
4654
end)
4755

48-
headers = build_headers(context, topic)
49-
5056
parsed_url = URI.parse(url)
5157

5258
opts = %{
@@ -59,10 +65,18 @@ defmodule SseUser do
5965
}
6066
}
6167

62-
{:ok, conn_pid} = :gun.open(String.to_atom(parsed_url.host), parsed_url.port, opts)
63-
{:ok, proto} = :gun.await_up(conn_pid)
64-
Logger.debug(fn -> "Connection established with proto #{inspect(proto)}" end)
65-
stream_ref = :gun.get(conn_pid, parsed_url.path, headers)
68+
connect_callback = fn state ->
69+
headers = build_headers(context, state, topic)
70+
{:ok, conn_pid} = :gun.open(String.to_atom(parsed_url.host), parsed_url.port, opts)
71+
{:ok, proto} = :gun.await_up(conn_pid)
72+
Logger.debug(fn -> "Connection established with proto #{inspect(proto)}" end)
73+
stream_ref = :gun.get(conn_pid, parsed_url.path, headers)
74+
state = Map.put(state, :conn_pid, conn_pid)
75+
state = Map.put(state, :stream_ref, stream_ref)
76+
state
77+
end
78+
79+
reconnect = if context.sse_auto_reconnect, do: 0, else: -1
6680

6781
state = %SseState{
6882
user_name: user_name,
@@ -73,71 +87,108 @@ defmodule SseUser do
7387
sse_timeout: context.sse_timeout,
7488
start_publisher_callback: fn ->
7589
LoadTest.Main.start_publisher(context, user_name, topic, expected_messages)
76-
end
90+
end,
91+
connect_callback: connect_callback,
92+
last_event_id: nil,
93+
reconnect: reconnect
7794
}
7895

79-
wait_for_messages(state, conn_pid, stream_ref, expected_messages)
96+
state = connect_callback.(state)
97+
98+
wait_for_messages(state, expected_messages)
8099
end
81100

82-
defp wait_for_messages(state, conn_pid, stream_ref, [first_message | remaining_messages]) do
101+
defp reconnect(state, messages, reason) do
102+
:ok = :gun.close(state.conn_pid)
103+
104+
if state.reconnect == -1 do
105+
Logger.error(fn -> "#{header(state)} Connection closed: #{reason}" end)
106+
Stats.inc_msg_received_http_error()
107+
raise("#{header(state)} Connection closed")
108+
end
109+
110+
Logger.error("#{header(state)} Connection closed, reconnecting: #{reason}")
111+
Stats.inc_reconnect()
112+
state = state.connect_callback.(state)
113+
state = Map.put(state, :reconnect, state.reconnect + 1)
114+
wait_for_messages(state, messages)
115+
end
116+
117+
defp wait_for_messages(state, [first_message | remaining_messages]) do
83118
Logger.debug(fn -> "#{header(state)} Waiting for message: #{first_message}" end)
84119

85-
result = :gun.await(conn_pid, stream_ref, state.sse_timeout)
120+
result = :gun.await(state.conn_pid, state.stream_ref, state.sse_timeout)
86121

87122
case result do
88123
{:response, _, code, _} when code == 200 ->
89124
Logger.debug(
90125
"#{header(state)} Connected, waiting: #{length(remaining_messages) + 1} messages, url #{state.url}"
91126
)
92127

93-
state.start_publisher_callback.()
94-
wait_for_messages(state, conn_pid, stream_ref, [first_message | remaining_messages])
128+
state =
129+
if state.start_publisher_callback do
130+
state.start_publisher_callback.()
131+
Map.put(state, :start_publisher_callback, nil)
132+
else
133+
state
134+
end
135+
136+
wait_for_messages(state, [first_message | remaining_messages])
95137

96138
{:response, _, code, _} ->
97139
Logger.error("#{header(state)} Error: #{inspect(code)}")
98-
:ok = :gun.close(conn_pid)
140+
:ok = :gun.close(state.conn_pid)
99141
Stats.inc_msg_received_http_error()
100142
raise("#{header(state)} Error")
101143

102-
{:data, _, msg} ->
144+
{:data, :fin, _} ->
145+
reconnect(state, [first_message | remaining_messages], "{:data, :fin, _}")
146+
147+
{:error, {:stream_error, {:stream_error, :internal_error, :"Stream reset by server."}}} ->
148+
reconnect(state, [first_message | remaining_messages], "Stream reset by server.")
149+
150+
{:data, :nofin, msg} ->
103151
msg = String.trim(msg)
104152
Logger.debug(fn -> "#{header(state)} Received message: #{inspect(msg)}" end)
105153

106154
if msg =~ "event: ping" do
107-
wait_for_messages(state, conn_pid, stream_ref, [first_message | remaining_messages])
155+
wait_for_messages(state, [first_message | remaining_messages])
108156
else
109-
if check_message(state, msg, first_message) == :error do
110-
:ok = :gun.close(conn_pid)
111-
raise("#{header(state)} Message check error")
157+
case check_message(state, msg, first_message) do
158+
:error ->
159+
:ok = :gun.close(state.conn_pid)
160+
raise("#{header(state)} Message check error")
161+
162+
state ->
163+
state = Map.put(state, :current_message, state.current_message + 1)
164+
wait_for_messages(state, remaining_messages)
112165
end
113-
114-
state = Map.put(state, :current_message, state.current_message + 1)
115-
wait_for_messages(state, conn_pid, stream_ref, remaining_messages)
116166
end
117167

118168
msg ->
119169
Logger.error("#{header(state)} Unexpected message #{inspect(msg)}")
120-
:ok = :gun.close(conn_pid)
170+
:ok = :gun.close(state.conn_pid)
121171
raise("#{header(state)} Unexpected message")
122172
end
123173
end
124174

125-
defp wait_for_messages(state, conn_pid, _, []) do
126-
:ok = :gun.close(conn_pid)
127-
Logger.info("#{header(state)} All messages received, url #{state.url}")
175+
defp wait_for_messages(state, []) do
176+
:ok = :gun.close(state.conn_pid)
177+
Logger.info("#{header(state)} All messages received")
128178
end
129179

130180
defp header(state) do
131181
now = :os.system_time(:millisecond)
132182

133-
"#{state.user_name} / #{now - state.start_time} ms / #{state.current_message} < #{state.all_messages}: "
183+
"#{state.user_name} / #{now - state.start_time} ms / #{state.current_message} < #{state.all_messages} / #{state.reconnect}: "
134184
end
135185

136186
defp check_message(state, received_message, expected_message) do
137-
clean_received_message = String.replace(received_message, ~r"id: .*\nevent: .*\n", "")
187+
[first, _, third | _] = String.split(received_message, "\n")
188+
[_, id] = String.split(first, " ")
138189

139190
try do
140-
[_, ts, message, _, _] = String.split(clean_received_message, " ", parts: 5)
191+
[_, ts, message, _, _] = String.split(third, " ", parts: 5)
141192
current_ts = :os.system_time(:millisecond)
142193
delay = current_ts - String.to_integer(ts)
143194
Stats.observe_propagation(delay)
@@ -148,7 +199,8 @@ defmodule SseUser do
148199

149200
if message == expected_message do
150201
Stats.inc_msg_received_ok()
151-
:ok
202+
state = Map.put(state, :last_event_id, id)
203+
state
152204
else
153205
Stats.inc_msg_received_unexpected_message()
154206

load_test/lib/stats.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ defmodule Stats do
1313
help: "Messages counter"
1414
)
1515

16+
Gauge.declare(
17+
name: :reconnect,
18+
help: "Reconnection counter"
19+
)
20+
1621
Gauge.declare(
1722
name: :users,
1823
labels: [:status],
@@ -33,6 +38,7 @@ defmodule Stats do
3338
Gauge.set([name: :messages, labels: [:received, :http_error]], 0)
3439
Gauge.set([name: :messages, labels: [:published, :ok]], 0)
3540
Gauge.set([name: :messages, labels: [:published, :error]], 0)
41+
Gauge.set([name: :reconnect], 0)
3642

3743
Gauge.declare(
3844
name: :memory_usage,
@@ -92,6 +98,10 @@ defmodule Stats do
9298
Gauge.inc(name: :messages, labels: [:published, :error])
9399
end
94100

101+
def inc_reconnect() do
102+
Gauge.inc(name: :reconnect)
103+
end
104+
95105
def observe_propagation(delay) do
96106
Summary.observe([name: :propagation_delay], delay)
97107
end

load_test/terraform/instances.tf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ docker run --rm -d \
3232
-e DELAY_BETWEEN_MESSAGES_MAX \
3333
-e NUMBER_OF_MESSAGES_MIN \
3434
-e NUMBER_OF_MESSAGES_MAX \
35+
-e SSE_AUTO_RECONNECT \
3536
-e SSE_TIMEOUT \
3637
-e INITIAL_DELAY_MAX \
3738
-e PUBLISH_HTTP_POOL_SIZE \
@@ -52,6 +53,7 @@ echo "instances:
5253
- propagation_delay_sum
5354
- propagation_delay_count
5455
- memory_usage
56+
- reconnect
5557
- erlang_vm_memory_atom_bytes_total
5658
- erlang_vm_memory_bytes_total
5759
- erlang_vm_memory_dets_tables

neurow/config/runtime.exs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,16 @@ config :neurow,
2929
public_api_context_path: System.get_env("PUBLIC_API_CONTEXT_PATH") || "",
3030
sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000"),
3131
sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000"),
32-
max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_VALUE_LENGTH") || "8192")
32+
max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_VALUE_LENGTH") || "8192"),
33+
ssl_keyfile: System.get_env("SSL_KEYFILE"),
34+
ssl_certfile: System.get_env("SSL_CERTFILE")
3335

3436
# Internal API configuration
3537
config :neurow,
3638
internal_api_port: String.to_integer(System.get_env("INTERNAL_API_PORT") || "3000"),
3739
internal_api_jwt_max_lifetime:
3840
String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "1500")
3941

40-
config :neurow, ssl_keyfile: System.get_env("SSL_KEYFILE")
41-
config :neurow, ssl_certfile: System.get_env("SSL_CERTFILE")
42-
4342
config :neurow,
4443
history_min_duration: String.to_integer(System.get_env("HISTORY_MIN_DURATION") || "30")
4544

neurow/lib/neurow/public_api/endpoint.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ defmodule Neurow.PublicApi.Endpoint do
172172
end
173173

174174
defp process_history(conn, last_event_id, sent, [first | rest]) do
175-
{_, message} = first
175+
{_topic, message} = first
176176

177177
if message.timestamp > last_event_id do
178178
conn = write_chunk(conn, message)

0 commit comments

Comments
 (0)