Skip to content

Commit 8dc935b

Browse files
authored
Have the calculator use Oban (#1639)
1 parent 80dd9e9 commit 8dc935b

File tree

9 files changed

+257
-338
lines changed

9 files changed

+257
-338
lines changed

config/config.exs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ config :nerves_hub, Oban,
5858
delete_archive: 1,
5959
delete_firmware: 1,
6060
device: 1,
61+
device_deployment_calculations: 5,
6162
firmware_delta_builder: 2,
6263
truncate: 1,
6364
# temporary, will remove in November

lib/nerves_hub/deployments.ex

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -197,13 +197,12 @@ defmodule NervesHub.Deployments do
197197
end
198198

199199
defp recalculate_devices(%{recalculation_type: :calculator_queue} = deployment, changeset) do
200-
_ =
201-
if Enum.any?(
202-
[:conditions, :is_active, :recalculation_type],
203-
&Map.has_key?(changeset.changes, &1)
204-
) do
205-
create_inflight_checks(deployment)
206-
end
200+
if Enum.any?(
201+
[:conditions, :is_active, :recalculation_type],
202+
&Map.has_key?(changeset.changes, &1)
203+
) do
204+
schedule_deployment_calculations(deployment)
205+
end
207206

208207
:ok
209208
end
@@ -334,15 +333,18 @@ defmodule NervesHub.Deployments do
334333
335334
Also clears any previous inflight checks for this deployment.
336335
"""
337-
def create_inflight_checks(deployment) do
338-
delete_inflight_checks(deployment)
339-
336+
def schedule_deployment_calculations(deployment) do
340337
query =
341338
Device
342339
|> select([d], %{
343-
deployment_id: ^deployment.id,
344-
device_id: d.id,
345-
inserted_at: ^DateTime.utc_now()
340+
worker: "NervesHub.Workers.DeviceCalculateDeployment",
341+
queue: "device_deployment_calculations",
342+
args:
343+
fragment(
344+
"json_build_object('device_id', ?, 'deployment_id', ?::integer)",
345+
d.id,
346+
^deployment.id
347+
)
346348
})
347349
|> where([d], not is_nil(d.connection_last_seen_at))
348350
|> where(
@@ -352,8 +354,9 @@ defmodule NervesHub.Deployments do
352354
)
353355
|> where([d], d.firmware_metadata["platform"] == ^deployment.firmware.platform)
354356
|> where([d], d.firmware_metadata["architecture"] == ^deployment.firmware.architecture)
357+
|> where([d], fragment("? <@ ?", ^deployment.conditions["tags"], d.tags))
355358

356-
Repo.insert_all(InflightDeploymentCheck, query)
359+
Repo.insert_all(Oban.Job, query)
357360
end
358361

359362
@doc """

lib/nerves_hub/deployments/calculator.ex

Lines changed: 0 additions & 144 deletions
This file was deleted.

lib/nerves_hub/deployments/monitor.ex

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ defmodule NervesHub.Deployments.Monitor do
1010
alias NervesHub.DeploymentDynamicSupervisor
1111
alias NervesHub.Deployments
1212
alias NervesHub.Deployments.Orchestrator
13-
alias NervesHub.InflightDeploymentCheckDynamicSupervisor
1413
alias Phoenix.PubSub
1514
alias Phoenix.Socket.Broadcast
1615

@@ -37,13 +36,7 @@ defmodule NervesHub.Deployments.Monitor do
3736
{Deployments.Orchestrator, deployment}
3837
)
3938

40-
{:ok, calculator_pid} =
41-
DynamicSupervisor.start_child(
42-
InflightDeploymentCheckDynamicSupervisor,
43-
{Deployments.Calculator, deployment}
44-
)
45-
46-
{deployment.id, %{orchestrator_pid: orchestrator_pid, calculator_pid: calculator_pid}}
39+
{deployment.id, %{orchestrator_pid: orchestrator_pid}}
4740
end)
4841

4942
{:noreply, %{state | deployments: deployments}}
@@ -58,17 +51,8 @@ defmodule NervesHub.Deployments.Monitor do
5851
{Deployments.Orchestrator, deployment}
5952
)
6053

61-
{:ok, calculator_pid} =
62-
DynamicSupervisor.start_child(
63-
InflightDeploymentCheckDynamicSupervisor,
64-
{Deployments.Calculator, deployment}
65-
)
66-
6754
deployments =
68-
Map.put(state.deployments, deployment.id, %{
69-
orchestrator_pid: orchestrator_pid,
70-
calculator_pid: calculator_pid
71-
})
55+
Map.put(state.deployments, deployment.id, %{orchestrator_pid: orchestrator_pid})
7256

7357
{:noreply, %{state | deployments: deployments}}
7458
end

lib/nerves_hub/deployments/supervisor.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ defmodule NervesHub.Deployments.Supervisor do
1111
children = [
1212
{Registry, keys: :unique, name: NervesHub.Deployments},
1313
NervesHub.Deployments.Monitor,
14-
{DynamicSupervisor, strategy: :one_for_one, name: NervesHub.DeploymentDynamicSupervisor},
15-
{DynamicSupervisor,
16-
strategy: :one_for_one, name: NervesHub.InflightDeploymentCheckDynamicSupervisor}
14+
{DynamicSupervisor, strategy: :one_for_one, name: NervesHub.DeploymentDynamicSupervisor}
1715
]
1816

1917
Supervisor.init(children, strategy: :one_for_one)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
defmodule NervesHub.Workers.DeviceCalculateDeployment do
2+
use Oban.Worker,
3+
queue: :device_deployment_calculations,
4+
max_attempts: 5
5+
6+
alias NervesHub.Devices.Device
7+
alias NervesHub.Deployments
8+
alias NervesHub.Deployments.Deployment
9+
alias NervesHub.Repo
10+
11+
@impl true
12+
def perform(%Oban.Job{args: %{"device_id" => device_id, "deployment_id" => deployment_id}}) do
13+
device = Repo.get!(Device, device_id)
14+
deployment = Deployments.get_deployment!(deployment_id) |> Deployment.with_firmware()
15+
16+
if !is_nil(device.deployment_id) && device.deployment_id != deployment.id do
17+
:ok
18+
else
19+
if deployment.is_active &&
20+
!is_nil(device.connection_last_seen_at) &&
21+
device.product_id == deployment.product_id &&
22+
device.firmware_metadata.platform == deployment.firmware.platform &&
23+
device.firmware_metadata.architecture == deployment.firmware.architecture &&
24+
Enum.all?(deployment.conditions["tags"], &Enum.member?(device.tags, &1)) &&
25+
Deployments.version_match?(device, deployment) do
26+
device
27+
|> Ecto.Changeset.change(%{deployment_id: deployment.id})
28+
|> Repo.update!()
29+
else
30+
device
31+
|> Ecto.Changeset.change(%{deployment_id: nil})
32+
|> Repo.update!()
33+
end
34+
35+
Phoenix.PubSub.broadcast(
36+
NervesHub.PubSub,
37+
"device:#{device.id}",
38+
%Phoenix.Socket.Broadcast{event: "devices/updated"}
39+
)
40+
end
41+
42+
:ok
43+
end
44+
end

0 commit comments

Comments
 (0)