Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 31 additions & 14 deletions lib/dns_cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ defmodule DNSCluster do
differs between nodes, a tuple of `{basename, query}` can be provided as well.
The value `:ignore` can be used to ignore starting the DNSCluster.
* `:resource_types` - the resource record types that are used for node discovery.
Defaults to `[:a, :aaaa]` and also supports the `:srv` type.
Defaults to `[:a, :aaaa]` and also supports `{:srv, :ips}`, `{:srv, :hostnames}` and
`:srv` (same as `{:srv, :ips}`) type.
* `:interval` - the millisec interval between DNS queries. Defaults to `5000`.
* `:connect_timeout` - the millisec timeout to allow discovered nodes to connect.
Defaults to `10_000`.
Expand All @@ -68,7 +69,7 @@ defmodule DNSCluster do
GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
end

@valid_resource_types [:a, :aaaa, :srv]
@valid_resource_types [:a, :aaaa, :srv, {:srv, :ips}, {:srv, :hostnames}]

@impl true
def init(opts) do
Expand All @@ -79,7 +80,11 @@ defmodule DNSCluster do
{:ok, query} ->
validate_query!(query)

resource_types = Keyword.get(opts, :resource_types, [:a, :aaaa])
resource_types =
opts
|> Keyword.get(:resource_types, [:a, :aaaa])
|> normalize_old_srv_resource_type()

validate_resource_types!(resource_types)

warn_on_invalid_dist()
Expand All @@ -97,20 +102,20 @@ defmodule DNSCluster do
resolver: resolver
}

{:ok, state, {:continue, :discover_ips}}
{:ok, state, {:continue, :discover_addresses}}

:error ->
raise ArgumentError, "missing required :query option in #{inspect(opts)}"
end
end

@impl true
def handle_continue(:discover_ips, state) do
def handle_continue(:discover_addresses, state) do
{:noreply, do_discovery(state)}
end

@impl true
def handle_info(:discover_ips, state) do
def handle_info(:discover_addresses, state) do
{:noreply, do_discovery(state)}
end

Expand All @@ -123,19 +128,19 @@ defmodule DNSCluster do
defp connect_new_nodes(%{resolver: resolver, connect_timeout: timeout} = state) do
node_names = for name <- resolver.list_nodes(), into: MapSet.new(), do: to_string(name)

ips = discover_ips(state)
addresses = discover_addresses(state)

_results =
ips
|> Enum.map(fn {basename, ip} -> "#{basename}@#{ip}" end)
addresses
|> Enum.map(fn {basename, address} -> "#{basename}@#{address}" end)
|> Enum.filter(fn node_name -> !Enum.member?(node_names, node_name) end)
|> Task.async_stream(
fn new_name ->
if resolver.connect_node(:"#{new_name}") do
log(state, "#{node()} connected to #{new_name}")
end
end,
max_concurrency: max(1, length(ips)),
max_concurrency: max(1, length(addresses)),
timeout: timeout
)
|> Enum.to_list()
Expand All @@ -148,23 +153,35 @@ defmodule DNSCluster do
end

defp schedule_next_poll(state) do
%{state | poll_timer: Process.send_after(self(), :discover_ips, state.interval)}
%{state | poll_timer: Process.send_after(self(), :discover_addresses, state.interval)}
end

defp discover_ips(%{resolver: resolver, query: queries, resource_types: resource_types} = state) do
defp discover_addresses(
%{resolver: resolver, query: queries, resource_types: resource_types} = state
) do
for resource_type <- resource_types,
query <- queries,
basename = basename_from_query_or_state(query, state),
addr <- resolver.lookup(query, resource_type) do
{basename, addr}
end
|> Enum.uniq()
|> Enum.map(fn {basename, addr} -> {basename, to_string(:inet.ntoa(addr))} end)
|> Enum.map(fn
{basename, ip} when is_tuple(ip) -> {basename, to_string(:inet.ntoa(ip))}
{basename, hostname} -> {basename, hostname}
end)
end

defp basename_from_query_or_state({basename, _query}, _state), do: basename
defp basename_from_query_or_state(_query, %{basename: basename}), do: basename

defp normalize_old_srv_resource_type(resource_types) do
Enum.map(resource_types, fn
:srv -> {:srv, :ips}
resource_type -> resource_type
end)
end

defp validate_query!(query) do
query
|> List.wrap()
Expand All @@ -184,7 +201,7 @@ defmodule DNSCluster do
defp validate_resource_types!(resource_types) do
if resource_types == [] or resource_types -- @valid_resource_types != [] do
raise ArgumentError,
"expected :resource_types to be a subset of [:a, :aaaa, :srv], got: #{inspect(resource_types)}"
"expected :resource_types to be a subset of #{inspect(@valid_resource_types)}, got: #{inspect(resource_types)}"
end
end

Expand Down
42 changes: 24 additions & 18 deletions lib/dns_cluster/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,36 @@ defmodule DNSCluster.Resolver do

def list_nodes, do: Node.list(:visible)

def lookup(query, type) when is_binary(query) and type in [:a, :aaaa] do
case :inet_res.getbyname(~c"#{query}", type) do
{:ok, hostent(h_addr_list: addr_list)} -> addr_list
{:error, _} -> []
end
def lookup(query, {:srv, :hostnames}), do: lookup_by_name(query, :srv)

def lookup(query, {:srv, :ips}) do
query
|> lookup_by_name(:srv)
|> Enum.flat_map(&lookup_host_by_name/1)
end

def lookup(query, type) when is_binary(query) and type in [:srv] do
case :inet_res.getbyname(~c"#{query}", type) do
{:ok, hostent(h_addr_list: srv_list)} ->
lookup_hosts(srv_list)
def lookup(query, resource_type) when resource_type in [:a, :aaaa] do
lookup_by_name(query, resource_type)
end

{:error, _} ->
defp lookup_by_name(query, resource_type) do
case :inet_res.getbyname(~c"#{query}", resource_type) do
{:ok, hostent(h_addr_list: addr_list)} ->
if resource_type == :srv do
for {_prio, _weight, _port, address} <- addr_list, do: address
else
addr_list
end

{:error, _reason} ->
[]
end
end

defp lookup_hosts(srv_list) do
srv_list
|> Enum.flat_map(fn {_prio, _weight, _port, host_name} ->
case :inet.gethostbyname(host_name) do
{:ok, hostent(h_addr_list: addr_list)} -> addr_list
{:error, _} -> []
end
end)
defp lookup_host_by_name(query) do
case :inet_res.gethostbyname(query) do
{:ok, hostent(h_addr_list: addr_list)} -> addr_list
{:error, _reason} -> []
end
end
end
10 changes: 6 additions & 4 deletions test/dns_cluster_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ defmodule DNSClusterTest do
end

describe "resource_types" do
test "resource_types can be a subset of [:a, :aaaa, :srv]", config do
test "resource_types can be a subset of [:a, :aaaa, :srv, {:srv, :ips}, {:srv, :hostnames}]",
config do
assert {:ok, _cluster} =
start_supervised(
{DNSCluster,
Expand All @@ -152,9 +153,10 @@ defmodule DNSClusterTest do
)
end

test "resource_types can't be outside of [:a, :aaaa, :srv]", config do
test "resource_types can't be outside of [:a, :aaaa, :srv, {:srv, :ips}, {:srv, :hostnames}]",
config do
assert_raise RuntimeError,
~r/expected :resource_types to be a subset of \[:a, :aaaa, :srv\]/,
~r/expected :resource_types to be a subset of \[:a, :aaaa, :srv, {:srv, :ips}, {:srv, :hostnames}\]/,
fn ->
start_supervised!(
{DNSCluster,
Expand All @@ -168,7 +170,7 @@ defmodule DNSClusterTest do

test "resource_types can't be empty", config do
assert_raise RuntimeError,
~r/expected :resource_types to be a subset of \[:a, :aaaa, :srv\]/,
~r/expected :resource_types to be a subset of \[:a, :aaaa, :srv, {:srv, :ips}, {:srv, :hostnames}\]/,
fn ->
start_supervised!(
{DNSCluster,
Expand Down