From 7ad0003c1d9b066a852456bcb7ab95421e0b02b3 Mon Sep 17 00:00:00 2001 From: Alexandre Chouippe Date: Fri, 27 Sep 2024 17:55:50 +0200 Subject: [PATCH 1/3] Support keepalive events on load tests --- load_test/lib/load_test/user/sse.ex | 200 +++++++++++++++++----------- 1 file changed, 122 insertions(+), 78 deletions(-) diff --git a/load_test/lib/load_test/user/sse.ex b/load_test/lib/load_test/user/sse.ex index 5917110..499ad50 100644 --- a/load_test/lib/load_test/user/sse.ex +++ b/load_test/lib/load_test/user/sse.ex @@ -1,6 +1,8 @@ defmodule SseUser do require Logger + alias SseUser.SseConnection + defmodule SseState do defstruct [ :user_name, @@ -13,41 +15,9 @@ defmodule SseUser do ] end - defp build_headers(context, topic) do - iat = :os.system_time(:second) - exp = iat + 60 * 2 - 1 - - jwt = %{ - "iss" => context.sse_jwt_issuer, - "exp" => exp, - "iat" => iat, - "aud" => context.sse_jwt_audience, - "sub" => topic - } - - jws = %{ - "alg" => "HS256" - } - - signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt) - {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) - - [{["Authorization"], "Bearer #{compact_signed}"}] - end - def run(context, user_name, topic, expected_messages) do url = context.sse_url - Logger.debug(fn -> - "#{user_name}: Starting SSE client on url #{url}, topic #{topic}, expecting #{length(expected_messages)} messages" - end) - - headers = build_headers(context, topic) - http_request_opts = [] - - {:ok, request_id} = - :httpc.request(:get, {url, headers}, http_request_opts, [{:sync, false}, {:stream, :self}]) - state = %SseState{ user_name: user_name, start_time: :os.system_time(:millisecond), @@ -60,42 +30,34 @@ defmodule SseUser do end } - # Adding a padding message for the connection message - wait_for_messages(state, request_id, ["" | expected_messages]) - end - - defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do - Logger.debug(fn -> "#{header(state)} Waiting for message: #{first_message}" end) + SseConnection.start(context, header(state), url, topic) receive do - {:http, {_, {:error, msg}}} -> - Logger.error("#{header(state)} Http error: #{inspect(msg)}") - :ok = :httpc.cancel_request(request_id) - Stats.inc_msg_received_http_error() - raise("#{header(state)} Http error") - - {:http, {_, :stream, msg}} -> - msg = String.trim(msg) - Logger.debug(fn -> "#{header(state)} Received message: #{inspect(msg)}" end) - check_message(state, msg, first_message) - - {:http, {_, :stream_start, headers}} -> - {~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0) - + {:sse_connected, server, request_id} -> Logger.info(fn -> - "#{header(state)} Connected, waiting: #{length(remaining_messages) + 1} messages, url #{state.url}, remote server: #{server}" + "#{header(state)} Connected, waiting for messages, url #{state.url}, remote server: #{server}" end) state.start_publisher_callback.() - msg -> - Logger.error("#{header(state)} Unexpected message #{inspect(msg)}") - :ok = :httpc.cancel_request(request_id) - raise("#{header(state)} Unexpected message") + wait_for_messages(state, request_id, expected_messages) + + other_message -> + Logger.error("#{header(state)} Unexpected message: #{inspect(other_message)}") + end + end + + defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do + Logger.info(fn -> "#{header(state)} Waiting for message: #{first_message}" end) + + receive do + {:sse_event, sse_event} -> + Logger.debug(fn -> "#{header(state)} Received message: #{inspect(sse_event)}" end) + check_message(state, sse_event, first_message) after state.sse_timeout -> Logger.error( - "#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), remaining: #{length(remaining_messages) + 1} messages, url #{state.url}" + "#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), url #{state.url}" ) Stats.inc_msg_received_timeout() @@ -120,31 +82,113 @@ defmodule SseUser do end defp check_message(state, received_message, expected_message) do - clean_received_message = String.replace(received_message, ~r"id: .*\nevent: .*\n", "") + delay = :os.system_time(:millisecond) - String.to_integer(received_message.id) + Stats.observe_propagation(delay) + + Logger.info(fn -> + "#{header(state)} Propagation delay for message #{received_message.data} is #{delay}ms" + end) - try do - [_, ts, message, _, _] = String.split(clean_received_message, " ", parts: 5) - current_ts = :os.system_time(:millisecond) - delay = current_ts - String.to_integer(ts) - Stats.observe_propagation(delay) + [_ts, message, _, _] = String.split(received_message.data, " ", parts: 5) - Logger.debug(fn -> - "#{header(state)} Propagation delay for message #{message} is #{delay}ms" - end) + if message == expected_message do + Stats.inc_msg_received_ok() + else + Stats.inc_msg_received_unexpected_message() + + Logger.error( + "#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}" + ) + end + end - if message == expected_message do - Stats.inc_msg_received_ok() - else - Stats.inc_msg_received_unexpected_message() + defmodule SseConnection do + # Start the SSE connection in a sub-process to intercept SSE events and only forward application events to the main process + def start(context, log_context, url, topic) do + sse_process = self() - Logger.error( - "#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}" - ) + {:ok, _task} = + Task.start_link(fn -> + Logger.info("Starting SSE client on url #{url}, topic #{topic}") + headers = build_http_headers(context, topic) + + http_request_opts = [] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, http_request_opts, [ + {:sync, false}, + {:stream, :self} + ]) + + loop(log_context, request_id, sse_process) + end) + end + + defp build_http_headers(context, topic) do + iat = :os.system_time(:second) + exp = iat + 60 * 2 - 1 + + jwt = %{ + "iss" => context.sse_jwt_issuer, + "exp" => exp, + "iat" => iat, + "aud" => context.sse_jwt_audience, + "sub" => topic + } + + jws = %{ + "alg" => "HS256" + } + + signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt) + {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) + + [{["Authorization"], "Bearer #{compact_signed}"}] + end + + defp loop(log_context, request_id, main_process) do + receive do + {:http, {_, {:error, msg}}} -> + Logger.error("#{log_context} Http error: #{inspect(msg)}") + :ok = :httpc.cancel_request(request_id) + Stats.inc_msg_received_http_error() + raise("#{log_context} Http error") + + {:http, {_, :stream_start, headers}} -> + {~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0) + + send(main_process, {:sse_connected, server, request_id}) + + {:http, {_, :stream, msg}} -> + sse_event = parse_sse_event(msg) + + case sse_event.event do + # Events not part of the application messages, they are filtered out + event_name when event_name in ["timeout", "ping", "reconnect"] -> + Logger.debug("Received technical SSE event: #{event_name}") + + # Event part of the application messages, they are forwarded to the main process + _other_event -> + send(main_process, {:sse_event, sse_event}) + end + + other_message -> + Logger.error("#{log_context} Unexpected message #{inspect(other_message)}") + :ok = :httpc.cancel_request(request_id) + raise("#{log_context} Unexpected message") end - rescue - e -> - Logger.error("#{header(state)} #{inspect(e)}") - Stats.inc_msg_received_error() + + loop(log_context, request_id, main_process) + end + + defp parse_sse_event(sse_event_chunk) do + String.split(sse_event_chunk, "\n") + |> Enum.reject(fn line -> String.length(String.trim(line)) == 0 end) + |> Enum.map(fn line -> + [key, value] = String.split(line, ~r/\: ?/, parts: 2) + {String.to_atom(key), value} + end) + |> Enum.into(%{}) end end end From 0d583c19ed66fef78de56d116210c4e0e2507767 Mon Sep 17 00:00:00 2001 From: Bertrand Paquet Date: Mon, 30 Sep 2024 12:07:57 +0200 Subject: [PATCH 2/3] Add log --- load_test/lib/load_test/user/sse.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/load_test/lib/load_test/user/sse.ex b/load_test/lib/load_test/user/sse.ex index 1104349..1a87f65 100644 --- a/load_test/lib/load_test/user/sse.ex +++ b/load_test/lib/load_test/user/sse.ex @@ -175,7 +175,7 @@ defmodule SseUser do other_message -> Logger.error("#{log_context.()} Unexpected message #{inspect(other_message)}") :ok = :httpc.cancel_request(request_id) - raise("#{log_context.()} Unexpected message") + raise("#{log_context.()} Unexpected message: #{inspect(other_message)}") end loop(log_context, request_id, main_process) From ca723dd535b02685078828e44b6d3933f685c615 Mon Sep 17 00:00:00 2001 From: Bertrand Paquet Date: Mon, 30 Sep 2024 12:22:29 +0200 Subject: [PATCH 3/3] Fix ? --- load_test/lib/load_test/user/sse.ex | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/load_test/lib/load_test/user/sse.ex b/load_test/lib/load_test/user/sse.ex index 1a87f65..ec22c98 100644 --- a/load_test/lib/load_test/user/sse.ex +++ b/load_test/lib/load_test/user/sse.ex @@ -42,6 +42,9 @@ defmodule SseUser do wait_for_messages(state, request_id, expected_messages) + {:dead} -> + Logger.error("#{header(state)} SSE connection died") + other_message -> Logger.error("#{header(state)} Unexpected message: #{inspect(other_message)}") end @@ -175,7 +178,8 @@ defmodule SseUser do other_message -> Logger.error("#{log_context.()} Unexpected message #{inspect(other_message)}") :ok = :httpc.cancel_request(request_id) - raise("#{log_context.()} Unexpected message: #{inspect(other_message)}") + Stats.inc_msg_received_http_error() + send(main_process, {:dead}) end loop(log_context, request_id, main_process)