@@ -2,6 +2,19 @@ defmodule HashRing.Worker do
22 @ moduledoc false
33 use GenServer
44
5+ @ erpc_timeout 500
6+ @ node_readiness_check_interval :timer . seconds ( 1 )
7+
8+ defstruct [
9+ :table ,
10+ :node_blacklist ,
11+ :node_whitelist ,
12+ :wait_for_readiness ,
13+ :readiness_deps_set
14+ ]
15+
16+ alias __MODULE__ , as: State
17+
518 def nodes ( pid_or_name )
619
720 def nodes ( pid ) when is_pid ( pid ) do
@@ -14,7 +27,7 @@ defmodule HashRing.Worker do
1427 |> get_ring ( )
1528 |> HashRing . nodes ( )
1629 rescue
17- ArgumentError ->
30+ ArgumentError ->
1831 { :error , :no_such_ring }
1932 end
2033
@@ -69,55 +82,135 @@ defmodule HashRing.Worker do
6982 nodes = [ Node . self ( ) | Node . list ( :connected ) ]
7083 node_blacklist = Keyword . get ( options , :node_blacklist , [ ~r/ ^remsh.*$/ , ~r/ ^rem-.*$/ ] )
7184 node_whitelist = Keyword . get ( options , :node_whitelist , [ ] )
85+ wait_for_readiness = Keyword . get ( options , :wait_for_readiness , false )
86+ readiness_deps_set = Keyword . get ( options , :readiness_deps , [ ] ) |> MapSet . new ( )
7287
7388 ring =
7489 Enum . reduce ( nodes , ring , fn node , acc ->
75- cond do
76- HashRing.Utils . ignore_node? ( node , node_blacklist , node_whitelist ) ->
77- acc
78-
79- :else ->
90+ if HashRing.Utils . ignore_node? ( node , node_blacklist , node_whitelist ) do
91+ acc
92+ else
93+ if wait_for_readiness do
94+ if node_ready? ( node , readiness_deps_set ) do
95+ HashRing . add_node ( acc , node )
96+ else
97+ schedule_check_for_node_readiness ( node )
98+ acc
99+ end
100+ else
80101 HashRing . add_node ( acc , node )
102+ end
81103 end
82104 end )
83105
84106 node_type = Keyword . get ( options , :node_type , :all )
85107 :ok = :net_kernel . monitor_nodes ( true , node_type: node_type )
86108 true = :ets . insert_new ( table , { :ring , ring } )
87- { :ok , { table , node_blacklist , node_whitelist } }
109+
110+ { :ok ,
111+ % State {
112+ table: table ,
113+ node_blacklist: node_blacklist ,
114+ node_whitelist: node_whitelist ,
115+ wait_for_readiness: wait_for_readiness ,
116+ readiness_deps_set: readiness_deps_set
117+ } }
88118
89119 :else ->
90120 nodes = Keyword . get ( options , :nodes , [ ] )
91121 ring = HashRing . add_nodes ( ring , nodes )
92122 true = :ets . insert_new ( table , { :ring , ring } )
93- { :ok , { table , [ ] , [ ] } }
123+
124+ { :ok ,
125+ % State {
126+ table: table ,
127+ node_blacklist: [ ] ,
128+ node_whitelist: [ ] ,
129+ wait_for_readiness: false ,
130+ readiness_deps_set: MapSet . new ( )
131+ } }
94132 end
95133 end
96134
97- def handle_call ( :list_nodes , _from , { table , _b , _w } = state ) do
135+ def handle_call ( :list_nodes , _from , % State { table: table } = state ) do
98136 { :reply , HashRing . nodes ( get_ring ( table ) ) , state }
99137 end
100138
101- def handle_call ( { :key_to_node , key } , _from , { table , _b , _w } = state ) do
139+ def handle_call ( { :key_to_node , key } , _from , % State { table: table } = state ) do
102140 { :reply , HashRing . key_to_node ( get_ring ( table ) , key ) , state }
103141 end
104142
105- def handle_call ( { :add_node , node } , _from , { table , _b , _w } = state ) do
106- get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
143+ def handle_call (
144+ { :add_node , node } ,
145+ _from ,
146+ % State {
147+ table: table ,
148+ wait_for_readiness: wait_for_readiness ,
149+ readiness_deps_set: readiness_deps_set
150+ } = state
151+ ) do
152+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
153+ schedule_check_for_node_readiness ( node )
154+ else
155+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
156+ end
157+
107158 { :reply , :ok , state }
108159 end
109160
110- def handle_call ( { :add_node , node , weight } , _from , { table , _b , _w } = state ) do
111- get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
161+ def handle_call (
162+ { :add_node , node , weight } ,
163+ _from ,
164+ % State {
165+ table: table ,
166+ wait_for_readiness: wait_for_readiness ,
167+ readiness_deps_set: readiness_deps_set
168+ } = state
169+ ) do
170+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
171+ schedule_check_for_node_readiness ( { node , weight } )
172+ else
173+ get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
174+ end
175+
112176 { :reply , :ok , state }
113177 end
114178
115- def handle_call ( { :add_nodes , nodes } , _from , { table , _b , _w } = state ) do
116- get_ring ( table ) |> HashRing . add_nodes ( nodes ) |> update_ring ( table )
179+ def handle_call (
180+ { :add_nodes , nodes } ,
181+ _from ,
182+ % State {
183+ table: table ,
184+ wait_for_readiness: wait_for_readiness ,
185+ readiness_deps_set: readiness_deps_set
186+ } = state
187+ ) do
188+ if wait_for_readiness do
189+ % { true: ready_nodes , false: starting_nodes } =
190+ Enum . group_by (
191+ nodes ,
192+ fn
193+ { node , _weight } ->
194+ node_ready? ( node , readiness_deps_set )
195+
196+ node ->
197+ node_ready? ( node , readiness_deps_set )
198+ end
199+ )
200+
201+ get_ring ( table ) |> HashRing . add_nodes ( ready_nodes ) |> update_ring ( table )
202+
203+ for starting_node <- starting_nodes do
204+ schedule_check_for_node_readiness ( starting_node )
205+ end
206+ else
207+ get_ring ( table ) |> HashRing . add_nodes ( nodes ) |> update_ring ( table )
208+ end
209+
117210 { :reply , :ok , state }
118211 end
119212
120- def handle_call ( { :remove_node , node } , _from , { table , _b , _w } = state ) do
213+ def handle_call ( { :remove_node , node } , _from , % State { table: table } = state ) do
121214 get_ring ( table ) |> HashRing . remove_node ( node ) |> update_ring ( table )
122215 { :reply , :ok , state }
123216 end
@@ -127,19 +220,62 @@ defmodule HashRing.Worker do
127220 { :stop , :shutdown , state }
128221 end
129222
130- def handle_info ( { :nodeup , node , _info } , { table , b , w } = state ) do
223+ def handle_info (
224+ { :nodeup , node , _info } ,
225+ % State {
226+ table: table ,
227+ node_blacklist: b ,
228+ node_whitelist: w ,
229+ wait_for_readiness: wait_for_readiness ,
230+ readiness_deps_set: readiness_deps_set
231+ } = state
232+ ) do
131233 unless HashRing.Utils . ignore_node? ( node , b , w ) do
132- get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
234+ if wait_for_readiness and not node_ready? ( node , readiness_deps_set ) do
235+ schedule_check_for_node_readiness ( node )
236+ else
237+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
238+ end
133239 end
134240
135241 { :noreply , state }
136242 end
137243
138- def handle_info ( { :nodedown , node , _info } , state = { table , _b , _w } ) do
244+ def handle_info ( { :nodedown , node , _info } , % State { table: table } = state ) do
139245 get_ring ( table ) |> HashRing . remove_node ( node ) |> update_ring ( table )
140246 { :noreply , state }
141247 end
142248
249+ def handle_info (
250+ { :check_node_readiness , node , weight } ,
251+ % State { table: table , readiness_deps_set: readiness_deps_set } = state
252+ ) do
253+ if node_ready? ( node , readiness_deps_set ) do
254+ get_ring ( table ) |> HashRing . add_node ( node , weight ) |> update_ring ( table )
255+ else
256+ schedule_check_for_node_readiness ( { node , weight } )
257+ end
258+
259+ { :noreply , state }
260+ end
261+
262+ def handle_info (
263+ { :check_node_readiness , node } ,
264+ % State { table: table , readiness_deps_set: readiness_deps_set } = state
265+ ) do
266+ if node_ready? ( node , readiness_deps_set ) do
267+ get_ring ( table ) |> HashRing . add_node ( node ) |> update_ring ( table )
268+ else
269+ schedule_check_for_node_readiness ( node )
270+ end
271+
272+ { :noreply , state }
273+ end
274+
275+ def handle_info ( _msg , state ) do
276+ { :noreply , state }
277+ end
278+
143279 defp get_ets_name ( name ) , do: :"libring_#{ name } "
144280
145281 defp do_call ( pid_or_name , msg )
@@ -160,6 +296,33 @@ defmodule HashRing.Worker do
160296
161297 defp get_ring ( table ) , do: :ets . lookup_element ( table , :ring , 2 )
162298
163- defp update_ring ( ring , table ) ,
299+ defp update_ring ( ring , table ) ,
164300 do: :ets . update_element ( table , :ring , { 2 , ring } )
301+
302+ defp get_started_apps_set ( node ) do
303+ try do
304+ :erpc . call ( node , Application , :started_applications , [ ] , @ erpc_timeout )
305+ |> Enum . map ( & elem ( & 1 , 0 ) )
306+ |> MapSet . new ( )
307+ rescue
308+ _e -> MapSet . new ( )
309+ end
310+ end
311+
312+ defp node_ready? ( node , readiness_deps_set ) do
313+ MapSet . difference ( readiness_deps_set , get_started_apps_set ( node ) )
314+ |> MapSet . equal? ( MapSet . new ( ) )
315+ end
316+
317+ defp schedule_check_for_node_readiness ( { node , weight } ) do
318+ if node in Node . list ( ) do
319+ :timer . send_after ( @ node_readiness_check_interval , { :check_node_readiness , node , weight } )
320+ end
321+ end
322+
323+ defp schedule_check_for_node_readiness ( node ) do
324+ if node in Node . list ( ) do
325+ :timer . send_after ( @ node_readiness_check_interval , { :check_node_readiness , node } )
326+ end
327+ end
165328end
0 commit comments