Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .dialyzer_ignore.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[
# HTTP.fetch/2 uses throw/catch for error handling which confuses Dialyzer
~r/lib\/http\.ex:231.*invalid_contract/,
~r/lib\/http\.ex:232.*no_return/,
~r/lib\/http\.ex:255.*invalid_contract/,
~r/lib\/http\.ex:256.*no_return/,

# Pattern match warning in handle_async_request - intentional error handling
~r/lib\/http\.ex:329.*pattern_match/,
~r/lib\/http\.ex:1.*pattern_match/,

# HTTP.Promise.then/3 opaque type issue with Task struct - Task.Supervisor returns opaque Task
~r/lib\/http\/promise\.ex:96.*contract_with_opaque/,
Expand Down
16 changes: 14 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ A modern HTTP client library for Elixir that provides a fetch API similar to web
- **Browser-like API**: Familiar fetch interface with promises and async/await patterns
- **Full HTTP support**: GET, POST, PUT, DELETE, PATCH, HEAD methods
- **Complete httpc integration**: Support for all :httpc.request options
- **Unix Domain Sockets**: HTTP over Unix sockets for Docker daemon, systemd, and other local services
- **Form data support**: HTTP.FormData for multipart/form-data and file uploads
- **Streaming file uploads**: Efficient large file uploads using streams
- **Type-safe configuration**: HTTP.FetchOptions for structured request configuration
Expand Down Expand Up @@ -43,13 +44,23 @@ response =
binary_data = response.body

# POST request with JSON
{:ok, response} =
{:ok, response} =
HTTP.fetch("https://jsonplaceholder.typicode.com/posts", [
method: "POST",
headers: %{"Content-Type" => "application/json"},
body: JSON.encode\!(%{title: "Hello", body: "World"})
])
|> HTTP.Promise.await()

# Unix Domain Socket request (Docker daemon example)
{:ok, response} =
HTTP.fetch("http://localhost/version",
unix_socket: "/var/run/docker.sock")
|> HTTP.Promise.await()

# Parse Docker version info
{:ok, docker_info} = HTTP.Response.json(response)
IO.puts("Docker Version: #{docker_info["Version"]}")
```

# Form data with file upload
Expand Down Expand Up @@ -80,7 +91,8 @@ promise = HTTP.fetch(url, [
body: "request body",
content_type: "application/json",
options: [timeout: 10_000],
signal: abort_controller
signal: abort_controller,
unix_socket: "/var/run/docker.sock" # Optional: use Unix Domain Socket
])
```

Expand Down
71 changes: 67 additions & 4 deletions lib/http.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule HTTP do
- **Automatic streaming**: Responses >5MB or with unknown Content-Length automatically stream
- **Request cancellation**: Via `HTTP.AbortController` for aborting in-flight requests
- **Promise chaining**: JavaScript-like promise interface with `then/3` support
- **Unix Domain Sockets**: Support for HTTP over Unix sockets (Docker daemon, systemd, etc.)
- **Telemetry integration**: Comprehensive event emission for monitoring and observability
- **Zero external dependencies**: Uses only Erlang/OTP built-in modules (except telemetry)

Expand All @@ -34,6 +35,12 @@ defmodule HTTP do
])
|> HTTP.Promise.await()

# Unix Domain Socket request (e.g., Docker daemon)
{:ok, response} =
HTTP.fetch("http://localhost/version",
unix_socket: "/var/run/docker.sock")
|> HTTP.Promise.await()

## Architecture

The library is structured around these core modules:
Expand Down Expand Up @@ -97,6 +104,8 @@ defmodule HTTP do
(e.g., `sync: false`, `body_format: :binary`). Overrides `Request` defaults.
- `:signal`: An `HTTP.AbortController` PID. If provided, the request can be aborted
via this controller.
- `:unix_socket`: Path to a Unix Domain Socket file (e.g., "/var/run/docker.sock").
When provided, the request is sent over the Unix socket instead of TCP/IP.

Returns:
- `%HTTP.Promise{}`: A Promise struct. The caller should `HTTP.Promise.await(promise_struct)` to get the final
Expand Down Expand Up @@ -227,6 +236,21 @@ defmodule HTTP do
IO.puts "Request was likely aborted. Reason: \#{inspect(reason)}"
end
end

# Unix Domain Socket request to Docker daemon
docker_promise = HTTP.fetch("http://localhost/version", unix_socket: "/var/run/docker.sock")
case HTTP.Promise.await(docker_promise) do
%HTTP.Response{status: 200} = response ->
case HTTP.Response.json(response) do
{:ok, json} ->
IO.puts "Docker Version: \#{json["Version"]}"
IO.puts "API Version: \#{json["ApiVersion"]}"
{:error, reason} ->
IO.inspect reason, label: "JSON Parse Error"
end
{:error, reason} ->
IO.inspect reason, label: "Docker Request Error"
end
"""
@spec fetch(String.t() | URI.t(), Keyword.t() | map()) :: %HTTP.Promise{}
def fetch(url, init \\ []) do
Expand All @@ -245,8 +269,9 @@ defmodule HTTP do
options: Keyword.merge(Request.__struct__().options, options.opts)
}

# Extract AbortController PID from FetchOptions
# Extract AbortController PID and unix_socket from FetchOptions
abort_controller_pid = options.signal
unix_socket_path = options.unix_socket

# Emit telemetry event for request start
HTTP.Telemetry.request_start(request.method, request.url, request.headers)
Expand All @@ -257,7 +282,7 @@ defmodule HTTP do
:http_fetch_task_supervisor,
HTTP,
:handle_async_request,
[request, self(), abort_controller_pid]
[request, self(), abort_controller_pid, unix_socket_path]
)

# Wrap the task in our new Promise struct
Expand Down Expand Up @@ -385,11 +410,49 @@ defmodule HTTP do
@spec handle_async_request(
Request.t(),
pid(),
pid() | nil
pid() | nil,
String.t() | nil
) :: Response.t() | {:error, term()}
def handle_async_request(request, _calling_pid, abort_controller_pid) do
def handle_async_request(request, _calling_pid, abort_controller_pid, unix_socket_path \\ nil) do
start_time = System.monotonic_time(:microsecond)

# If unix_socket_path is provided, use Unix socket transport
if unix_socket_path do
handle_unix_socket_request(request, unix_socket_path, start_time)
else
handle_httpc_request(request, abort_controller_pid, start_time)
end
end

# Handle Unix Domain Socket requests
defp handle_unix_socket_request(request, socket_path, start_time) do
try do
# Get timeout from request options
timeout = Keyword.get(request.http_options, :timeout, 30_000)

case HTTP.UnixSocket.request(socket_path, request, timeout) do
{:ok, response} ->
# Emit telemetry event for request completion
duration = System.monotonic_time(:microsecond) - start_time
body_size = if is_binary(response.body), do: byte_size(response.body), else: 0
HTTP.Telemetry.request_stop(response.status, request.url, body_size, duration)
response

{:error, reason} ->
duration = System.monotonic_time(:microsecond) - start_time
HTTP.Telemetry.request_exception(request.url, reason, duration)
throw(reason)
end
catch
reason ->
duration = System.monotonic_time(:microsecond) - start_time
HTTP.Telemetry.request_exception(request.url, reason, duration)
{:error, reason}
end
end

# Handle regular HTTP/HTTPS requests via :httpc
defp handle_httpc_request(request, abort_controller_pid, start_time) do
# Use a try/catch block to convert `throw` from handle_httpc_response into an {:error, reason} tuple
try do
case Request.to_httpc_args(request) do
Expand Down
5 changes: 5 additions & 0 deletions lib/http/fetch_options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ defmodule HTTP.FetchOptions do
options: [],
opts: [sync: false],
signal: nil,
unix_socket: nil,
timeout: nil,
connect_timeout: nil,
ssl: nil,
Expand All @@ -112,6 +113,7 @@ defmodule HTTP.FetchOptions do
options: keyword(),
opts: keyword(),
signal: any() | nil,
unix_socket: String.t() | nil,
timeout: integer() | nil,
connect_timeout: integer() | nil,
ssl: list() | nil,
Expand Down Expand Up @@ -230,6 +232,9 @@ defmodule HTTP.FetchOptions do
{:signal, signal}, acc ->
%{acc | signal: signal}

{:unix_socket, unix_socket}, acc ->
%{acc | unix_socket: unix_socket}

{:timeout, timeout}, acc ->
%{acc | timeout: timeout}

Expand Down
Loading
Loading