@@ -12,6 +12,8 @@ defmodule NervesHub.Deployments.Orchestrator do
1212
1313 require Logger
1414
15+ require OpenTelemetry.Tracer , as: Tracer
16+
1517 alias NervesHub.Devices
1618 alias NervesHub.Devices.Device
1719 alias NervesHub.Repo
@@ -49,57 +51,59 @@ defmodule NervesHub.Deployments.Orchestrator do
4951 was successful, and the process is repeated.
5052 """
5153 def trigger_update ( deployment ) do
52- :telemetry . execute ( [ :nerves_hub , :deployment , :trigger_update ] , % { count: 1 } )
53-
54- match_conditions = [
55- { :and , { :== , { :map_get , :deployment_id , :"$1" } , deployment . id } ,
56- { :== , { :map_get , :updating , :"$1" } , false } ,
57- { :== , { :map_get , :updates_enabled , :"$1" } , true } ,
58- { :"/=" , { :map_get , :firmware_uuid , :"$1" } , deployment . firmware . uuid } }
59- ]
60-
61- match_return = % {
62- device_id: { :element , 1 , :"$_" } ,
63- pid: { :element , 1 , { :element , 2 , :"$_" } } ,
64- firmware_uuid: { :map_get , :firmware_uuid , { :element , 2 , { :element , 2 , :"$_" } } }
65- }
66-
67- devices =
68- Registry . select ( NervesHub.Devices.Registry , [
69- { { :_ , :_ , :"$1" } , match_conditions , [ match_return ] }
70- ] )
71-
72- # Get a rough count of devices to update
73- count = deployment . concurrent_updates - Devices . count_inflight_updates_for ( deployment )
74- # Just in case inflight goes higher than concurrent, limit it to 0
75- count = max ( count , 0 )
76-
77- # use a reduce to bounce out early?
78- # limit the number of devices to 5 minutes / 500ms?
79-
80- devices
81- |> Enum . take ( count )
82- |> Enum . each ( fn % { device_id: device_id , pid: pid } ->
83- :telemetry . execute ( [ :nerves_hub , :deployment , :trigger_update , :device ] , % { count: 1 } )
84-
85- device = % Device { id: device_id }
86-
87- # Check again because other nodes are processing at the same time
88- if Devices . count_inflight_updates_for ( deployment ) < deployment . concurrent_updates do
89- case Devices . told_to_update ( device , deployment ) do
90- { :ok , inflight_update } ->
91- send ( pid , { "deployments/update" , inflight_update } )
92-
93- :error ->
94- Logger . error (
95- "An inflight update could not be created or found for the device #{ device . identifier } (#{ device . id } )"
96- )
54+ Tracer . with_span "NervesHub.Deployments.Orchestrator.trigger_update" do
55+ :telemetry . execute ( [ :nerves_hub , :deployment , :trigger_update ] , % { count: 1 } )
56+
57+ match_conditions = [
58+ { :and , { :== , { :map_get , :deployment_id , :"$1" } , deployment . id } ,
59+ { :== , { :map_get , :updating , :"$1" } , false } ,
60+ { :== , { :map_get , :updates_enabled , :"$1" } , true } ,
61+ { :"/=" , { :map_get , :firmware_uuid , :"$1" } , deployment . firmware . uuid } }
62+ ]
63+
64+ match_return = % {
65+ device_id: { :element , 1 , :"$_" } ,
66+ pid: { :element , 1 , { :element , 2 , :"$_" } } ,
67+ firmware_uuid: { :map_get , :firmware_uuid , { :element , 2 , { :element , 2 , :"$_" } } }
68+ }
69+
70+ devices =
71+ Registry . select ( NervesHub.Devices.Registry , [
72+ { { :_ , :_ , :"$1" } , match_conditions , [ match_return ] }
73+ ] )
74+
75+ # Get a rough count of devices to update
76+ count = deployment . concurrent_updates - Devices . count_inflight_updates_for ( deployment )
77+ # Just in case inflight goes higher than concurrent, limit it to 0
78+ count = max ( count , 0 )
79+
80+ # use a reduce to bounce out early?
81+ # limit the number of devices to 5 minutes / 500ms?
82+
83+ devices
84+ |> Enum . take ( count )
85+ |> Enum . each ( fn % { device_id: device_id , pid: pid } ->
86+ :telemetry . execute ( [ :nerves_hub , :deployment , :trigger_update , :device ] , % { count: 1 } )
87+
88+ device = % Device { id: device_id }
89+
90+ # Check again because other nodes are processing at the same time
91+ if Devices . count_inflight_updates_for ( deployment ) < deployment . concurrent_updates do
92+ case Devices . told_to_update ( device , deployment ) do
93+ { :ok , inflight_update } ->
94+ send ( pid , { "deployments/update" , inflight_update } )
95+
96+ :error ->
97+ Logger . error (
98+ "An inflight update could not be created or found for the device #{ device . identifier } (#{ device . id } )"
99+ )
100+ end
97101 end
98- end
99102
100- # Slow the update a bit to allow for concurrent nodes
101- Process . sleep ( 500 )
102- end )
103+ # Slow the update a bit to allow for concurrent nodes
104+ Process . sleep ( 500 )
105+ end )
106+ end
103107 end
104108
105109 def init ( deployment ) do
0 commit comments