Skip to content

Commit 5caac8c

Browse files
committed
allow multiple tcp clients
1 parent 6cd3693 commit 5caac8c

File tree

13 files changed

+101
-51
lines changed

13 files changed

+101
-51
lines changed

lib/ex_ice/ice_agent.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,13 +322,13 @@ defmodule ExICE.ICEAgent do
322322

323323
@impl true
324324
def init(opts) do
325-
# TODO: this is ugly, and will not allow us to run more than two TCP ICE agents at the same time
325+
# TODO: this is ugly, and potentially a bottleneck
326326
opts =
327327
if opts[:transport] == :tcp do
328-
{:ok, _pid} = ExICE.Priv.Transport.TCP.Client.start_link()
329-
opts ++ [transport_module: ExICE.Priv.Transport.TCP.Client]
328+
{:ok, pid} = ExICE.Priv.Transport.TCP.Client.start_link()
329+
opts ++ [transport_module: %{mod: ExICE.Priv.Transport.TCP.Client, ref: pid}]
330330
else
331-
opts
331+
opts ++ [transport_module: %{mod: ExIce.Priv.Transport.UDP, ref: nil}]
332332
end
333333

334334
ice_agent = ExICE.Priv.ICEAgent.new(opts)

lib/ex_ice/priv/candidate.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ defmodule ExICE.Priv.Candidate do
118118

119119
defp generate_other_preference(local_preferences, attempts) do
120120
# 0..8191
121-
<<pref::13, _::bitstring>> = :crypto.strong_rand_bytes(2)
121+
<<pref::13, _::3>> = :crypto.strong_rand_bytes(2)
122122

123123
if local_preferences |> Map.values() |> Enum.member?(pref) do
124124
generate_other_preference(local_preferences, attempts - 1)

lib/ex_ice/priv/candidate/host.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule ExICE.Priv.Candidate.Host do
22
@moduledoc false
33
@behaviour ExICE.Priv.Candidate
44

5-
alias ExICE.Priv.CandidateBase
5+
alias ExICE.Priv.{CandidateBase, Transport}
66

77
@type t() :: %__MODULE__{base: CandidateBase.t()}
88

@@ -28,7 +28,7 @@ defmodule ExICE.Priv.Candidate.Host do
2828

2929
@impl true
3030
def send_data(cand, dst_ip, dst_port, data) do
31-
case cand.base.transport_module.send(cand.base.socket, {dst_ip, dst_port}, data) do
31+
case Transport.send(cand.base.transport_module, cand.base.socket, {dst_ip, dst_port}, data) do
3232
:ok -> {:ok, cand}
3333
{:error, reason} -> {:error, reason, cand}
3434
end

lib/ex_ice/priv/candidate/prflx.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule ExICE.Priv.Candidate.Prflx do
22
@moduledoc false
33
@behaviour ExICE.Priv.Candidate
44

5-
alias ExICE.Priv.CandidateBase
5+
alias ExICE.Priv.{CandidateBase, Transport}
66

77
@type t() :: %__MODULE__{base: CandidateBase.t()}
88

@@ -28,7 +28,7 @@ defmodule ExICE.Priv.Candidate.Prflx do
2828

2929
@impl true
3030
def send_data(cand, dst_ip, dst_port, data) do
31-
case cand.base.transport_module.send(cand.base.socket, {dst_ip, dst_port}, data) do
31+
case Transport.send(cand.base.transport_module, cand.base.socket, {dst_ip, dst_port}, data) do
3232
:ok -> {:ok, cand}
3333
{:error, reason} -> {:error, reason, cand}
3434
end

lib/ex_ice/priv/candidate/relay.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule ExICE.Priv.Candidate.Relay do
22
@moduledoc false
33
@behaviour ExICE.Priv.Candidate
44

5-
alias ExICE.Priv.CandidateBase
5+
alias ExICE.Priv.{CandidateBase, Transport}
66

77
@type t() :: %__MODULE__{base: CandidateBase.t()}
88

@@ -125,7 +125,7 @@ defmodule ExICE.Priv.Candidate.Relay do
125125
end
126126

127127
defp do_send(cand, dst_addr, data) do
128-
case cand.base.transport_module.send(cand.base.socket, dst_addr, data) do
128+
case Transport.send(cand.base.transport_module, cand.base.socket, dst_addr, data) do
129129
:ok -> {:ok, cand}
130130
{:error, reason} -> {:error, reason, cand}
131131
end

lib/ex_ice/priv/candidate/srflx.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule ExICE.Priv.Candidate.Srflx do
22
@moduledoc false
33
@behaviour ExICE.Priv.Candidate
44

5-
alias ExICE.Priv.CandidateBase
5+
alias ExICE.Priv.{CandidateBase, Transport}
66

77
@type t() :: %__MODULE__{base: CandidateBase.t()}
88

@@ -28,7 +28,7 @@ defmodule ExICE.Priv.Candidate.Srflx do
2828

2929
@impl true
3030
def send_data(cand, dst_ip, dst_port, data) do
31-
case cand.base.transport_module.send(cand.base.socket, {dst_ip, dst_port}, data) do
31+
case Transport.send(cand.base.transport_module, cand.base.socket, {dst_ip, dst_port}, data) do
3232
:ok -> {:ok, cand}
3333
{:error, reason} -> {:error, reason, cand}
3434
end

lib/ex_ice/priv/candidate_base.ex

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule ExICE.Priv.CandidateBase do
22
@moduledoc false
3-
alias ExICE.Priv.{Candidate, Utils}
3+
alias ExICE.Priv.{Candidate, Transport, Utils}
44

55
@type t :: %__MODULE__{
66
id: integer(),
@@ -11,6 +11,7 @@ defmodule ExICE.Priv.CandidateBase do
1111
port: :inet.port_number(),
1212
priority: integer(),
1313
transport: :udp | :tcp,
14+
# XXX
1415
transport_module: module(),
1516
socket: :inet.socket() | nil,
1617
type: Candidate.type(),
@@ -34,7 +35,7 @@ defmodule ExICE.Priv.CandidateBase do
3435
@spec new(Candidate.type(), Keyword.t()) :: t()
3536
def new(type, config) do
3637
transport_module = Keyword.fetch!(config, :transport_module)
37-
transport = transport_module.transport()
38+
transport = Transport.transport(transport_module)
3839
address = Keyword.fetch!(config, :address)
3940

4041
%__MODULE__{

lib/ex_ice/priv/gatherer.ex

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule ExICE.Priv.Gatherer do
1212

1313
@type t() :: %__MODULE__{
1414
if_discovery_module: module(),
15+
# XXX
1516
transport_module: module(),
1617
ip_filter: (:inet.ip_address() -> boolean),
1718
ports: Enumerable.t(non_neg_integer())
@@ -41,7 +42,7 @@ defmodule ExICE.Priv.Gatherer do
4142
|> Stream.reject(&unsupported_ipv6?(&1))
4243
|> Enum.to_list()
4344

44-
for transport_opts <- gatherer.transport_module.socket_configs(),
45+
for transport_opts <- Transport.socket_configs(gatherer.transport_module),
4546
ip <- ips do
4647
open_socket(gatherer, ip, transport_opts)
4748
end
@@ -67,9 +68,15 @@ defmodule ExICE.Priv.Gatherer do
6768
]
6869

6970
Enum.reduce_while(gatherer.ports, nil, fn port, _ ->
70-
case gatherer.transport_module.setup_socket(ip, port, socket_opts, transport_opts) do
71+
case Transport.setup_socket(
72+
gatherer.transport_module,
73+
ip,
74+
port,
75+
socket_opts,
76+
transport_opts
77+
) do
7178
{:ok, socket} ->
72-
{:ok, {^ip, sock_port}} = gatherer.transport_module.sockname(socket)
79+
{:ok, {^ip, sock_port}} = Transport.sockname(gatherer.transport_module, socket)
7380

7481
Logger.debug(
7582
"Successfully opened socket for: #{inspect(ip)}:#{sock_port}, socket: #{inspect(socket)}"
@@ -118,7 +125,7 @@ defmodule ExICE.Priv.Gatherer do
118125
ip = List.first(ips)
119126
port = stun_server.port
120127

121-
{:ok, {sock_ip, _sock_port}} = gatherer.transport_module.sockname(socket)
128+
{:ok, {sock_ip, _sock_port}} = Transport.sockname(gatherer.transport_module, socket)
122129

123130
cand_family = Utils.family(sock_ip)
124131
stun_family = Utils.family(ip)
@@ -127,7 +134,7 @@ defmodule ExICE.Priv.Gatherer do
127134
# Communication with STUN servers should be handled differently
128135
# than the rest of the TCP traffic: the messages are not RFC 4571-framed,
129136
# and we want to issue connection attempts from passive candidates as well.
130-
gatherer.transport_module.send(socket, {ip, port}, binding_request,
137+
Transport.send(gatherer.transport_module, socket, {ip, port}, binding_request,
131138
frame?: false,
132139
connect?: true
133140
)
@@ -281,7 +288,7 @@ defmodule ExICE.Priv.Gatherer do
281288
socket: socket,
282289
transport_opts: transport_opts
283290
}) do
284-
{:ok, {sock_ip, sock_port}} = gatherer.transport_module.sockname(socket)
291+
{:ok, {sock_ip, sock_port}} = Transport.sockname(gatherer.transport_module, socket)
285292

286293
tcp_type = transport_opts[:tcp_type]
287294

lib/ex_ice/priv/ice_agent.ex

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ defmodule ExICE.Priv.ICEAgent do
142142
{local_ufrag, local_pwd} = generate_credentials()
143143

144144
controlling_process = Keyword.fetch!(opts, :controlling_process)
145+
transport_module = Keyword.fetch!(opts, :transport_module)
145146

146147
if_discovery_module = opts[:if_discovery_module] || IfDiscovery.Inet
147-
transport_module = opts[:transport_module] || Transport.UDP
148148
ip_filter = opts[:ip_filter] || fn _ -> true end
149149
ports = opts[:ports] || [0]
150150

@@ -944,7 +944,7 @@ defmodule ExICE.Priv.ICEAgent do
944944

945945
{:send, dst, data, client} ->
946946
tr = %{tr | client: client}
947-
:ok = ice_agent.transport_module.send(tr.socket, dst, data)
947+
:ok = Transport.send(ice_agent.transport_module, tr.socket, dst, data)
948948
put_in(ice_agent.gathering_transactions[tr_id], tr)
949949

950950
{:error, _reason, _client} ->
@@ -962,7 +962,7 @@ defmodule ExICE.Priv.ICEAgent do
962962
cand = %{cand | client: client}
963963
ice_agent = put_in(ice_agent.local_cands[cand.base.id], cand)
964964
# we can't use do_send here as it will try to create permission for the turn address
965-
:ok = ice_agent.transport_module.send(cand.base.socket, dst, data)
965+
:ok = Transport.send(ice_agent.transport_module, cand.base.socket, dst, data)
966966
ice_agent
967967

968968
{:error, _reason, client} ->
@@ -1013,7 +1013,7 @@ defmodule ExICE.Priv.ICEAgent do
10131013
for turn_server <- turn_servers, %{socket: socket} <- sockets do
10141014
with {:ok, client} <-
10151015
ExTURN.Client.new(turn_server.url, turn_server.username, turn_server.credential),
1016-
{:ok, {sock_ip, _sock_port}} <- ice_agent.transport_module.sockname(socket),
1016+
{:ok, {sock_ip, _sock_port}} <- Transport.sockname(ice_agent.transport_module, socket),
10171017
{true, _, _} <-
10181018
{Utils.family(client.turn_ip) == Utils.family(sock_ip), client, sock_ip} do
10191019
t_id = {socket, {client.turn_ip, client.turn_port}}
@@ -1098,7 +1098,7 @@ defmodule ExICE.Priv.ICEAgent do
10981098
end
10991099

11001100
defp execute_gathering_transaction(ice_agent, %{stun_server: stun_server} = tr) do
1101-
{:ok, {sock_ip, sock_port}} = ice_agent.transport_module.sockname(tr.socket)
1101+
{:ok, {sock_ip, sock_port}} = Transport.sockname(ice_agent.transport_module, tr.socket)
11021102

11031103
Logger.debug("""
11041104
Sending binding request to gather srflx candidate for:
@@ -1124,7 +1124,7 @@ defmodule ExICE.Priv.ICEAgent do
11241124
end
11251125

11261126
defp execute_gathering_transaction(ice_agent, %{client: client} = tr) do
1127-
{:ok, {sock_ip, sock_port}} = ice_agent.transport_module.sockname(tr.socket)
1127+
{:ok, {sock_ip, sock_port}} = Transport.sockname(ice_agent.transport_module, tr.socket)
11281128

11291129
Logger.debug("""
11301130
Starting the process of gathering relay candidate for:
@@ -1135,7 +1135,7 @@ defmodule ExICE.Priv.ICEAgent do
11351135
{:send, turn_addr, data, client} = ExTURN.Client.allocate(client)
11361136
tr = Map.put(tr, :client, client)
11371137

1138-
case ice_agent.transport_module.send(tr.socket, turn_addr, data) do
1138+
case Transport.send(ice_agent.transport_module, tr.socket, turn_addr, data) do
11391139
:ok ->
11401140
tr = %{tr | state: :in_progress, send_time: now()}
11411141
ice_agent = put_in(ice_agent.gathering_transactions[tr.t_id], tr)
@@ -1215,7 +1215,7 @@ defmodule ExICE.Priv.ICEAgent do
12151215

12161216
# Use sock_addr for calculating priority.
12171217
# In other case, we might get duplicates.
1218-
{:ok, {sock_addr, _sock_port}} = ice_agent.transport_module.sockname(tr.socket)
1218+
{:ok, {sock_addr, _sock_port}} = Transport.sockname(ice_agent.transport_module, tr.socket)
12191219

12201220
# TODO: set correct tcp_type here
12211221
{local_preferences, priority} =
@@ -1250,7 +1250,7 @@ defmodule ExICE.Priv.ICEAgent do
12501250

12511251
{:send, turn_addr, data, client} ->
12521252
tr = %{tr | client: client}
1253-
:ok = ice_agent.transport_module.send(tr.socket, turn_addr, data)
1253+
:ok = Transport.send(ice_agent.transport_module, tr.socket, turn_addr, data)
12541254
put_in(ice_agent.gathering_transactions[tr_id], tr)
12551255

12561256
{:error, _reason, _client} ->
@@ -1912,7 +1912,7 @@ defmodule ExICE.Priv.ICEAgent do
19121912

19131913
case find_local_cand(Map.values(ice_agent.local_cands), xor_addr, xor_port) do
19141914
nil ->
1915-
{:ok, {base_addr, base_port}} = ice_agent.transport_module.sockname(tr.socket)
1915+
{:ok, {base_addr, base_port}} = Transport.sockname(ice_agent.transport_module, tr.socket)
19161916

19171917
host_cand = find_host_cand(Map.values(ice_agent.local_cands), tr.socket)
19181918

@@ -2423,7 +2423,7 @@ defmodule ExICE.Priv.ICEAgent do
24232423
# Alternatively, we could create a callback for `:inet.info/1`,
24242424
# but it's return type is not standardized - sometimes it's %{states: [:closed]},
24252425
# some other time %{rstates: [:closed], wstates: [:closed]}.
2426-
case ice_agent.transport_module.sockname(socket) do
2426+
case Transport.sockname(ice_agent.transport_module, socket) do
24272427
{:error, :closed} -> ice_agent
24282428
_ -> do_close_socket(ice_agent, socket)
24292429
end
@@ -2448,7 +2448,7 @@ defmodule ExICE.Priv.ICEAgent do
24482448

24492449
tr_rtx = ice_agent.tr_rtx -- Map.keys(removed_gathering_transactions)
24502450

2451-
:ok = ice_agent.transport_module.close(socket)
2451+
:ok = Transport.close(ice_agent.transport_module, socket)
24522452
:ok = flush_socket_msg(socket)
24532453

24542454
%{ice_agent | tr_rtx: tr_rtx, gathering_transactions: gathering_transactions}
@@ -2513,12 +2513,12 @@ defmodule ExICE.Priv.ICEAgent do
25132513

25142514
defp do_restart(ice_agent) do
25152515
Enum.each(ice_agent.sockets, fn socket ->
2516-
case ice_agent.transport_module.sockname(socket) do
2516+
case Transport.sockname(ice_agent.transport_module, socket) do
25172517
{:ok, {ip, port}} ->
25182518
# we could use close_socket function here but because we are
25192519
# clearing the whole state anyway, we can close the socket manually
25202520
Logger.debug("Closing socket: #{inspect(ip)}:#{port}.")
2521-
:ok = ice_agent.transport_module.close(socket)
2521+
:ok = Transport.close(ice_agent.transport_module, socket)
25222522
:ok = flush_socket_msg(socket)
25232523

25242524
{:error, :closed} ->
@@ -3116,7 +3116,7 @@ defmodule ExICE.Priv.ICEAgent do
31163116
priority =
31173117
if local_candidate.base.type == :relay do
31183118
{:ok, {sock_addr, _sock_port}} =
3119-
ice_agent.transport_module.sockname(local_candidate.base.socket)
3119+
Transport.sockname(ice_agent.transport_module, local_candidate.base.socket)
31203120

31213121
Candidate.priority!(
31223122
ice_agent.local_preferences,

lib/ex_ice/priv/transport.ex

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule ExICE.Priv.Transport do
22
@moduledoc false
33

44
@type socket :: term()
5+
@type tp_ref :: term()
56
@type open_option ::
67
:inet.inet_backend()
78
| :inet.address_family()
@@ -12,14 +13,50 @@ defmodule ExICE.Priv.Transport do
1213

1314
@callback socket_configs() :: [map()]
1415

15-
@callback setup_socket(:inet.ip_address(), :inet.port_number(), [open_option()], map()) ::
16+
@callback setup_socket(
17+
tp_ref(),
18+
:inet.ip_address(),
19+
:inet.port_number(),
20+
[open_option()],
21+
map()
22+
) ::
1623
{:ok, socket()} | {:error, term()}
1724

1825
@callback sockname(socket()) ::
1926
{:ok, {:inet.ip_address(), :inet.port_number()}} | {:error, term()}
2027

21-
@callback send(socket(), {:inet.ip_address(), :inet.port_number()}, binary(), Keyword.t()) ::
28+
@callback send(
29+
tp_ref(),
30+
socket(),
31+
{:inet.ip_address(), :inet.port_number()},
32+
binary(),
33+
Keyword.t()
34+
) ::
2235
:ok | {:error, term()}
2336

24-
@callback close(socket()) :: :ok
37+
@callback close(tp_ref(), socket()) :: :ok
38+
39+
def transport(tp_spec) do
40+
tp_spec.mod.transport()
41+
end
42+
43+
def socket_configs(tp_spec) do
44+
tp_spec.mod.socket_configs()
45+
end
46+
47+
def setup_socket(tp_spec, ip, port, socket_opts, tp_opts \\ []) do
48+
tp_spec.mod.setup_socket(tp_spec.ref, ip, port, socket_opts, tp_opts)
49+
end
50+
51+
def sockname(tp_spec, socket) do
52+
tp_spec.mod.sockname(socket)
53+
end
54+
55+
def send(tp_spec, socket, dest, packet, tp_opts \\ []) do
56+
tp_spec.mod.send(tp_spec.ref, socket, dest, packet, tp_opts)
57+
end
58+
59+
def close(tp_spec, socket) do
60+
tp_spec.mod.close(tp_spec.ref, socket)
61+
end
2562
end

0 commit comments

Comments
 (0)