diff --git a/load_test/lib/load_test/user/sse.ex b/load_test/lib/load_test/user/sse.ex index 36f50ea..ec22c98 100644 --- a/load_test/lib/load_test/user/sse.ex +++ b/load_test/lib/load_test/user/sse.ex @@ -1,6 +1,8 @@ defmodule SseUser do require Logger + alias SseUser.SseConnection + defmodule SseState do defstruct [ :user_name, @@ -13,41 +15,9 @@ defmodule SseUser do ] end - defp build_headers(context, topic) do - iat = :os.system_time(:second) - exp = iat + context.sse_jwt_expiration - - jwt = %{ - "iss" => context.sse_jwt_issuer, - "exp" => exp, - "iat" => iat, - "aud" => context.sse_jwt_audience, - "sub" => topic - } - - jws = %{ - "alg" => "HS256" - } - - signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt) - {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) - - [{["Authorization"], "Bearer #{compact_signed}"}] - end - def run(context, user_name, topic, expected_messages) do url = context.sse_url - Logger.debug(fn -> - "#{user_name}: Starting SSE client on url #{url}, topic #{topic}, expecting #{length(expected_messages)} messages" - end) - - headers = build_headers(context, topic) - http_request_opts = [] - - {:ok, request_id} = - :httpc.request(:get, {url, headers}, http_request_opts, [{:sync, false}, {:stream, :self}]) - state = %SseState{ user_name: user_name, start_time: :os.system_time(:millisecond), @@ -60,42 +30,37 @@ defmodule SseUser do end } - # Adding a padding message for the connection message - wait_for_messages(state, request_id, ["" | expected_messages]) - end - - defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do - Logger.debug(fn -> "#{header(state)} Waiting for message: #{first_message}" end) + SseConnection.start(context, fn -> header(state) end, url, topic) receive do - {:http, {_, {:error, msg}}} -> - Logger.error("#{header(state)} Http error: #{inspect(msg)}") - :ok = :httpc.cancel_request(request_id) - Stats.inc_msg_received_http_error() - raise("#{header(state)} Http error") - - {:http, {_, :stream, msg}} -> - msg = String.trim(msg) - Logger.debug(fn -> "#{header(state)} Received message: #{inspect(msg)}" end) - check_message(state, msg, first_message) - - {:http, {_, :stream_start, headers}} -> - {~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0) - + {:sse_connected, server, request_id} -> Logger.info(fn -> - "#{header(state)} Connected, waiting: #{length(remaining_messages) + 1} messages, url #{state.url}, remote server: #{server}" + "#{header(state)} Connected, waiting for messages, url #{state.url}, remote server: #{server}" end) state.start_publisher_callback.() - msg -> - Logger.error("#{header(state)} Unexpected message #{inspect(msg)}") - :ok = :httpc.cancel_request(request_id) - raise("#{header(state)} Unexpected message") + wait_for_messages(state, request_id, expected_messages) + + {:dead} -> + Logger.error("#{header(state)} SSE connection died") + + other_message -> + Logger.error("#{header(state)} Unexpected message: #{inspect(other_message)}") + end + end + + defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do + Logger.info(fn -> "#{header(state)} Waiting for message: #{first_message}" end) + + receive do + {:sse_event, sse_event} -> + Logger.debug(fn -> "#{header(state)} Received message: #{inspect(sse_event)}" end) + check_message(state, sse_event, first_message) after state.sse_timeout -> Logger.error( - "#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), remaining: #{length(remaining_messages) + 1} messages, url #{state.url}" + "#{header(state)} Timeout waiting for message (timeout=#{state.sse_timeout}ms), url #{state.url}" ) Stats.inc_msg_received_timeout() @@ -120,31 +85,114 @@ defmodule SseUser do end defp check_message(state, received_message, expected_message) do - clean_received_message = String.replace(received_message, ~r"id: .*\nevent: .*\n", "") + delay = :os.system_time(:millisecond) - String.to_integer(received_message.id) + Stats.observe_propagation(delay) - try do - [_, ts, message, _, _] = String.split(clean_received_message, " ", parts: 5) - current_ts = :os.system_time(:millisecond) - delay = current_ts - String.to_integer(ts) - Stats.observe_propagation(delay) + Logger.info(fn -> + "#{header(state)} Propagation delay for message #{received_message.data} is #{delay}ms" + end) - Logger.debug(fn -> - "#{header(state)} Propagation delay for message #{message} is #{delay}ms" - end) + [_ts, message, _, _] = String.split(received_message.data, " ", parts: 5) - if message == expected_message do - Stats.inc_msg_received_ok() - else - Stats.inc_msg_received_unexpected_message() + if message == expected_message do + Stats.inc_msg_received_ok() + else + Stats.inc_msg_received_unexpected_message() - Logger.error( - "#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}" - ) + Logger.error( + "#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}" + ) + end + end + + defmodule SseConnection do + # Start the SSE connection in a sub-process to intercept SSE events and only forward application events to the main process + def start(context, log_context, url, topic) do + sse_process = self() + + {:ok, _task} = + Task.start_link(fn -> + Logger.info("Starting SSE client on url #{url}, topic #{topic}") + headers = build_http_headers(context, topic) + + http_request_opts = [] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, http_request_opts, [ + {:sync, false}, + {:stream, :self} + ]) + + loop(log_context, request_id, sse_process) + end) + end + + defp build_http_headers(context, topic) do + iat = :os.system_time(:second) + exp = iat + context.sse_jwt_expiration + + jwt = %{ + "iss" => context.sse_jwt_issuer, + "exp" => exp, + "iat" => iat, + "aud" => context.sse_jwt_audience, + "sub" => topic + } + + jws = %{ + "alg" => "HS256" + } + + signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt) + {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) + + [{["Authorization"], "Bearer #{compact_signed}"}] + end + + defp loop(log_context, request_id, main_process) do + receive do + {:http, {_, {:error, msg}}} -> + Logger.error("#{log_context.()} Http error: #{inspect(msg)}") + :ok = :httpc.cancel_request(request_id) + Stats.inc_msg_received_http_error() + raise("#{log_context.()} Http error") + + {:http, {_, :stream_start, headers}} -> + {~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0) + + send(main_process, {:sse_connected, server, request_id}) + + {:http, {_, :stream, msg}} -> + sse_event = parse_sse_event(msg) + + case sse_event.event do + # Events not part of the application messages, they are filtered out + event_name when event_name in ["timeout", "ping", "reconnect"] -> + Logger.debug("Received technical SSE event: #{event_name}") + + # Event part of the application messages, they are forwarded to the main process + _other_event -> + send(main_process, {:sse_event, sse_event}) + end + + other_message -> + Logger.error("#{log_context.()} Unexpected message #{inspect(other_message)}") + :ok = :httpc.cancel_request(request_id) + Stats.inc_msg_received_http_error() + send(main_process, {:dead}) end - rescue - e -> - Logger.error("#{header(state)} #{inspect(e)}") - Stats.inc_msg_received_error() + + loop(log_context, request_id, main_process) + end + + defp parse_sse_event(sse_event_chunk) do + String.split(sse_event_chunk, "\n") + |> Enum.reject(fn line -> String.length(String.trim(line)) == 0 end) + |> Enum.map(fn line -> + [key, value] = String.split(line, ~r/\: ?/, parts: 2) + {String.to_atom(key), value} + end) + |> Enum.into(%{}) end end end