Skip to content

Commit 530f169

Browse files
committed
feat: Add HTTP.Telemetry module to trace request and response.
1 parent b7ab7c9 commit 530f169

File tree

5 files changed

+362
-11
lines changed

5 files changed

+362
-11
lines changed

README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,39 @@ default_ua = HTTP.Headers.user_agent()
156156
{media_type, params} = HTTP.Headers.parse_content_type("application/json; charset=utf-8")
157157
```
158158

159+
### HTTP.Telemetry
160+
Comprehensive telemetry and metrics for HTTP requests and responses.
161+
162+
```elixir
163+
# All HTTP.fetch operations automatically emit telemetry events
164+
# No configuration required - just attach handlers
165+
166+
:telemetry.attach_many(
167+
"my_handler",
168+
[
169+
[:http_fetch, :request, :start],
170+
[:http_fetch, :request, :stop],
171+
[:http_fetch, :request, :exception]
172+
],
173+
fn event_name, measurements, metadata, _config ->
174+
case event_name do
175+
[:http_fetch, :request, :start] ->
176+
IO.puts("Starting request to #{metadata.url}")
177+
[:http_fetch, :request, :stop] ->
178+
IO.puts("Request completed: #{measurements.status} in #{measurements.duration}μs")
179+
[:http_fetch, :request, :exception] ->
180+
IO.puts("Request failed: #{inspect(metadata.error)}")
181+
end
182+
end,
183+
nil
184+
)
185+
186+
# Manual telemetry events (for custom implementations)
187+
HTTP.Telemetry.request_start("GET", URI.parse("https://example.com"), %HTTP.Headers{})
188+
HTTP.Telemetry.request_stop(200, URI.parse("https://example.com"), 1024, 1500)
189+
HTTP.Telemetry.request_exception(URI.parse("https://example.com"), :timeout, 5000)
190+
```
191+
159192
### HTTP.Request
160193
Request configuration struct.
161194

lib/http.ex

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,9 @@ defmodule HTTP do
181181
# Extract AbortController PID from FetchOptions
182182
abort_controller_pid = options.signal
183183

184+
# Emit telemetry event for request start
185+
HTTP.Telemetry.request_start(request.method, request.url, request.headers)
186+
184187
# Spawn a task to handle the asynchronous HTTP request
185188
task =
186189
Task.Supervisor.async_nolink(
@@ -221,6 +224,8 @@ defmodule HTTP do
221224
pid() | nil
222225
) :: Response.t() | {:error, term()}
223226
def handle_async_request(request, _calling_pid, abort_controller_pid) do
227+
start_time = System.monotonic_time(:microsecond)
228+
224229
# Use a try/catch block to convert `throw` from handle_httpc_response into an {:error, reason} tuple
225230
try do
226231
case Request.to_httpc_args(request) do
@@ -237,18 +242,46 @@ defmodule HTTP do
237242
end
238243

239244
# Handle response (simplified - streaming handled in handle_httpc_response)
240-
handle_response(request_id, request.url)
245+
result = handle_response(request_id, request.url)
246+
247+
# Emit telemetry event for request completion
248+
duration = System.monotonic_time(:microsecond) - start_time
249+
250+
case result do
251+
%Response{status: status, body: body} when is_binary(body) ->
252+
HTTP.Telemetry.request_stop(status, request.url, byte_size(body), duration)
253+
254+
%Response{status: status, stream: nil} ->
255+
# Non-streaming response with nil body (unlikely, but handle)
256+
HTTP.Telemetry.request_stop(status, request.url, 0, duration)
257+
258+
%Response{status: status} ->
259+
# Streaming response - we'll emit telemetry when streaming completes
260+
HTTP.Telemetry.request_stop(status, request.url, 0, duration)
261+
262+
{:error, _} ->
263+
# Error will be handled in catch block
264+
:ok
265+
end
266+
267+
result
241268

242269
{:error, reason} ->
270+
duration = System.monotonic_time(:microsecond) - start_time
271+
HTTP.Telemetry.request_exception(request.url, reason, duration)
243272
throw(reason)
244273
end
245274

246275
# Fallback for unexpected return from Request.to_httpc_args
247276
other_args ->
277+
duration = System.monotonic_time(:microsecond) - start_time
278+
HTTP.Telemetry.request_exception(request.url, {:bad_request_args, other_args}, duration)
248279
throw({:bad_request_args, other_args})
249280
end
250281
catch
251282
reason ->
283+
duration = System.monotonic_time(:microsecond) - start_time
284+
HTTP.Telemetry.request_exception(request.url, reason, duration)
252285
{:error, reason}
253286
end
254287
end
@@ -308,22 +341,33 @@ defmodule HTTP do
308341
defp should_use_streaming?(content_length) do
309342
# Stream responses larger than 5MB to avoid issues with large files
310343
case Integer.parse(content_length || "") do
311-
{size, _} when size > 5_000_000 -> true
344+
{size, _} when size > 5_000_000 ->
345+
# Emit telemetry for streaming start
346+
HTTP.Telemetry.streaming_start(size)
347+
true
348+
312349
# Stream when size is unknown
313-
_ -> content_length == nil
350+
_ ->
351+
if content_length == nil do
352+
HTTP.Telemetry.streaming_start(0)
353+
end
354+
355+
content_length == nil
314356
end
315357
end
316358

317359
defp start_httpc_stream_process(uri, headers) do
360+
start_time = System.monotonic_time(:microsecond)
361+
318362
{:ok, pid} =
319363
Task.start_link(fn ->
320-
stream_httpc_response(uri, headers)
364+
stream_httpc_response(uri, headers, start_time)
321365
end)
322366

323367
{:ok, pid}
324368
end
325369

326-
defp stream_httpc_response(uri, headers) do
370+
defp stream_httpc_response(uri, headers, start_time) do
327371
# Use the URI directly (it's already parsed)
328372
_host = uri.host
329373
_port = uri.port || 80
@@ -342,45 +386,62 @@ defmodule HTTP do
342386
sync: false
343387
) do
344388
{:ok, request_id} ->
345-
stream_loop(request_id, self())
389+
stream_loop(request_id, self(), 0, start_time)
346390

347391
{:error, reason} ->
348392
send(self(), {:stream_error, self(), reason})
349393
end
350394
end
351395

352-
defp stream_loop(request_id, caller) do
396+
defp stream_loop(request_id, caller, total_bytes, start_time) do
353397
receive do
354398
{:http, {^request_id, {:http_response, _http_version, _status, _reason}}} ->
355-
stream_loop(request_id, caller)
399+
stream_loop(request_id, caller, total_bytes, start_time)
356400

357401
{:http, {^request_id, {:http_header, _, _header_name, _, _header_value}}} ->
358-
stream_loop(request_id, caller)
402+
stream_loop(request_id, caller, total_bytes, start_time)
359403

360404
{:http, {^request_id, :http_eoh}} ->
361-
stream_loop(request_id, caller)
405+
stream_loop(request_id, caller, total_bytes, start_time)
362406

363407
{:http, {^request_id, {:http_error, reason}}} ->
364408
send(caller, {:stream_error, self(), reason})
365409

366410
{:http, {^request_id, :stream_end}} ->
411+
duration = System.monotonic_time(:microsecond) - start_time
412+
HTTP.Telemetry.streaming_stop(total_bytes, duration)
367413
send(caller, {:stream_end, self()})
368414

369415
{:http, {^request_id, {:http_chunk, chunk}}} ->
416+
chunk_size = byte_size(chunk)
417+
new_total = total_bytes + chunk_size
418+
HTTP.Telemetry.streaming_chunk(chunk_size, new_total)
370419
send(caller, {:stream_chunk, self(), to_string(chunk)})
371-
stream_loop(request_id, caller)
420+
stream_loop(request_id, caller, new_total, start_time)
372421

373422
{:http, {^request_id, {:http_body, body}}} ->
423+
chunk_size = byte_size(body)
424+
new_total = total_bytes + chunk_size
425+
HTTP.Telemetry.streaming_chunk(chunk_size, new_total)
374426
send(caller, {:stream_chunk, self(), to_string(body)})
427+
duration = System.monotonic_time(:microsecond) - start_time
428+
HTTP.Telemetry.streaming_stop(new_total, duration)
375429
send(caller, {:stream_end, self()})
376430

377431
{:http, {^request_id, {_status_line, _headers, body}}} ->
378432
# Handle complete response (non-streaming case)
379433
binary_body = if is_list(body), do: IO.iodata_to_binary(body), else: body
434+
chunk_size = byte_size(binary_body)
435+
new_total = total_bytes + chunk_size
436+
HTTP.Telemetry.streaming_chunk(chunk_size, new_total)
380437
send(caller, {:stream_chunk, self(), binary_body})
438+
duration = System.monotonic_time(:microsecond) - start_time
439+
HTTP.Telemetry.streaming_stop(new_total, duration)
381440
send(caller, {:stream_end, self()})
382441
after
383442
60_000 ->
443+
duration = System.monotonic_time(:microsecond) - start_time
444+
HTTP.Telemetry.streaming_stop(total_bytes, duration)
384445
send(caller, {:stream_error, self(), :timeout})
385446
end
386447
end

lib/http/telemetry.ex

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
defmodule HTTP.Telemetry do
2+
@moduledoc """
3+
Telemetry integration for HTTP fetch operations.
4+
5+
Provides event tracking and metrics collection for HTTP requests and responses.
6+
"""
7+
8+
@doc """
9+
Emits a telemetry event for request start.
10+
11+
## Examples
12+
iex> HTTP.Telemetry.request_start("GET", URI.parse("https://example.com"), %HTTP.Headers{})
13+
:ok
14+
"""
15+
@spec request_start(String.t(), URI.t(), HTTP.Headers.t()) :: :ok
16+
def request_start(method, url, headers) do
17+
measurements = %{start_time: System.system_time(:millisecond)}
18+
19+
metadata = %{
20+
method: method,
21+
url: url,
22+
headers: headers
23+
}
24+
25+
:telemetry.execute([:http_fetch, :request, :start], measurements, metadata)
26+
end
27+
28+
@doc """
29+
Emits a telemetry event for request completion.
30+
31+
## Examples
32+
iex> HTTP.Telemetry.request_stop(200, URI.parse("https://example.com"), 1024, 1500)
33+
:ok
34+
"""
35+
@spec request_stop(integer(), URI.t(), integer(), integer()) :: :ok
36+
def request_stop(status, url, response_size, duration_us) do
37+
measurements = %{
38+
duration: duration_us,
39+
status: status,
40+
response_size: response_size
41+
}
42+
43+
metadata = %{url: url, status: status}
44+
45+
:telemetry.execute([:http_fetch, :request, :stop], measurements, metadata)
46+
end
47+
48+
@doc """
49+
Emits a telemetry event for request failure.
50+
51+
## Examples
52+
iex> HTTP.Telemetry.request_exception(URI.parse("https://example.com"), :timeout, 5000)
53+
:ok
54+
"""
55+
@spec request_exception(URI.t(), term(), integer()) :: :ok
56+
def request_exception(url, error, duration_us) do
57+
measurements = %{duration: duration_us}
58+
metadata = %{url: url, error: error}
59+
60+
:telemetry.execute([:http_fetch, :request, :exception], measurements, metadata)
61+
end
62+
63+
@doc """
64+
Emits a telemetry event for response body reading start.
65+
66+
## Examples
67+
iex> HTTP.Telemetry.response_body_read_start(1024)
68+
:ok
69+
"""
70+
@spec response_body_read_start(integer()) :: :ok
71+
def response_body_read_start(content_length) do
72+
measurements = %{content_length: content_length}
73+
:telemetry.execute([:http_fetch, :response, :body_read_start], measurements, %{})
74+
end
75+
76+
@doc """
77+
Emits a telemetry event for response body reading completion.
78+
79+
## Examples
80+
iex> HTTP.Telemetry.response_body_read_stop(1024, 500)
81+
:ok
82+
"""
83+
@spec response_body_read_stop(integer(), integer()) :: :ok
84+
def response_body_read_stop(bytes_read, duration_us) do
85+
measurements = %{bytes_read: bytes_read, duration: duration_us}
86+
:telemetry.execute([:http_fetch, :response, :body_read_stop], measurements, %{})
87+
end
88+
89+
@doc """
90+
Emits a telemetry event for streaming start.
91+
92+
## Examples
93+
iex> HTTP.Telemetry.streaming_start(5242880)
94+
:ok
95+
"""
96+
@spec streaming_start(integer()) :: :ok
97+
def streaming_start(content_length) do
98+
measurements = %{content_length: content_length}
99+
:telemetry.execute([:http_fetch, :streaming, :start], measurements, %{})
100+
end
101+
102+
@doc """
103+
Emits a telemetry event for streaming chunk received.
104+
105+
## Examples
106+
iex> HTTP.Telemetry.streaming_chunk(8192, 16384)
107+
:ok
108+
"""
109+
@spec streaming_chunk(integer(), integer()) :: :ok
110+
def streaming_chunk(bytes_received, total_bytes) do
111+
measurements = %{
112+
bytes_received: bytes_received,
113+
total_bytes: total_bytes
114+
}
115+
116+
:telemetry.execute([:http_fetch, :streaming, :chunk], measurements, %{})
117+
end
118+
119+
@doc """
120+
Emits a telemetry event for streaming completion.
121+
122+
## Examples
123+
iex> HTTP.Telemetry.streaming_stop(5242880, 10000)
124+
:ok
125+
"""
126+
@spec streaming_stop(integer(), integer()) :: :ok
127+
def streaming_stop(total_bytes, duration_us) do
128+
measurements = %{total_bytes: total_bytes, duration: duration_us}
129+
:telemetry.execute([:http_fetch, :streaming, :stop], measurements, %{})
130+
end
131+
end

mix.lock

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,5 @@
66
"makeup_elixir": {:hex, :makeup_elixir, "1.0.1", "e928a4f984e795e41e3abd27bfc09f51db16ab8ba1aebdba2b3a575437efafc2", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "7284900d412a3e5cfd97fdaed4f5ed389b8f2b4cb49efc0eb3bd10e2febf9507"},
77
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
88
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
9+
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
910
}

0 commit comments

Comments
 (0)