Skip to content

Commit 11156ac

Browse files
committed
1.2.1 - ensure queue is working properly
1 parent c36db0a commit 11156ac

File tree

3 files changed

+148
-17
lines changed

3 files changed

+148
-17
lines changed

lib/hemdal/host.ex

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -245,13 +245,51 @@ defmodule Hemdal.Host do
245245
@spec update_host(Hemdal.Config.Host.t()) :: {:ok, pid()}
246246
def update_host(host) do
247247
if pid = get_pid(host.id) do
248-
GenStateMachine.cast(pid, {:update, host})
248+
GenServer.cast(pid, {:update, host})
249249
{:ok, pid}
250250
else
251251
start(host)
252252
end
253253
end
254254

255+
@typedoc """
256+
Information about the status of the host. The host is limiting the number
257+
of running workers so we can get the information of the following data:
258+
259+
- `waiting_workers` the current number of requests awaiting in the queue.
260+
- `running_workers` the number of running workers.
261+
- `max_running_workers` the max number of workers that could be run at
262+
the same time.
263+
"""
264+
@type host_stats() :: %{
265+
:status => :up | :down,
266+
optional(:waiting_workers) => non_neg_integer(),
267+
optional(:running_workers) => non_neg_integer(),
268+
optional(:max_running_workers) => non_neg_integer()
269+
}
270+
271+
@doc """
272+
Retrieve stats for a specific host.
273+
"""
274+
@spec get_stats(host_id()) :: host_stats()
275+
def get_stats(host_id) do
276+
GenServer.call(via(host_id), :get_stats)
277+
catch
278+
:exit, {:noproc, _info} ->
279+
%{status: :down}
280+
end
281+
282+
@doc """
283+
Retrieve stats for all of the hosts. See `get_stats/1`.
284+
"""
285+
@spec get_all_stats :: %{host_id() => host_stats()}
286+
def get_all_stats do
287+
@registry_name
288+
|> Registry.select([{{:"$1", :_, :_}, [], [:"$1"]}])
289+
|> Map.new(&{&1, get_stats(&1)})
290+
end
291+
292+
@typedoc false
255293
@type t() :: %__MODULE__{
256294
host: nil | Hemdal.Config.Host.t(),
257295
max_workers: :infinity | non_neg_integer(),
@@ -332,6 +370,17 @@ defmodule Hemdal.Host do
332370
{:noreply, %__MODULE__{state | workers: workers}}
333371
end
334372

373+
def handle_call(:get_stats, _from, %__MODULE__{} = state) do
374+
stats = %{
375+
status: :up,
376+
waiting_workers: Queue.len(state.queue),
377+
running_workers: state.workers,
378+
max_running_workers: state.max_workers
379+
}
380+
381+
{:reply, stats, state}
382+
end
383+
335384
@impl GenServer
336385
@doc false
337386
def handle_cast({:update, host}, state) do
@@ -363,26 +412,38 @@ defmodule Hemdal.Host do
363412

364413
defp launch_extra(state, false) do
365414
{{:value, {info, from}}, queue} = Queue.out(state.queue)
366-
{:noreply, state} = handle_call(info, from, %__MODULE__{state | queue: queue})
367-
launch_extra(state, Queue.is_empty(state.queue))
415+
416+
case handle_call(info, from, %__MODULE__{state | queue: queue}) do
417+
{:noreply, state} ->
418+
launch_extra(state, Queue.is_empty(state.queue))
419+
420+
{:reply, reply, state} ->
421+
GenServer.reply(from, reply)
422+
launch_extra(state, Queue.is_empty(state.queue))
423+
end
368424
end
369425

370426
@impl GenServer
371427
@doc false
372428
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
373-
case Queue.out(state.queue) do
374-
{:empty, queue} ->
375-
workers = state.workers - 1
376-
Logger.debug("host => workers: #{workers}/#{state.max_workers} ; queue length: 0")
377-
{:noreply, %__MODULE__{state | workers: workers, queue: queue}}
378-
379-
{{:value, {{:exec, caller, cmd, args}, from}}, queue} ->
380-
state = %__MODULE__{state | queue: queue}
381-
send_result = &GenServer.reply(from, &1)
382-
spawn_monitor(fn -> run_in_background(caller, cmd, args, send_result, state) end)
383-
qlen = Queue.len(queue)
384-
Logger.debug("host => workers: #{state.workers}/#{state.max_workers} ; queue length: #{qlen}")
429+
{next, queue} = Queue.out(state.queue)
430+
state = %__MODULE__{state | queue: queue, workers: state.workers - 1}
431+
len = Queue.len(queue)
432+
Logger.debug("host => workers: #{state.workers}/#{state.max_workers} ; queue length: #{len}")
433+
434+
case next do
435+
:empty ->
385436
{:noreply, state}
437+
438+
{:value, {info, from}} ->
439+
case handle_call(info, from, state) do
440+
{:noreply, state} ->
441+
{:noreply, state}
442+
443+
{:reply, reply, state} ->
444+
GenServer.reply(from, reply)
445+
{:noreply, state}
446+
end
386447
end
387448
end
388449

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
defmodule Hemdal.MixProject do
22
use Mix.Project
33

4-
@version "1.2.0"
4+
@version "1.2.1"
55

66
def project do
77
[

test/hemdal/host_test.exs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,54 @@ defmodule Hemdal.HostTest do
102102
] == Hemdal.Host.get_all()
103103
end
104104

105+
test "queue empty" do
106+
assert :ok = Hemdal.Host.reload_all()
107+
assert [host_id, _] = Hemdal.Host.get_all()
108+
109+
long_run = %Hemdal.Config.Command{
110+
name: "wait",
111+
type: "line",
112+
command: "sleep 1 && printf DONE",
113+
output: false,
114+
decode: false
115+
}
116+
117+
stats = Hemdal.Host.get_stats(host_id)
118+
assert 1 == stats.max_running_workers
119+
assert 0 == stats.running_workers
120+
assert 0 == stats.waiting_workers
121+
122+
assert {:ok, _worker} = Hemdal.Host.exec_background(host_id, long_run)
123+
124+
stats = Hemdal.Host.get_stats(host_id)
125+
assert 1 == stats.running_workers
126+
assert 0 == stats.waiting_workers
127+
128+
spawn_monitor(fn ->
129+
assert {:ok, _worker} = Hemdal.Host.exec_background(host_id, long_run)
130+
assert_receive {:ok, %{"message" => "DONE"}}, 2_000
131+
end)
132+
133+
Process.sleep(100)
134+
stats = Hemdal.Host.get_stats(host_id)
135+
assert 1 == stats.running_workers
136+
assert 1 == stats.waiting_workers
137+
138+
spawn_monitor(fn ->
139+
assert {:ok, _worker} = Hemdal.Host.exec_background(host_id, long_run)
140+
assert_receive {:ok, %{"message" => "DONE"}}, 2_000
141+
end)
142+
143+
Process.sleep(100)
144+
stats = Hemdal.Host.get_stats(host_id)
145+
assert 1 == stats.running_workers
146+
assert 2 == stats.waiting_workers
147+
148+
assert_receive {:ok, %{"message" => "DONE"}}, 2_000
149+
assert_receive {:DOWN, _ref, :process, _pid, _reason}, 2_000
150+
assert_receive {:DOWN, _ref, :process, _pid, _reason}, 2_000
151+
end
152+
105153
test "run shell command" do
106154
assert :ok = Hemdal.Host.reload_all()
107155
assert [host_id, _] = Hemdal.Host.get_all()
@@ -119,15 +167,37 @@ defmodule Hemdal.HostTest do
119167
assert :ok = Hemdal.Host.reload_all()
120168
assert [host_id, _] = Hemdal.Host.get_all()
121169

170+
assert %{
171+
"78eb75f9-2ac7-434c-a1a2-330b23c89982" => %{
172+
status: :up,
173+
max_running_workers: 1,
174+
running_workers: 0,
175+
waiting_workers: 0
176+
},
177+
"f8441510-95db-4e00-a3e0-1556bb8a778c" => %{
178+
status: :up,
179+
max_running_workers: 1,
180+
running_workers: 0,
181+
waiting_workers: 0
182+
}
183+
} == Hemdal.Host.get_all_stats()
184+
122185
echo = %Hemdal.Config.Command{
123186
name: "hello world!",
124187
type: "line",
125-
command: ~s|echo '{"status": "OK", "message": "hello world!"}'|
188+
command: ~s|sleep 1 && echo '{"status": "OK", "message": "hello world!"}'|
126189
}
127190

128191
assert {:ok, worker} = Hemdal.Host.exec_background(host_id, echo)
129192
assert is_pid(worker)
193+
194+
assert %{status: :up, waiting_workers: 0, running_workers: 1, max_running_workers: 1} ==
195+
Hemdal.Host.get_stats(host_id)
196+
130197
assert_receive {:ok, %{"message" => "hello world!"}}, 5_000
198+
199+
assert %{status: :up, waiting_workers: 0, running_workers: 0, max_running_workers: 1} ==
200+
Hemdal.Host.get_stats(host_id)
131201
end
132202

133203
test "run script" do

0 commit comments

Comments
 (0)