Skip to content

Commit 8a83da9

Browse files
authored
Detailed observability (#37)
This PR renames some existing observability metrics and introduces more detailed metrics: On message brokering: - Rename `current_connections` to `concurrent_subscription` and split subscriptions per issuer, - Rename `connections[created / released]` to `subscription_lifecycle[created/released]` and split per issuer, - Add `subscription_duration_ms` to track the duration of SSE subscriptions, split per issuer, - Rename `messages[received/published]` to `messages[published/sent]` and split per issuer (published = message published on the internal API, sent = message sent through an SSE connection), - Add `topic_count` to track how many topics are stored in the in-memory message retention. On system metrics: - Add the boolean `stopping`, which turns to `true` when a node is in the graceful shutdown process. Add HTTP metrics: - `http_request_count[public/internal, status]` counts the number of HTTP requests on each interface and per HTTP status; it is incremented at the end of request processing. - `http_request_duration_ms[public/internal]` reports the duration of HTTP requests on each interface.
1 parent 176d30e commit 8a83da9

File tree

11 files changed

+262
-130
lines changed

11 files changed

+262
-130
lines changed

neurow/lib/neurow/application.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ defmodule Neurow.Application do
9393
end
9494

9595
MetricsPlugExporter.setup()
96-
Stats.setup()
96+
Neurow.Stats.setup()
9797
JOSE.json_module(:jiffy)
9898

9999
opts = [strategy: :one_for_one, name: Neurow.Supervisor]

neurow/lib/neurow/broker/receiver_shard.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ defmodule Neurow.Broker.ReceiverShard do
2121
GenServer.call(shard, {:flush_history})
2222
end
2323

24+
# Synchronously sends `{:topic_count}` to the given shard process and returns
# whatever that process replies with.
def topic_count(shard), do: GenServer.call(shard, {:topic_count})
27+
2428
@impl true
2529
def init(shard) do
2630
:ok =
@@ -73,4 +77,9 @@ defmodule Neurow.Broker.ReceiverShard do
7377
:ets.delete_all_objects(table_1)
7478
{:reply, :ok, {table_0, table_1}}
7579
end
80+
81+
# Replies with the combined number of objects stored in the two ETS tables
# held in the server state; the state itself is left unchanged.
#
# The size is read with `:ets.info/2` so that only the `:size` item is
# fetched, instead of building the complete info list of each table just to
# read one key from it.
#
# NOTE(review): this assumes every object in either table represents one
# topic; a topic present in both tables would be counted twice - confirm
# against the code that fills the tables.
@impl true
def handle_call({:topic_count}, _from, {table_0, table_1}) do
  topic_count = :ets.info(table_0, :size) + :ets.info(table_1, :size)
  {:reply, topic_count, {table_0, table_1}}
end
7685
end

neurow/lib/neurow/broker/receiver_shard_manager.ex

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,20 +35,27 @@ defmodule Neurow.Broker.ReceiverShardManager do
3535
{:noreply, state}
3636
end
3737

38-
def all_pids(fun) do
38+
def receiver_shards() do
3939
Enum.map(0..(@shards - 1), fn shard ->
40-
fun.({shard, Neurow.Broker.ReceiverShard.build_name(shard)})
40+
{shard, Neurow.Broker.ReceiverShard.build_name(shard)}
4141
end)
4242
end
4343

4444
def rotate do
45-
Stats.inc_history_rotate()
45+
Neurow.Stats.MessageBroker.inc_history_rotate()
4646

47-
all_pids(fn {_, pid} ->
47+
Enum.each(receiver_shards(), fn {_shard, pid} ->
4848
send(pid, {:rotate})
4949
end)
5050
end
5151

52+
# Sums `Neurow.Broker.ReceiverShard.topic_count/1` over every entry returned
# by `receiver_shards/0`. The shards are queried one after the other, in the
# order in which `receiver_shards/0` lists them.
def topic_count do
  Enum.reduce(receiver_shards(), 0, fn {_shard, shard_name}, total ->
    total + Neurow.Broker.ReceiverShard.topic_count(shard_name)
  end)
end
58+
5259
@impl true
5360
def handle_call({:rotate}, _, state) do
5461
rotate()
@@ -57,7 +64,7 @@ defmodule Neurow.Broker.ReceiverShardManager do
5764

5865
@impl true
5966
def handle_call({:flush_history}, _from, state) do
60-
all_pids(fn {_, pid} ->
67+
Enum.each(receiver_shards(), fn {_shard, pid} ->
6168
pid |> Neurow.Broker.ReceiverShard.flush_history()
6269
end)
6370

@@ -74,7 +81,7 @@ defmodule Neurow.Broker.ReceiverShardManager do
7481
end
7582

7683
def create_receivers() do
77-
all_pids(fn {shard, pid} ->
84+
Enum.map(receiver_shards(), fn {shard, pid} ->
7885
Supervisor.child_spec({Neurow.Broker.ReceiverShard, shard}, id: pid)
7986
end)
8087
end

neurow/lib/neurow/configuration.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@ defmodule Neurow.Configuration do
55
GenServer.start_link(__MODULE__, default, name: __MODULE__)
66
end
77

8+
# Returns the keys of the `:issuers` map found in the
# `:public_api_authentication` application environment of `:neurow`
# (presumably the issuer names - confirm against the configuration schema).
# Raises if `:public_api_authentication` is not configured.
def issuers() do
  public_api_authentication = Application.fetch_env!(:neurow, :public_api_authentication)
  Map.keys(public_api_authentication[:issuers])
end
12+
813
# Synchronously sends `{:public_api_issuer_jwks, issuer_name}` to the process
# registered under this module's name and returns its reply.
def public_api_issuer_jwks(issuer_name),
  do: GenServer.call(__MODULE__, {:public_api_issuer_jwks, issuer_name})

neurow/lib/neurow/internal_api/endpoint.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ defmodule Neurow.InternalApi.Endpoint do
1515
&Neurow.Configuration.internal_api_verbose_authentication_errors/0,
1616
max_lifetime: &Neurow.Configuration.internal_api_jwt_max_lifetime/0,
1717
send_forbidden: &Neurow.InternalApi.Endpoint.send_forbidden/3,
18-
inc_error_callback: &Stats.inc_jwt_errors_internal/0,
18+
inc_error_callback: &Neurow.Stats.Security.inc_jwt_errors_internal/0,
1919
exclude_path_prefixes: ["/ping", "/nodes", "/cluster_size_above", "/history"]
2020
)
2121

@@ -78,7 +78,7 @@ defmodule Neurow.InternalApi.Endpoint do
7878

7979
post "/v1/publish" do
8080
case extract_params(conn) do
81-
{:ok, messages, topics} ->
81+
{:ok, issuer, messages, topics} ->
8282
publish_timestamp = :os.system_time(:millisecond)
8383

8484
nb_publish =
@@ -96,7 +96,7 @@ defmodule Neurow.InternalApi.Endpoint do
9696
end)
9797
end)
9898

99-
Stats.inc_msg_received()
99+
Neurow.Stats.MessageBroker.inc_message_published(issuer)
100100

101101
conn
102102
|> put_resp_header("content-type", "application/json")
@@ -126,7 +126,7 @@ defmodule Neurow.InternalApi.Endpoint do
126126
full_topics =
127127
Enum.map(PublishRequest.topics(publish_request), fn topic -> "#{issuer}-#{topic}" end)
128128

129-
{:ok, PublishRequest.messages(publish_request), full_topics}
129+
{:ok, issuer, PublishRequest.messages(publish_request), full_topics}
130130
else
131131
error ->
132132
error

neurow/lib/neurow/public_api/endpoint.ex

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,6 @@ defmodule Neurow.PublicApi.Endpoint do
22
require Logger
33
import Plug.Conn
44
use Plug.Router
5-
6-
plug(:monitor_sse)
7-
85
plug(:preflight_request)
96

107
plug(Neurow.JwtAuthPlug,
@@ -14,7 +11,7 @@ defmodule Neurow.PublicApi.Endpoint do
1411
verbose_authentication_errors:
1512
&Neurow.Configuration.public_api_verbose_authentication_errors/0,
1613
max_lifetime: &Neurow.Configuration.public_api_jwt_max_lifetime/0,
17-
inc_error_callback: &Stats.inc_jwt_errors_public/0
14+
inc_error_callback: &Neurow.Stats.Security.inc_jwt_errors_public/0
1815
)
1916

2017
plug(:match)
@@ -52,6 +49,9 @@ defmodule Neurow.PublicApi.Endpoint do
5249
:ok = Neurow.StopListener.subscribe()
5350
:ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic)
5451

52+
{:ok, _pid} =
53+
Neurow.PublicApi.SSEMonitor.start_link(issuer)
54+
5555
last_event_id = extract_last_event_id(conn)
5656

5757
case last_event_id do
@@ -71,7 +71,7 @@ defmodule Neurow.PublicApi.Endpoint do
7171
conn
7272
|> send_chunked(200)
7373
|> import_history(topic, last_event_id)
74-
|> loop(timeout_ms, keep_alive_ms, now_ms, now_ms, exp)
74+
|> loop(timeout_ms, keep_alive_ms, now_ms, now_ms, exp, issuer)
7575
end
7676

7777
_ ->
@@ -186,7 +186,7 @@ defmodule Neurow.PublicApi.Endpoint do
186186
{conn, sent}
187187
end
188188

189-
def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ms, last_ping_ms, jwt_exp_s) do
189+
def loop(conn, sse_timeout_ms, keep_alive_ms, last_message_ms, last_ping_ms, jwt_exp_s, issuer) do
190190
now_ms = :os.system_time(:millisecond)
191191

192192
cond do
@@ -204,7 +204,8 @@ defmodule Neurow.PublicApi.Endpoint do
204204
keep_alive_ms,
205205
last_message_ms,
206206
now_ms,
207-
jwt_exp_s
207+
jwt_exp_s,
208+
issuer
208209
)
209210

210211
# JWT token expired, send a credentials_expired event and stop the connection
@@ -226,7 +227,8 @@ defmodule Neurow.PublicApi.Endpoint do
226227
receive do
227228
{:pubsub_message, message} ->
228229
conn = write_chunk(conn, message)
229-
Stats.inc_msg_published()
230+
231+
Neurow.Stats.MessageBroker.inc_message_sent(issuer)
230232
new_last_message_ms = :os.system_time(:millisecond)
231233

232234
conn
@@ -235,7 +237,8 @@ defmodule Neurow.PublicApi.Endpoint do
235237
keep_alive_ms,
236238
new_last_message_ms,
237239
new_last_message_ms,
238-
jwt_exp_s
240+
jwt_exp_s,
241+
issuer
239242
)
240243

241244
:shutdown ->
@@ -244,10 +247,26 @@ defmodule Neurow.PublicApi.Endpoint do
244247

245248
# Consume useless messages to avoid memory overflow
246249
_ ->
247-
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ms, last_ping_ms, jwt_exp_s)
250+
loop(
251+
conn,
252+
sse_timeout_ms,
253+
keep_alive_ms,
254+
last_message_ms,
255+
last_ping_ms,
256+
jwt_exp_s,
257+
issuer
258+
)
248259
after
249260
next_tick_ms ->
250-
conn |> loop(sse_timeout_ms, keep_alive_ms, last_message_ms, last_ping_ms, jwt_exp_s)
261+
loop(
262+
conn,
263+
sse_timeout_ms,
264+
keep_alive_ms,
265+
last_message_ms,
266+
last_ping_ms,
267+
jwt_exp_s,
268+
issuer
269+
)
251270
end
252271
end
253272
end
@@ -301,9 +320,4 @@ defmodule Neurow.PublicApi.Endpoint do
301320
String.match?(origin, allowed_origin)
302321
end)
303322
end
304-
305-
defp monitor_sse(conn, _) do
306-
{:ok, _pid} = SSEMonitor.start_link(conn)
307-
conn
308-
end
309323
end
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
defmodule Neurow.PublicApi.SSEMonitor do
  @moduledoc """
  Reports subscription metrics for one SSE subscription.

  The monitor is a `GenServer` linked to the process that calls
  `start_link/1`. It increments the subscription metric of the issuer when
  it starts and, from `terminate/2`, reports the end of the subscription
  together with its duration in milliseconds.
  """

  require Logger
  use GenServer

  @doc """
  Starts a monitor for `issuer`, linked to the calling process.

  The start time of the subscription is captured here, in the caller.
  """
  def start_link(issuer) do
    # Monotonic time is used instead of wall-clock time: a system clock
    # adjustment during a long-lived subscription must not yield a negative
    # or skewed duration.
    GenServer.start_link(__MODULE__, {issuer, System.monotonic_time()})
  end

  @impl true
  def init({issuer, start_time}) do
    Neurow.Stats.MessageBroker.inc_subscriptions(issuer)
    # Exits are trapped so that `terminate/2` is invoked when the linked
    # caller exits, which is what reports the end of the subscription.
    Process.flag(:trap_exit, true)
    {:ok, {issuer, start_time}}
  end

  @impl true
  def terminate(:normal, {issuer, start_time}) do
    track_subscription_end(issuer, start_time)
    Logger.debug("SSE connection end")
  end

  def terminate(reason, {issuer, start_time}) do
    track_subscription_end(issuer, start_time)
    Logger.debug("SSE connection terminated: #{inspect(reason)}")
  end

  # Converts the elapsed monotonic time since `start_time` to milliseconds and
  # reports it along with the end of the subscription for `issuer`.
  defp track_subscription_end(issuer, start_time) do
    elapsed = System.monotonic_time() - start_time
    duration_ms = System.convert_time_unit(elapsed, :native, :millisecond)
    Neurow.Stats.MessageBroker.dec_subscriptions(issuer, duration_ms)
  end
end

neurow/lib/sse_monitor.ex

Lines changed: 0 additions & 26 deletions
This file was deleted.

0 commit comments

Comments
 (0)