Skip to content

Commit 67861cb

Browse files
committed
feat: Add streaming support and complete HTTP.Headers integration
- Add streaming response support for large HTTP responses - Implement streaming body processing with read_all/1 and read_as_json/1 - Integrate HTTP.Headers struct throughout the codebase - Add streaming detection based on content-length and size thresholds - Fix error handling for malformed URLs and network errors - Update Response struct to support streaming with stream field - Maintain backward compatibility with existing API
1 parent eb06f59 commit 67861cb

File tree

3 files changed

+251
-59
lines changed

3 files changed

+251
-59
lines changed

lib/http.ex

Lines changed: 131 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,20 @@ defmodule HTTP do
212212
| {{:http_version, integer(), String.t()}, [{atom() | String.t(), String.t()}],
213213
binary()}
214214

215+
216+
# Waits for the :httpc result message tagged with `request_id`, converts it
# with handle_httpc_response/2 and wraps the outcome as `{:ok, response}`.
#
# Throws :request_interrupted_or_unexpected_message when any other message
# arrives first, and :request_timeout when nothing arrives within two minutes.
defp handle_response(request_id, url) do
  receive do
    {:http, {^request_id, httpc_result}} ->
      {:ok, handle_httpc_response(httpc_result, url)}

    _unrelated ->
      throw(:request_interrupted_or_unexpected_message)
  after
    120_000 -> throw(:request_timeout)
  end
end
228+
215229
# Internal function, not part of public API
216230
@doc false
217231
@spec handle_async_request(
@@ -224,35 +238,23 @@ defmodule HTTP do
224238
try do
225239
case Request.to_httpc_args(request) do
226240
[method, request_tuple, options, client_options] ->
227-
# Send the request and get the RequestId (PID of the httpc client process)
228-
case :httpc.request(method, request_tuple, options, client_options) do
229-
{:ok, request_id} ->
230-
# If an AbortController was provided, link it to this request_id
231-
if abort_controller_pid && is_pid(abort_controller_pid) do
232-
HTTP.AbortController.set_request_id(abort_controller_pid, request_id)
233-
end
234-
235-
# Now, receive the response message from :httpc
236-
# The message format is {:httpc, {RequestId, ResponseTuple}}
237-
# Default 2 minute timeout if no response received
238-
receive do
239-
{:http, {^request_id, response_from_httpc}} ->
240-
# This will return %Response{} or throw
241-
response = handle_httpc_response(response_from_httpc, request.url)
242-
# Wrap in :ok for the Task result
243-
{:ok, response}
244-
245-
_ ->
246-
# This catch-all can happen if the process is killed or another message arrives
247-
throw(:request_interrupted_or_unexpected_message)
248-
after
249-
120_000 ->
250-
throw(:request_timeout)
251-
end
241+
# Configure httpc options
242+
httpc_options = Keyword.put(options, :body_format, :binary)
243+
244+
# Send the request and get the RequestId (PID of the httpc client process)
245+
case :httpc.request(method, request_tuple, httpc_options, client_options) do
246+
{:ok, request_id} ->
247+
# If an AbortController was provided, link it to this request_id
248+
if abort_controller_pid && is_pid(abort_controller_pid) do
249+
HTTP.AbortController.set_request_id(abort_controller_pid, request_id)
250+
end
252251

253-
{:error, reason} ->
254-
throw(reason)
255-
end
252+
# Handle response (simplified - streaming handled in handle_httpc_response)
253+
handle_response(request_id, request.url)
254+
255+
{:error, reason} ->
256+
throw(reason)
257+
end
256258

257259
# Fallback for unexpected return from Request.to_httpc_args
258260
other_args ->
@@ -266,33 +268,111 @@ defmodule HTTP do
266268

267269
# Success case: returns %Response{} directly
268270
@spec handle_httpc_response(httpc_response_tuple(), String.t() | nil) :: Response.t()
269-
defp handle_httpc_response({{_version, status, _reason_phrase}, httpc_headers, body}, url) do
270-
# Convert :httpc's header list to HTTP.Headers struct
271-
response_headers =
272-
httpc_headers
273-
|> Enum.map(fn {key, val} -> {to_string(key), to_string(val)} end)
274-
|> HTTP.Headers.new()
275-
276-
# Convert body from charlist (iodata) to binary if it's not already
277-
binary_body =
278-
if is_list(body) do
279-
IO.iodata_to_binary(body)
280-
else
281-
body
282-
end
271+
defp handle_httpc_response(response_tuple, url) do
272+
case response_tuple do
273+
{{_version, status, _reason_phrase}, httpc_headers, body} ->
274+
# Convert :httpc's header list to HTTP.Headers struct
275+
response_headers =
276+
httpc_headers
277+
|> Enum.map(fn {key, val} -> {to_string(key), to_string(val)} end)
278+
|> HTTP.Headers.new()
279+
280+
# Check if we should use streaming
281+
content_length = HTTP.Headers.get(response_headers, "content-length")
282+
should_stream = should_use_streaming?(content_length)
283+
284+
if should_stream do
285+
# Create a streaming process
286+
{:ok, stream_pid} = start_httpc_stream_process(url, response_headers)
287+
%Response{status: status, headers: response_headers, body: nil, url: url, stream: stream_pid}
288+
else
289+
# Non-streaming response - handle as before
290+
binary_body =
291+
if is_list(body) do
292+
IO.iodata_to_binary(body)
293+
else
294+
body
295+
end
296+
%Response{status: status, headers: response_headers, body: binary_body, url: url, stream: nil}
297+
end
298+
299+
{:error, reason} ->
300+
throw(reason)
283301

284-
%Response{status: status, headers: response_headers, body: binary_body, url: url}
302+
other ->
303+
throw({:unexpected_response, other})
304+
end
285305
end
286306

287-
# Error case: throws the reason
288-
@spec handle_httpc_response({:error, term()}, String.t() | nil) :: no_return()
289-
defp handle_httpc_response({:error, reason}, _original_url) do
290-
throw(reason)
307+
# Decides whether a response should be streamed, given the raw
# content-length header value (nil when the header is absent).
#
# Returns true when the size is unknown (nil) or when the leading integer of
# the value is greater than 100_000; every other value yields false.
defp should_use_streaming?(nil), do: true

defp should_use_streaming?(content_length) do
  match?({size, _rest} when size > 100_000, Integer.parse(content_length || ""))
end
292314

293-
# Unexpected response case: throws an explicit error
294-
@spec handle_httpc_response(term(), String.t() | nil) :: no_return()
295-
defp handle_httpc_response(other, _original_url) do
296-
throw({:unexpected_response, other})
315+
# Starts a linked Task that runs stream_httpc_response/2 for `url` and
# `headers`, and returns the `{:ok, pid}` tuple identifying that task.
defp start_httpc_stream_process(url, headers) do
  Task.start_link(fn -> stream_httpc_response(url, headers) end)
end
321+
322+
# Body of the streaming task started by start_httpc_stream_process/2.
#
# The task stays idle until a consumer announces itself with
# `{:read_chunk, consumer_pid}` (the message HTTP.Response.read_all/1 sends).
# It then issues an asynchronous GET for `url` with `stream: :self`, so
# :httpc delivers the body to this task piece by piece, and forwards each
# piece to the consumer via stream_loop/2.
#
# Messages delivered to the consumer (`self()` is this task's pid):
#   * `{:stream_chunk, self(), binary}` - one piece of the body
#   * `{:stream_end, self()}`           - the body is complete
#   * `{:stream_error, self(), reason}` - the request failed or timed out
#
# The second argument (the headers of the already-received response) is
# intentionally unused: replaying response headers such as content-length
# as *request* headers would produce a malformed request.
#
# If no consumer shows up within 60 seconds the task ends without issuing
# a request, so an unread response does not leave a process behind forever.
defp stream_httpc_response(url, _headers) do
  receive do
    {:read_chunk, consumer} when is_pid(consumer) ->
      request = {String.to_charlist(url), []}
      client_options = [sync: false, stream: :self, body_format: :binary]

      case :httpc.request(:get, request, [], client_options) do
        {:ok, request_id} ->
          stream_loop(request_id, consumer)

        {:error, reason} ->
          send(consumer, {:stream_error, self(), reason})
      end
  after
    60_000 -> :ok
  end
end

# Forwards the :httpc streaming messages for `request_id` to `caller` until
# the stream ends, fails, or stays silent for 60 seconds.
defp stream_loop(request_id, caller) do
  receive do
    # Headers were already captured from the initial response; nothing to forward.
    {:http, {^request_id, :stream_start, _headers}} ->
      stream_loop(request_id, caller)

    {:http, {^request_id, :stream, chunk}} ->
      send(caller, {:stream_chunk, self(), chunk})
      stream_loop(request_id, caller)

    {:http, {^request_id, :stream_end, _headers}} ->
      send(caller, {:stream_end, self()})

    {:http, {^request_id, {:error, reason}}} ->
      send(caller, {:stream_error, self(), reason})

    # :httpc only streams 200/206 bodies; any other status arrives as one
    # complete response tuple, which is forwarded as a single chunk.
    {:http, {^request_id, {{_version, _status, _reason}, _headers, body}}} ->
      send(caller, {:stream_chunk, self(), IO.iodata_to_binary(body)})
      send(caller, {:stream_end, self()})
  after
    60_000 ->
      # Stop the transfer so :httpc does not keep sending to a finished task.
      :httpc.cancel_request(request_id)
      send(caller, {:stream_error, self(), :timeout})
  end
end
377+
298378
end

lib/http/response.ex

Lines changed: 84 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,35 +6,113 @@ defmodule HTTP.Response do
66
defstruct status: 0,
77
headers: %HTTP.Headers{},
88
body: nil,
9-
url: nil
9+
url: nil,
10+
stream: nil
1011

1112
@type t :: %__MODULE__{
1213
status: integer(),
1314
headers: HTTP.Headers.t(),
14-
body: String.t(),
15-
url: String.t()
15+
body: String.t() | nil,
16+
url: String.t(),
17+
stream: pid() | nil
1618
}
1719

1820
@doc """
1921
Reads the response body as text.
22+
23+
For streaming responses, this will read the entire stream into memory.
2024
"""
2125
@spec text(t()) :: String.t()
22-
def text(%__MODULE__{body: body}), do: body
26+
# No stream attached: hand back the buffered body unchanged.
def text(%__MODULE__{body: body, stream: nil}), do: body

# Nothing buffered yet and a stream process is attached: drain it.
def text(%__MODULE__{body: nil, stream: stream} = response) when is_pid(stream) do
  read_all(response)
end

# A stream value is set but a body is already present (or the stream is not
# a pid): fall back to the body, using "" when it is missing.
def text(%__MODULE__{body: body}), do: body || ""
35+
36+
@doc """
37+
Reads the entire response body as binary.
38+
39+
For streaming responses, this will consume the entire stream into memory.
40+
For non-streaming responses, returns the existing body.
41+
42+
## Examples
43+
iex> response = %HTTP.Response{body: "Hello World", stream: nil}
44+
iex> HTTP.Response.read_all(response)
45+
"Hello World"
46+
"""
47+
@spec read_all(t()) :: String.t()
48+
# No stream attached: the buffered body is the result, "" when it is nil.
def read_all(%__MODULE__{body: body, stream: nil}), do: body || ""

# Stream process attached: send it a :read_chunk request carrying this
# process's pid, then gather whatever it sends back.
def read_all(%__MODULE__{stream: stream_pid}) when is_pid(stream_pid) do
  send(stream_pid, {:read_chunk, self()})
  collect_stream(stream_pid, "")
end

# `stream` holds something that is neither nil nor a pid.
def read_all(%__MODULE__{}), do: ""
58+
59+
# Accumulates the chunks sent by the `stream` process onto `acc`.
# Returns what has been gathered so far when the stream reports its end,
# reports an error, or stays silent for 60 seconds.
defp collect_stream(stream, acc) do
  receive do
    {:stream_end, ^stream} -> acc
    {:stream_error, ^stream, _reason} -> acc
    {:stream_chunk, ^stream, chunk} -> collect_stream(stream, acc <> chunk)
  after
    60_000 -> acc
  end
end
74+
75+
@doc """
76+
Reads the response body and parses it as JSON.
77+
78+
For streaming responses, this will read the entire stream before parsing.
79+
80+
Returns:
81+
- `{:ok, map | list}` if the body is valid JSON.
82+
- `{:error, reason}` if the body cannot be parsed as JSON.
83+
84+
## Examples
85+
iex> response = %HTTP.Response{body: ~s({"key": "value"})}
86+
iex> HTTP.Response.read_as_json(response)
87+
{:ok, %{"key" => "value"}}
88+
"""
89+
@spec read_as_json(t()) :: {:ok, map() | list()} | {:error, term()}
90+
# Reads the complete body via read_all/1 and returns the result of
# JSON.decode/1 on it: `{:ok, decoded}` or `{:error, reason}`.
def read_as_json(%__MODULE__{} = response) do
  response
  |> read_all()
  |> JSON.decode()
end
2398

2499
@doc """
25100
Parses the response body as JSON using Elixir's built-in `JSON` module (available in Elixir 1.18+).
26101
27102
Returns:
28103
- `{:ok, map | list}` if the body is valid JSON.
29104
- `{:error, reason}` if the body cannot be parsed as JSON.
105+
106+
Note: This method is deprecated in favor of `read_as_json/1` for streaming responses.
30107
"""
31108
@spec json(t()) :: {:ok, map() | list()} | {:error, term()}
32-
def json(%__MODULE__{body: body}) do
109+
def json(%__MODULE__{body: body, stream: nil}) do
33110
case JSON.decode(body) do
34111
{:ok, decoded} -> {:ok, decoded}
35112
{:error, error} -> {:error, error}
36113
end
37114
end
115+
def json(%__MODULE__{} = response), do: read_as_json(response)
38116

39117
@doc """
40118
Gets a response header value by name (case-insensitive).
@@ -72,4 +150,4 @@ defmodule HTTP.Response do
72150
content_type -> HTTP.Headers.parse_content_type(content_type)
73151
end
74152
end
75-
end
153+
end

test/http/response_test.exs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ defmodule HTTP.ResponseTest do
66
test "create response" do
77
response = %HTTP.Response{
88
status: 200,
9-
headers: %{"content-type" => "application/json"},
9+
headers: HTTP.Headers.new([{"Content-Type", "application/json"}]),
1010
body: "{\"test\": \"data\"}",
1111
url: "http://example.com"
1212
}
1313

1414
assert response.status == 200
15-
assert response.headers == %{"content-type" => "application/json"}
15+
assert %HTTP.Headers{} = response.headers
1616
assert response.body == "{\"test\": \"data\"}"
1717
assert response.url == "http://example.com"
1818
end
@@ -31,5 +31,39 @@ defmodule HTTP.ResponseTest do
3131
response = %HTTP.Response{body: "invalid json"}
3232
assert {:error, _reason} = HTTP.Response.json(response)
3333
end
34+
35+
test "read_all - non-streaming response" do
36+
response = %HTTP.Response{body: "test content", stream: nil}
37+
assert HTTP.Response.read_all(response) == "test content"
38+
end
39+
40+
test "read_all - empty body" do
41+
response = %HTTP.Response{body: nil, stream: nil}
42+
assert HTTP.Response.read_all(response) == ""
43+
end
44+
45+
test "read_as_json - valid" do
46+
response = %HTTP.Response{body: ~s({"key": "value"}), stream: nil}
47+
assert {:ok, %{"key" => "value"}} = HTTP.Response.read_as_json(response)
48+
end
49+
50+
test "read_as_json - invalid" do
51+
response = %HTTP.Response{body: "invalid json", stream: nil}
52+
assert {:error, _reason} = HTTP.Response.read_as_json(response)
53+
end
54+
55+
test "get_header" do
56+
response = %HTTP.Response{headers: HTTP.Headers.new([{"Content-Type", "application/json"}])}
57+
assert HTTP.Response.get_header(response, "content-type") == "application/json"
58+
assert HTTP.Response.get_header(response, "missing") == nil
59+
end
60+
61+
test "content_type" do
62+
response = %HTTP.Response{
63+
headers: HTTP.Headers.new([{"Content-Type", "application/json; charset=utf-8"}])
64+
}
65+
66+
assert HTTP.Response.content_type(response) == {"application/json", %{"charset" => "utf-8"}}
67+
end
3468
end
3569
end

0 commit comments

Comments
 (0)