-
Notifications
You must be signed in to change notification settings - Fork 29
Expand file tree
/
Copy pathmanaged_ring.ex
More file actions
300 lines (246 loc) · 10.2 KB
/
managed_ring.ex
File metadata and controls
300 lines (246 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
defmodule HashRing.Managed do
@moduledoc """
This module defines the API for working with hash rings where the ring state is managed
in a GenServer process.
There is a performance penalty with working with the ring this way, but it is the best approach
if you need to share the ring across multiple processes, or need to maintain multiple rings.
If your rings map 1:1 with Erlang node membership, you can configure rings to automatically
monitor node up/down events and update the hash ring accordingly, with a default weight,
and either whitelist or blacklist nodes from the ring. You configure this at the ring level in your `config.exs`.
Each ring is configured in `config.exs`, and can contain a list of nodes to seed the ring with,
and you can then dynamically add/remove nodes to the ring using the API here. Each node on the ring can
be configured with a weight, which affects the amount of the total keyspace it owns. The default weight
is `128`. It's best to base the weight of nodes on some concrete relative value, such as the amount of
memory a node has.
"""
@type ring :: pid() | atom()
@type key :: any()
@type weight :: pos_integer
@type node_list :: [term() | {term(), weight}]
@type pattern_list :: [String.t() | Regex.t()]
@type app_list :: [atom()]
@type ring_options :: [
nodes: node_list,
monitor_nodes: boolean,
node_blacklist: pattern_list,
node_whitelist: pattern_list,
node_type: :all | :hidden | :visible,
wait_for_readiness: boolean,
readiness_deps: app_list
]
@type child_spec_options :: [
:id => atom() | term(),
:start => {module, function_name :: atom, args :: [term]},
optional(:restart) => restart,
optional(:shutdown) => shutdown,
optional(:type) => type,
optional(:modules) => [module] | :dynamic,
optional(:significant) => boolean,
:nodes => node_list,
:monitor_nodes => boolean,
:node_blacklist => pattern_list,
:node_whitelist => pattern_list,
:node_type => :all | :hidden | :visible,
:wait_for_readiness => boolean,
:readiness_deps: app_list,
]
@valid_ring_opts [
:name,
:nodes,
:monitor_nodes,
:node_blacklist,
:node_whitelist,
:node_type,
:wait_for_readiness,
:readiness_deps
]
@spec child_spec(child_spec_options) :: Supervisor.child_spec
def child_spec(opts) do
opts = Keyword.put_new(opts, :name, :hash_ring_manager)
Keyword.merge(%{
id: opts[:id] || opts[:name],
type: :worker,
restart: :permanent,
start: {__MODULE__, :run, [opts[:name], Keyword.take(opts, @valid_ring_opts)]}
}, Keyword.drop(opts, @valid_ring_opts))
end
@doc """
Creates a new stateful hash ring with the given name.
This name is how you will refer to the hash ring via other API calls.
It takes an optional set of options which control how the ring behaves.
Valid options are as follows:
* `nodes: list` - a list of nodes to initialize the ring.
* `monitor_nodes: boolean` - will automatically monitor Erlang node membership,
if new nodes are connected or nodes are disconnected, the ring will be updated automatically.
In this configuration, nodes cannot be added or removed via the API. Those requests will be ignored.
* `node_blacklist: [String.t | Regex.t]` - Used in conjunction with `monitor_nodes: true`, this
is a list of patterns, either as literal strings, or as regex patterns (in either string or literal form),
and will be used to ignore nodeup/down events for nodes which are blacklisted. If a node whitelist
is provided, the blacklist has no effect.
* `node_whitelist: [String.t | Regex.t]` - The same as `node_blacklist`, except the opposite; only nodes
which match a pattern in the whitelist will result in the ring being updated.
* `wait_for_readiness: boolean` - Wait for apps listed in `readiness_deps` to start before adding to the ring.
* `readiness_deps: [atom]` - List of dependency apps that need to start before the node is considered ready.
* `node_type: :all | :hidden | :visible`: refers what kind of nodes will be monitored
when `monitor_nodes` is `true`. For more information, see `:net_kernel.monitor_nodes/2`.
An error is returned if the ring already exists or if bad ring options are provided.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:test1, [nodes: ["a", {"b", 64}]])
...> HashRing.Managed.key_to_node(:test1, :foo)
"b"
iex> {:ok, pid} = HashRing.Managed.new(:test2)
...> {:error, {:already_started, existing_pid}} = HashRing.Managed.new(:test2)
...> pid == existing_pid
true
iex> HashRing.Managed.new(:test3, [nodes: "a"])
** (ArgumentError) {:nodes, "a"} is an invalid option for `HashRing.Managed.new/2`
"""
@spec new(ring) :: {:ok, pid} | {:error, {:already_started, pid}}
@spec new(ring, ring_options) ::
{:ok, pid} | {:error, {:already_started, pid}} | {:error, {:invalid_option, term}}
def new(name, ring_options \\ []) when is_list(ring_options) do
opts = [{:name, name} | ring_options]
invalid =
Enum.find(opts, fn
{key, value} when key in @valid_ring_opts ->
case key do
:name when is_atom(value) -> false
:nodes when is_list(value) -> Keyword.keyword?(value)
:monitor_nodes when is_boolean(value) -> false
:node_blacklist when is_list(value) -> false
:node_whitelist when is_list(value) -> false
:node_type when value in [:all, :hidden, :visible] -> false
:wait_for_readiness when is_boolean(value) -> false
:readiness_deps when is_list(value) -> false
_ -> true
end
end)
case invalid do
nil ->
case Process.whereis(:"libring_#{name}") do
nil ->
DynamicSupervisor.start_child(HashRing.Supervisor, {HashRing.Worker, opts})
pid ->
{:error, {:already_started, pid}}
end
_ ->
raise ArgumentError,
message: "#{inspect(invalid)} is an invalid option for `HashRing.Managed.new/2`"
end
end
@doc """
Same as `HashRing.nodes/1`, returns a list of nodes on the ring.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:nodes_test)
...> HashRing.Managed.add_nodes(:nodes_test, [:a, :b])
...> HashRing.Managed.nodes(:nodes_test)
[:b, :a]
"""
@spec nodes(ring) :: [term()]
def nodes(ring) do
HashRing.Worker.nodes(ring)
end
@doc """
Adds a node to the given hash ring.
An error is returned if the ring does not exist, or the node already exists in the ring.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:test4)
...> HashRing.Managed.add_node(:test4, "a")
...> HashRing.Managed.key_to_node(:test4, :foo)
"a"
iex> HashRing.Managed.add_node(:no_exist, "a")
{:error, :no_such_ring}
"""
@spec add_node(ring, key) :: :ok | {:error, :no_such_ring}
def add_node(ring, node) do
HashRing.Worker.add_node(ring, node)
end
@doc """
Same as `add_node/2`, but takes a weight value.
The weight controls the relative presence this node will have on the ring,
the default is `128`, but it's best to give each node a weight value which maps
to a concrete resource such as memory or priority. It's not ideal to have a number
which is too high, as it will make the ring data structure larger, but a good value
is probably in the range of 64-256.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:test5)
...> HashRing.Managed.add_node(:test5, "a", 64)
...> HashRing.Managed.key_to_node(:test5, :foo)
"a"
iex> HashRing.Managed.add_node(:no_exist, "a")
{:error, :no_such_ring}
"""
@spec add_node(ring, key, weight) ::
:ok
| {:error, :no_such_ring}
| {:error, {:invalid_weight, key, term}}
def add_node(ring, node, weight)
when is_integer(weight) and weight > 0 do
HashRing.Worker.add_node(ring, node, weight)
end
def add_node(_ring, node, weight) do
{:error, {:invalid_weight, node, weight}}
end
@doc """
Adds a list of nodes to the ring.
The list of nodes can contain either node names or `{node_name, weight}`
tuples. If there is an error with any of the node weights, an error will
be returned, and the ring will remain unchanged.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:test6)
...> :ok = HashRing.Managed.add_nodes(:test6, ["a", {"b", 64}])
...> HashRing.Managed.key_to_node(:test6, :foo)
"b"
iex> {:ok, _pid} = HashRing.Managed.new(:test7)
...> HashRing.Managed.add_nodes(:test7, ["a", {"b", :wrong}])
{:error, [{:invalid_weight, "b", :wrong}]}
"""
@spec add_nodes(ring, node_list) ::
:ok
| {:error, :no_such_ring}
| {:error, [{:invalid_weight, key, term}]}
def add_nodes(ring, nodes) when is_list(nodes) do
invalid =
Enum.filter(nodes, fn
{_node, weight} when is_integer(weight) and weight > 0 ->
false
{_node, _weight} ->
true
node when is_binary(node) or is_atom(node) ->
false
_node ->
true
end)
case invalid do
[] ->
HashRing.Worker.add_nodes(ring, nodes)
_ ->
{:error, Enum.map(invalid, fn {k, v} -> {:invalid_weight, k, v} end)}
end
end
@doc """
Removes a node from the given hash ring.
An error is returned if the ring does not exist.
## Examples
iex> {:ok, _pid} = HashRing.Managed.new(:test8)
...> :ok = HashRing.Managed.add_nodes(:test8, ["a", {"b", 64}])
...> :ok = HashRing.Managed.remove_node(:test8, "b")
...> HashRing.Managed.key_to_node(:test8, :foo)
"a"
"""
@spec remove_node(ring, key) :: :ok | {:error, :no_such_ring}
def remove_node(ring, node) do
HashRing.Worker.remove_node(ring, node)
end
@doc """
Maps a key to a node on the hash ring.
An error is returned if the ring does not exist.
"""
@spec key_to_node(ring, any()) ::
key
| {:error, :no_such_ring}
| {:error, {:invalid_ring, :no_nodes}}
def key_to_node(ring, key) do
HashRing.Worker.key_to_node(ring, key)
end
end