Abyss is a modern, pure Elixir UDP server library that provides a high-performance foundation for building UDP-based services like DNS servers, DHCP servers, or custom UDP applications. It implements a supervisor-based architecture with connection pooling and pluggable transport modules.
- High Performance: Supervisor-based architecture with configurable connection pooling
- Flexible Handler System: Pluggable handler modules for custom protocol implementations
- Real-time Metrics: Built-in telemetry with connection counts, throughput rates, and response times
- Security Features: Built-in rate limiting and packet size validation
- Broadcast Support: Native support for broadcast and multicast applications
- Graceful Shutdown: Coordinated shutdown with configurable timeouts
- Extensible Transport: Pluggable transport layer (currently UDP)
The package can be installed by adding abyss to your list of dependencies in mix.exs:
def deps do
  [
    {:abyss, "~> 0.4.0"}
  ]
end

defmodule MyEchoHandler do
  use Abyss.Handler

  @impl true
  def handle_data({ip, port, data}, state) do
    # Echo the data back to the client
    Abyss.Transport.UDP.send(state.socket, ip, port, data)
    {:continue, state}
  end
end

# Start the server
{:ok, _pid} = Abyss.start_link([
  handler_module: MyEchoHandler,
  port: 1234,
  num_listeners: 10
])

Abyss provides built-in real-time metrics for monitoring server performance:
# Get current metrics
metrics = Abyss.Telemetry.get_metrics()
# => %{
#   connections_active: 15,
#   connections_total: 1250,
#   accepts_total: 1250,
#   responses_total: 1198,
#   accepts_per_second: 25,
#   responses_per_second: 23
# }

# Set up response time monitoring
:telemetry.attach_many(
  "response-monitor",
  [[:abyss, :metrics, :response_time]],
  fn [:abyss, :metrics, :response_time], measurements, _metadata, _config ->
    IO.puts("Response time: #{measurements.response_time}ms")
  end,
  %{}
)

# Reset metrics
Abyss.Telemetry.reset_metrics()

# Send a test packet
echo "Hello, UDP" | nc -4 -u -w1 127.0.0.1 1234
# Continuous testing
while true; do echo "Hello, UDP $(date +%T)" | nc -4 -u -w1 127.0.0.1 1234; done

Starts an Abyss server with the given options.

@spec start_link(options()) :: Supervisor.on_start()

Options:
- handler_module (required) - Module implementing the Abyss.Handler behavior
- port - UDP port to listen on (default: 4000)
- num_listeners - Number of listener processes (default: 100)
- num_connections - Max concurrent connections (default: 16_384)
- transport_options - Keyword list passed to the UDP transport
- read_timeout - Connection read timeout in ms (default: 60_000)
- shutdown_timeout - Graceful shutdown timeout in ms (default: 15_000)
- rate_limit_enabled - Enable rate limiting (default: false)
- rate_limit_max_packets - Max packets per window (default: 1000)
- rate_limit_window_ms - Rate limit window in ms (default: 1000)
- max_packet_size - Maximum packet size in bytes (default: 8192)
- broadcast - Enable broadcast mode (default: false)
- connection_telemetry_sample_rate - Sampling rate for connection telemetry (default: 0.05)
- handler_memory_check_interval - Memory check interval in ms (default: 10_000)
- handler_memory_warning_threshold - Memory warning threshold in MB (default: 100)
- handler_memory_hard_limit - Memory hard limit in MB (default: 150)
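As a minimal sketch of how the options above fit together, the hypothetical module below merges caller overrides with the documented defaults using Keyword.validate!/2 (Elixir 1.13 or later). AbyssOptions and build/1 are illustrative names, not part of Abyss, and whether Abyss itself rejects unknown keys is not stated here. The point is only to catch typos such as num_listener before the list reaches Abyss.start_link/1.

```elixir
defmodule AbyssOptions do
  # Hypothetical helper, not part of Abyss. Defaults are copied from the option list above.
  @defaults [
    port: 4000,
    num_listeners: 100,
    num_connections: 16_384,
    read_timeout: 60_000,
    shutdown_timeout: 15_000,
    rate_limit_enabled: false,
    rate_limit_max_packets: 1000,
    rate_limit_window_ms: 1000,
    max_packet_size: 8192,
    broadcast: false,
    connection_telemetry_sample_rate: 0.05,
    handler_memory_check_interval: 10_000,
    handler_memory_warning_threshold: 100,
    handler_memory_hard_limit: 150
  ]

  # Returns the overrides merged with the defaults.
  # Raises ArgumentError on unknown keys or when :handler_module is missing.
  def build(overrides) do
    # Bare atoms are allowed keys without a default value
    allowed = [:handler_module, :transport_options] ++ @defaults
    opts = Keyword.validate!(overrides, allowed)

    unless Keyword.has_key?(opts, :handler_module) do
      raise ArgumentError, "the :handler_module option is required"
    end

    opts
  end
end
```

The resulting keyword list could then be handed to Abyss.start_link/1, for example AbyssOptions.build(handler_module: MyEchoHandler, port: 1234).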
Stops the server gracefully, waiting for connections to finish.
@spec stop(Supervisor.supervisor(), timeout()) :: :ok

Temporarily stop accepting new connections while maintaining existing ones.

def suspend(supervisor), do: Abyss.Server.suspend(supervisor)
def resume(supervisor), do: Abyss.Server.resume(supervisor)

Implement the Abyss.Handler behavior to process UDP packets:
defmodule MyHandler do
  use Abyss.Handler
  require Logger

  @impl true
  def handle_data({ip, port, data}, state) do
    # Process incoming UDP packet
    response = process_data(data)

    # Send response back to client
    Abyss.Transport.UDP.send(state.socket, ip, port, response)

    # Continue handling more packets...
    {:continue, state}
    # ...or return {:close, state} instead to close the connection after the response
  end

  # Optional callbacks
  @impl true
  def handle_timeout(state) do
    Logger.warning("Connection timed out")
    {:close, state}
  end

  @impl true
  def handle_error(reason, state) do
    Logger.error("Handler error: #{inspect(reason)}")
    {:continue, state}
  end

  @impl true
  def init(state) do
    {:ok, Map.put(state, :counter, 0)}
  end

  @impl true
  def terminate(_reason, _state) do
    # Cleanup resources
    :ok
  end

  # Placeholder for your own protocol logic; here it simply echoes the payload
  defp process_data(data), do: data
end

Abyss.start_link([
  handler_module: MyHandler,
  port: 8080
])

Abyss.start_link([
  handler_module: MyHandler,
  port: 8080,
  num_listeners: 200,       # Increase for high throughput
  num_connections: 32_768,  # Allow more concurrent connections
  read_timeout: 30_000      # Shorter timeout for faster cleanup
])

Abyss.start_link([
  handler_module: MyHandler,
  port: 8080,
  rate_limit_enabled: true,
  rate_limit_max_packets: 100,  # Lower limit for strict rate limiting
  rate_limit_window_ms: 1000,
  max_packet_size: 1024         # Limit packet size to prevent DoS
])

Abyss.start_link([
  handler_module: MyBroadcastHandler,
  port: 67,  # DHCP port
  broadcast: true,
  transport_options: [
    broadcast: true,
    multicast_if: {255, 255, 255, 255},
    reuseaddr: true,
    reuseport: true
  ]
])

Abyss emits comprehensive telemetry events for monitoring:
- [:abyss, :listener, :start]
- [:abyss, :listener, :ready]
- [:abyss, :listener, :waiting]
- [:abyss, :listener, :receiving]
- [:abyss, :listener, :stop]

- [:abyss, :connection, :start]
- [:abyss, :connection, :ready]
- [:abyss, :connection, :send]
- [:abyss, :connection, :recv]
- [:abyss, :connection, :stop]

- [:abyss, :listener, :rate_limit_exceeded]
- [:abyss, :listener, :packet_too_large]
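One way to consume these events is a handler that counts how often each one fires. The sketch below is an assumption-laden illustration: AbyssEventCounter and its functions are hypothetical names, only a few of the events listed above are subscribed, and the measurements and metadata are ignored because their exact shape is not described here. The handler has the four-argument form that :telemetry.attach_many/4 expects, and the attach call itself is shown as a comment so the module compiles without the telemetry package.

```elixir
defmodule AbyssEventCounter do
  # Hypothetical helper, not part of Abyss: counts telemetry events in an ETS table.
  @events [
    [:abyss, :listener, :rate_limit_exceeded],
    [:abyss, :listener, :packet_too_large],
    [:abyss, :connection, :start]
  ]

  def events, do: @events

  # The table is :public because telemetry handlers run in the process
  # that emits the event, not in the process that created the table.
  def new_table, do: :ets.new(:abyss_event_counts, [:set, :public])

  # Telemetry handler: (event_name, measurements, metadata, config)
  def handle_event(event, _measurements, _metadata, %{table: table}) do
    # Increment the counter, inserting {event, 0} first if the key is missing
    :ets.update_counter(table, event, 1, {event, 0})
    :ok
  end

  def count(table, event) do
    case :ets.lookup(table, event) do
      [{^event, n}] -> n
      [] -> 0
    end
  end
end

# In a project that depends on :telemetry, the handler would be attached like this:
#
#   table = AbyssEventCounter.new_table()
#
#   :telemetry.attach_many(
#     "abyss-event-counter",
#     AbyssEventCounter.events(),
#     &AbyssEventCounter.handle_event/4,
#     %{table: table}
#   )
```

Passing the handler as a remote capture (&Module.function/4) rather than an anonymous function follows the telemetry library's own recommendation.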
# Attach structured logger at different levels
Abyss.Logger.attach_logger(:error) # Errors only
Abyss.Logger.attach_logger(:info) # General events
Abyss.Logger.attach_logger(:debug) # Detailed debugging
Abyss.Logger.attach_logger(:trace) # Verbose tracing

- Listener Pool Size (num_listeners)
  - Default: 100
  - Increase for high-throughput scenarios
  - Typical range: 10-1000
- Connection Limits (num_connections)
  - Default: 16_384
  - Based on available memory and expected load
  - Use :infinity for unlimited (with caution)
- Rate Limiting
  - Enable for public-facing services
  - Adjust based on expected traffic patterns
  - Monitor [:abyss, :listener, :rate_limit_exceeded] events
- Buffer Sizes
  - Configure via transport_options:

    transport_options: [
      recbuf: 8192, # Receive buffer
      sndbuf: 8192  # Send buffer
    ]
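When tuning buffer sizes it helps to check what the operating system actually grants, since the kernel may round, double, or cap the requested values. The sketch below uses only :gen_udp and :inet from OTP. It assumes that transport_options end up as socket options on a :gen_udp socket, which this document does not confirm, and BufferCheck is a hypothetical name.

```elixir
defmodule BufferCheck do
  # Hypothetical helper: opens a throwaway UDP socket with the requested
  # buffer options and reports the sizes the OS actually applied.
  def effective_buffers(requested) do
    # Port 0 lets the OS pick any free port
    {:ok, socket} = :gen_udp.open(0, [:binary, active: false] ++ requested)
    {:ok, effective} = :inet.getopts(socket, [:recbuf, :sndbuf])
    :ok = :gen_udp.close(socket)
    effective
  end
end

requested = [recbuf: 262_144, sndbuf: 262_144]
IO.inspect(requested, label: "requested")
IO.inspect(BufferCheck.effective_buffers(requested), label: "effective")
```

If the effective values are much smaller than requested, the limit usually has to be raised at the OS level before a larger setting takes effect.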
Monitor key metrics via telemetry:
:telemetry.attach_many(
  "abyss-monitor",
  [
    [:abyss, :listener, :start],
    [:abyss, :connection, :start],
    [:abyss, :listener, :rate_limit_exceeded]
  ],
  &handle_metrics/4,
  %{}
)

When deploying Abyss to production, consider these security configurations:
Abyss.start_link([
  handler_module: MyHandler,
  port: 8080,

  # Enable rate limiting for DoS protection
  rate_limit_enabled: true,
  rate_limit_max_packets: 1000,
  rate_limit_window_ms: 1000,

  # Limit packet size to prevent memory exhaustion
  max_packet_size: 8192,

  # Use reasonable connection limits
  num_connections: 10_000,

  # Configure socket buffers
  transport_options: [
    recbuf: 262_144, # 256KB receive buffer
    sndbuf: 262_144  # 256KB send buffer
  ]
])

- Rate Limiting: Always enable rate limiting for public services
- Packet Size Limits: Set appropriate max_packet_size limits
- Connection Limits: Monitor and adjust num_connections based on resources
- Network Access: Use firewall rules to restrict access when possible
- Monitoring: Set up alerts for rate limiting events
:telemetry.attach_many(
  "security-monitor",
  [
    [:abyss, :listener, :rate_limit_exceeded],
    [:abyss, :listener, :packet_too_large]
  ],
  &handle_security_event/4,
  %{}
)

# inspect/1 keeps the log call safe if remote_address is an IP tuple
defp handle_security_event(event, _measurements, metadata, _config) do
  case event do
    [:abyss, :listener, :rate_limit_exceeded] ->
      Logger.warning("Rate limit exceeded from #{inspect(metadata.remote_address)}")

    [:abyss, :listener, :packet_too_large] ->
      Logger.warning("Oversized packet from #{inspect(metadata.remote_address)}: #{metadata.packet_size} bytes")
  end
end

# Run echo server with trace logging
mix run --no-halt -e 'Code.require_file("example/echo.ex"); Abyss.Logger.attach_logger(:trace); Abyss.start_link(handler_module: Echo, port: 1234); Process.sleep(:infinity)'
# Test with netcat
echo "Hello, UDP" | nc -4 -u -w1 127.0.0.1 1234

# DNS forwarder
mix run --no-halt -e 'Code.require_file("example/dns_forwarder.ex"); Abyss.start_link(handler_module: HandleDNS, port: 53); Process.sleep(:infinity)'
# DNS recursive resolver
mix run --no-halt -e 'Code.require_file("example/dns_recursive.ex"); Abyss.start_link(handler_module: HandleDNS, port: 53); Process.sleep(:infinity)'

# DHCP listener
mix run --no-halt -e 'Code.require_file("example/dump_dhcp.ex"); Abyss.start_link(handler_module: DumpDHCP, port: 67, broadcast: true, transport_options: [broadcast: true, multicast_if: {255, 255, 255, 255}]); Process.sleep(:infinity)'
# mDNS listener
mix run --no-halt -e 'Code.require_file("example/dump_mdns.ex"); Abyss.start_link(handler_module: DumpMDNS, port: 5353, broadcast: true, transport_options: [broadcast: true, multicast_if: {224, 0, 0, 251}]); Process.sleep(:infinity)'

Abyss implements a hierarchical supervision tree:
Abyss (main supervisor)
├── Abyss.RateLimiter (if enabled)
├── Abyss.ListenerPool (supervisor)
│ ├── Abyss.Listener (listener process 1)
│ ├── Abyss.Listener (listener process 2)
│ └── ... (up to num_listeners processes)
├── DynamicSupervisor (connection supervisor)
│ ├── Handler process 1 (per UDP packet)
│ ├── Handler process 2 (per UDP packet)
│ └── ... (up to num_connections processes)
├── Task (activator - starts listeners)
└── Abyss.ShutdownListener (coordinates graceful shutdown)
- Listener Pool: Manages multiple listener processes for load distribution
- Listener: Waits for UDP packets on the bound port
- Connection: Creates handler processes for incoming packets
- Handler: Processes packet data using user-defined logic
- Transport: Handles low-level UDP socket operations
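The flow described by these components (a listener waits for a packet, a handler process is created for it, and the transport sends the reply) can be sketched with plain :gen_udp. This is a simplified illustration, not Abyss's implementation: MiniUdpEcho is a hypothetical name, there is a single listener instead of a pool, and a bare spawn/1 stands in for the DynamicSupervisor that Abyss uses for handler processes.

```elixir
defmodule MiniUdpEcho do
  # Starts one listener process on a free loopback port.
  # Returns {:ok, listener_pid, port}.
  def start do
    parent = self()

    pid =
      spawn(fn ->
        # The listener opens the socket itself so it owns the socket's lifetime
        {:ok, socket} = :gen_udp.open(0, [:binary, active: false, ip: {127, 0, 0, 1}])
        {:ok, port} = :inet.port(socket)
        send(parent, {:listening, self(), port})
        listen(socket)
      end)

    receive do
      {:listening, ^pid, port} -> {:ok, pid, port}
    after
      1_000 -> {:error, :timeout}
    end
  end

  # Listener: block until a packet arrives, hand it to a fresh process, loop.
  defp listen(socket) do
    case :gen_udp.recv(socket, 0) do
      {:ok, {ip, port, data}} ->
        spawn(fn -> handle(socket, ip, port, data) end)
        listen(socket)

      {:error, _reason} ->
        :ok
    end
  end

  # Handler: one short-lived process per packet; here it just echoes.
  defp handle(socket, ip, port, data) do
    :gen_udp.send(socket, ip, port, data)
  end
end
```

Calling MiniUdpEcho.start() returns the chosen port, and a datagram sent to 127.0.0.1 on that port is echoed back. Because each packet gets its own process, a crash while handling one packet does not stop the listener loop.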
Abyss uses a modular transport architecture with specialized modules:
- Abyss.Transport.UDP.Core - Core UDP socket operations (open, close, send, recv)
- Abyss.Transport.UDP.Unicast - Unicast-specific functionality with proper resource cleanup
- Abyss.Transport.UDP.Broadcast - Broadcast and multicast support
This modular design ensures proper resource management and makes it easier to extend or customize transport behavior.
- Rest for One: If a listener crashes, other listeners continue
- Dynamic Supervisor: Handler processes are isolated
- Graceful Shutdown: Coordinated termination with timeouts
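The isolation provided by a DynamicSupervisor can be observed with a small experiment using only the standard library. The sketch below is generic OTP behavior rather than Abyss code: IsolationDemo is a hypothetical name, and temporary Task children stand in for handler processes. One child is made to crash while a sibling keeps running.

```elixir
defmodule IsolationDemo do
  def run do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

    # A long-lived child standing in for a healthy handler
    {:ok, healthy} =
      DynamicSupervisor.start_child(sup, {Task, fn -> Process.sleep(:infinity) end})

    # A child that exits abnormally when told to
    {:ok, doomed} =
      DynamicSupervisor.start_child(
        sup,
        {Task,
         fn ->
           receive do
             :crash -> exit(:boom)
           end
         end}
      )

    ref = Process.monitor(doomed)
    send(doomed, :crash)

    receive do
      {:DOWN, ^ref, :process, ^doomed, _reason} ->
        # Task children are :temporary by default, so the crashed one is not restarted
        %{
          healthy_alive?: Process.alive?(healthy),
          supervisor_alive?: Process.alive?(sup)
        }
    after
      1_000 -> :timeout
    end
  end
end
```

Running IsolationDemo.run() logs the crash of the second task, while the returned map shows that both the sibling and the supervisor are still alive.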
- Fork the repository
- Create your feature branch (git checkout -b my-new-feature)
- Commit your changes (git commit -am 'Add some feature')
- Push to the branch (git push origin my-new-feature)
- Open a Pull Request
# Install dependencies
mix deps.get
# Run tests
mix test
# Run tests with coverage
mix test --cover
# Format code
mix format
# Run dialyzer
mix dialyzer
# Run credo
mix credo
# Generate documentation
mix docs

This project is licensed under the MIT License - see the LICENSE file for details.