Skip to content

Commit 61d3b8c

Browse files
authored
refactor(pubsub): replace custom Edgehog.PubSub with Ash.Notifier.PubSub (#1014)
- Removed the legacy Edgehog.PubSub module and the PublishNotification change. - Updated Deployment and OTAOperation resources to use Ash.Notifier.PubSub. - Added pub_sub blocks with proper transforms to map Ash actions to legacy event types. - Updated code subscribing to events to use Phoenix.PubSub directly. - Adjusted tests to match new notification structure. By using as built in notification system we will be able to integrate more features in the edgehog backend. Namely Ash GraphQL Subscriptions and a better notification system to campaigns, reacting only to relevant updates and discarding other events. Signed-off-by: ArnelaL <arnela.lisic@secomind.com>
1 parent 11541e2 commit 61d3b8c

File tree

12 files changed

+123
-238
lines changed

12 files changed

+123
-238
lines changed

backend/lib/edgehog/changes/publish_notification.ex

Lines changed: 0 additions & 53 deletions
This file was deleted.

backend/lib/edgehog/containers/deployment.ex

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ defmodule Edgehog.Containers.Deployment do
2222
@moduledoc false
2323
use Edgehog.MultitenantResource,
2424
domain: Edgehog.Containers,
25-
extensions: [AshGraphql.Resource]
25+
extensions: [AshGraphql.Resource],
26+
notifiers: [Ash.Notifier.PubSub]
2627

27-
alias Edgehog.Changes.PublishNotification
2828
alias Edgehog.Containers.Deployment.Calculations
2929
alias Edgehog.Containers.Deployment.Changes
3030
alias Edgehog.Containers.Deployment.Types.DeploymentState
@@ -66,7 +66,6 @@ defmodule Edgehog.Containers.Deployment do
6666
change manage_relationship(:device_id, :device, type: :append)
6767
change Changes.Relate
6868
change Changes.SendRequest
69-
change {PublishNotification, event_type: :deployment_created}
7069
end
7170

7271
create :just_create do
@@ -85,7 +84,6 @@ defmodule Edgehog.Containers.Deployment do
8584

8685
change manage_relationship(:device_id, :device, type: :append)
8786
change Changes.Relate
88-
change {PublishNotification, event_type: :deployment_created}
8987
end
9088

9189
if @testing do
@@ -157,7 +155,6 @@ defmodule Edgehog.Containers.Deployment do
157155
require_atomic? false
158156

159157
change Changes.SendDeploymentToDevice
160-
change {PublishNotification, event_type: :deployment_updated}
161158
end
162159

163160
update :upgrade_release do
@@ -177,35 +174,30 @@ defmodule Edgehog.Containers.Deployment do
177174
change set_attribute(:state, :sent)
178175

179176
require_atomic? false
180-
change {PublishNotification, event_type: :deployment_updated}
181177
end
182178

183179
update :mark_as_started do
184180
change set_attribute(:state, :started)
185181

186182
require_atomic? false
187-
change {PublishNotification, event_type: :deployment_updated}
188183
end
189184

190185
update :mark_as_starting do
191186
require_atomic? false
192187

193188
change Changes.MarkAsStarting
194-
change {PublishNotification, event_type: :deployment_updated}
195189
end
196190

197191
update :mark_as_stopped do
198192
change set_attribute(:state, :stopped)
199193

200194
require_atomic? false
201-
change {PublishNotification, event_type: :deployment_updated}
202195
end
203196

204197
update :mark_as_stopping do
205198
require_atomic? false
206199

207200
change Changes.MarkAsStopping
208-
change {PublishNotification, event_type: :deployment_updated}
209201
end
210202

211203
update :mark_as_errored do
@@ -217,22 +209,19 @@ defmodule Edgehog.Containers.Deployment do
217209
change set_attribute(:state, :error)
218210

219211
require_atomic? false
220-
change {PublishNotification, event_type: :deployment_error}
221212
end
222213

223214
update :mark_as_deleting do
224215
change set_attribute(:state, :deleting)
225216

226217
require_atomic? false
227-
change {PublishNotification, event_type: :deployment_updated}
228218
end
229219

230220
update :maybe_run_ready_actions do
231221
change Changes.MaybeRunReadyActions
232222
change Changes.MaybePublishDeploymentReady
233223

234224
require_atomic? false
235-
change {PublishNotification, event_type: :deployment_updated}
236225
end
237226

238227
read :filter_by_release do
@@ -294,6 +283,56 @@ defmodule Edgehog.Containers.Deployment do
294283
identity :release_instance, [:device_id, :release_id]
295284
end
296285

286+
pub_sub do
287+
prefix "deployments"
288+
module EdgehogWeb.Endpoint
289+
290+
publish :deploy, [[:id, "*"]]
291+
publish :just_create, [[:id, "*"]]
292+
293+
publish :mark_as_sent, [[:id, "*"]]
294+
publish :mark_as_started, [[:id, "*"]]
295+
publish :mark_as_starting, [[:id, "*"]]
296+
publish :mark_as_stopped, [[:id, "*"]]
297+
publish :mark_as_stopping, [[:id, "*"]]
298+
publish :mark_as_errored, [[:id, "*"]]
299+
publish :mark_as_deleting, [[:id, "*"]]
300+
publish :maybe_run_ready_actions, [[:id, "*"]]
301+
302+
transform fn notification ->
303+
deployment = notification.data
304+
action = notification.action.name
305+
306+
event_type =
307+
cond do
308+
Map.get(notification.metadata || %{}, :custom_event) == :deployment_ready ->
309+
:deployment_ready
310+
311+
action in [:deploy, :just_create] ->
312+
:deployment_created
313+
314+
action in [
315+
:mark_as_sent,
316+
:mark_as_started,
317+
:mark_as_starting,
318+
:mark_as_stopped,
319+
:mark_as_stopping,
320+
:mark_as_deleting,
321+
:maybe_run_ready_actions
322+
] ->
323+
:deployment_updated
324+
325+
action == :mark_as_errored ->
326+
:deployment_error
327+
328+
true ->
329+
:unknown_event
330+
end
331+
332+
{event_type, deployment}
333+
end
334+
end
335+
297336
postgres do
298337
table "application_deployments"
299338

backend/lib/edgehog/containers/deployment/changes/maybe_publish_deployment_ready.ex

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,22 @@ defmodule Edgehog.Containers.Deployment.Changes.MaybePublishDeploymentReady do
2424
"""
2525
use Ash.Resource.Change
2626

27-
alias Edgehog.PubSub
28-
2927
@impl Ash.Resource.Change
3028
def change(changeset, _opts, _context) do
3129
changeset
3230
|> Ash.Changeset.load(:is_ready)
3331
|> Ash.Changeset.after_transaction(fn _changeset, transaction_result ->
3432
with {:ok, deployment} <- transaction_result,
3533
{:ok, deployment} <- Ash.load(deployment, :is_ready) do
36-
if deployment.is_ready,
37-
do: PubSub.publish!(:deployment_ready, deployment)
34+
if deployment.is_ready do
35+
Ash.Notifier.notify(%Ash.Notifier.Notification{
36+
data: deployment,
37+
for: [Ash.Notifier.PubSub],
38+
action: %{type: :update, name: :maybe_run_ready_actions},
39+
resource: Edgehog.Containers.Deployment,
40+
metadata: %{custom_event: :deployment_ready}
41+
})
42+
end
3843

3944
{:ok, deployment}
4045
end

backend/lib/edgehog/deployment_campaigns/deployment_mechanism/lazy/core.ex

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ defmodule Edgehog.DeploymentCampaigns.DeploymentMechanism.Lazy.Core do
2727
alias Edgehog.DeploymentCampaigns.DeploymentCampaign
2828
alias Edgehog.DeploymentCampaigns.DeploymentTarget
2929
alias Edgehog.Error.AstarteAPIError
30-
alias Edgehog.PubSub
3130

3231
@doc """
3332
Fetches a deployment campaign by its ID and tenant ID, raising an error if not found.
@@ -247,7 +246,11 @@ defmodule Edgehog.DeploymentCampaigns.DeploymentMechanism.Lazy.Core do
247246
- :ok if the subscription is successful, otherwise raises an error.
248247
"""
249248
def subscribe_to_deployment_updates!(deployment_id) do
250-
with {:error, reason} <- PubSub.subscribe_to_events_for({:deployment, deployment_id}) do
249+
with {:error, reason} <-
250+
Phoenix.PubSub.subscribe(
251+
Edgehog.PubSub,
252+
"deployments:#{deployment_id}"
253+
) do
251254
raise reason
252255
end
253256
end
@@ -262,7 +265,10 @@ defmodule Edgehog.DeploymentCampaigns.DeploymentMechanism.Lazy.Core do
262265
- :ok if the subscription is successful, otherwise raises an error.
263266
"""
264267
def unsubscribe_to_deployment_updates!(deployment_id) do
265-
PubSub.unsubscribe_to_events_for({:deployment, deployment_id})
268+
Phoenix.PubSub.unsubscribe(
269+
Edgehog.PubSub,
270+
"deployments:#{deployment_id}"
271+
)
266272
end
267273

268274
@doc """

backend/lib/edgehog/deployment_campaigns/deployment_mechanism/lazy/executor.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ defmodule Edgehog.DeploymentCampaigns.DeploymentMechanism.Lazy.Executor do
449449
# events enqueued with the :next_event action. This means that we can be sure an :info event
450450
# or a timeout won't be handled, e.g., between a rollout and the handling of its error
451451

452-
def handle_event(:info, {:deployment_ready, deployment}, _state, data) do
452+
def handle_event(:info, %{payload: {:deployment_ready, deployment}}, _state, data) do
453453
# We always cancel the retry timeout for every kind of update we see on an Deployment.
454454
# This ensures we don't resend the request even if we accidentally miss the acknowledge.
455455
# If the timeout does not exist, this is a no-op anyway.
@@ -464,11 +464,11 @@ defmodule Edgehog.DeploymentCampaigns.DeploymentMechanism.Lazy.Executor do
464464
end
465465

466466
# Ignore deployment_updated events
467-
def handle_event(:info, {:deployment_updated, _deployment}, _state, _data) do
467+
def handle_event(:info, %{payload: {:deployment_updated, _deployment}}, _state, _data) do
468468
:keep_state_and_data
469469
end
470470

471-
def handle_event(:info, {:deployment_error, deployment}, _state, data) do
471+
def handle_event(:info, %{payload: {:deployment_error, deployment}}, _state, data) do
472472
# We always cancel the retry timeout for every kind of update we see on an Deployment.
473473
# This ensures we don't resend the request even if we accidentally miss the acknowledge.
474474
# If the timeout does not exist, this is a no-op anyway.

backend/lib/edgehog/os_management/ota_operation/ota_operation.ex

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ defmodule Edgehog.OSManagement.OTAOperation do
2424
domain: Edgehog.OSManagement,
2525
extensions: [
2626
AshGraphql.Resource
27-
]
27+
],
28+
notifiers: [Ash.Notifier.PubSub]
2829

29-
alias Edgehog.Changes.PublishNotification
3030
alias Edgehog.OSManagement.OTAOperation.Changes
3131
alias Edgehog.OSManagement.OTAOperation.ManualActions
3232
alias Edgehog.OSManagement.OTAOperation.Status
@@ -67,7 +67,6 @@ defmodule Edgehog.OSManagement.OTAOperation do
6767
accept [:base_image_url, :device_id]
6868

6969
change Changes.SendUpdateRequest
70-
change {PublishNotification, event_type: :ota_operation_created}
7170
end
7271

7372
create :manual do
@@ -95,7 +94,6 @@ defmodule Edgehog.OSManagement.OTAOperation do
9594
change set_attribute(:manual?, true)
9695
change Changes.HandleEphemeralImageUpload
9796
change Changes.SendUpdateRequest
98-
change {PublishNotification, event_type: :ota_operation_created}
9997
end
10098

10199
destroy :destroy do
@@ -107,12 +105,11 @@ defmodule Edgehog.OSManagement.OTAOperation do
107105
end
108106

109107
update :mark_as_timed_out do
110-
# Needed because PublishNotification and HandleEphemeralImageDeletion are not atomic
108+
# Needed because HandleEphemeralImageDeletion are not atomic
111109
require_atomic? false
112110

113111
change set_attribute(:status, :failure)
114112
change set_attribute(:status_code, :request_timeout)
115-
change {PublishNotification, event_type: :ota_operation_updated}
116113

117114
change Changes.HandleEphemeralImageDeletion do
118115
where attribute_equals(:manual?, true)
@@ -126,11 +123,9 @@ defmodule Edgehog.OSManagement.OTAOperation do
126123
update :update_status do
127124
accept [:status, :status_progress, :status_code, :message]
128125

129-
# Needed because PublishNotification and HandleEphemeralImageDeletion are not atomic
126+
# Needed because and HandleEphemeralImageDeletion are not atomic
130127
require_atomic? false
131128

132-
change {PublishNotification, event_type: :ota_operation_updated}
133-
134129
change Changes.HandleEphemeralImageDeletion do
135130
where [attribute_equals(:manual?, true), attribute_in(:status, @terminal_statuses)]
136131
end
@@ -222,7 +217,37 @@ defmodule Edgehog.OSManagement.OTAOperation do
222217
calculate :finished?, :boolean, expr(status == :success or status == :failure)
223218
end
224219

225-
# TODO: notifiers to replace the old PubSub
220+
pub_sub do
221+
prefix "ota_operations"
222+
module EdgehogWeb.Endpoint
223+
224+
publish :create_managed, [[:id, "*"]]
225+
publish :manual, [[:id, "*"]]
226+
publish :mark_as_timed_out, [[:id, "*"]]
227+
publish :update_status, [[:id, "*"]]
228+
229+
transform fn notification ->
230+
ota_operation = notification.data
231+
action = notification.action.name
232+
233+
event_type =
234+
cond do
235+
action in [:create_managed, :manual] ->
236+
:ota_operation_created
237+
238+
action in [
239+
:mark_as_timed_out,
240+
:update_status
241+
] ->
242+
:ota_operation_updated
243+
244+
true ->
245+
:unknown_event
246+
end
247+
248+
{event_type, ota_operation}
249+
end
250+
end
226251

227252
postgres do
228253
table "ota_operations"

0 commit comments

Comments
 (0)