Commit 3829cfe

Merge pull request #6 from martinsumner/mas-2.2.5-dscpworkerpool
Mas 2.2.5 dscpworkerpool
2 parents 94d9d6d + c9305a2 commit 3829cfe

12 files changed: +776 -211 lines changed

docs/node_worker_pool.md

Lines changed: 116 additions & 0 deletions
@@ -0,0 +1,116 @@
1+
# Node Worker Pool
2+
3+
4+
## Background
5+
6+
Riak Core has a pool of workers started by riak_core_vnode, based on configuration returned from the application vnode init code:
7+
8+
https://github.com/basho/riak_core/blob/2.1.9/src/riak_core_vnode.erl#L223-L243
9+
10+
This provides a pool of workers per-vnode, for running async tasks. Work is redirected to the workers by returning `{async, Work, From, State}` from a `Mod:handle_coverage/4` or `Mod:handle_command/3` request.
11+
12+
https://github.com/basho/riak_core/blob/2.1.9/src/riak_core_vnode.erl#L358-L362
13+
14+
https://github.com/basho/riak_core/blob/2.1.9/src/riak_core_vnode.erl#L394-L398
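
As a minimal sketch of that return shape, the module below shows a single callback of a hypothetical application vnode module. The module name `example_vnode` and the `{expensive_fold, Bucket}` and `ping` commands are assumptions made for illustration. A real vnode module must implement the rest of the `riak_core_vnode` behaviour.

```erlang
-module(example_vnode).
-export([handle_command/3]).

%% Hypothetical expensive command: hand the work to the per-vnode
%% worker pool instead of running it in the vnode process.
handle_command({expensive_fold, Bucket}, Sender, State) ->
    %% Work is an opaque term that is passed on to the pool worker.
    Work = {fold, Bucket},
    {async, Work, Sender, State};
%% Hypothetical cheap command: answered directly by the vnode.
handle_command(ping, _Sender, State) ->
    {reply, pong, State}.
```

The vnode process returns immediately in the first clause, so it can continue to serve other requests while a pool worker handles the fold.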
15+
16+
By default in Riak, `vnode_worker_pool_size` is set to 10. An average Riak cluster typically has 8 to 15 vnodes per physical node, so on the order of 100 vnode workers (80 to 150) can be launched concurrently from these pools. It is therefore easy to pass enough async work to the vnode pools to overwhelm resources in the cluster. This may still be true even if `vnode_worker_pool_size` is reduced to 1, and at such low pool sizes multiple queries could become queued behind one long-running request.
17+
18+
19+
## Feature Overview
20+
21+
The purpose of the node worker pool feature is to provide pools of workers that are shared amongst all vnodes on the node, and to allow potentially complex or expensive queries to be redirected to these node-wide pools rather than the vnode pool. The node-wide pools allow for tighter management of resource contention, in that the pool sizes can be smaller than the count of vnodes on the node.
22+
23+
For some applications it may be sufficient to have two pools available from each vnode - the `vnode_worker_pool` and the `node_worker_pool`. However, some implementations may wish to have a more complete model of queues for [differentiated services](https://en.wikipedia.org/wiki/Differentiated_services) with:
24+
25+
- Expedited Forwarding - i.e. use the almost unbounded `vnode_worker_pool`;
26+
27+
- Assured Forwarding 1 to 4 - 4 classes of pools of a fixed size which can be allocated to different task types;
28+
29+
- Best Effort - a narrow pool for any activity which is particularly expensive and can support arbitrary delays during busy periods.
30+
31+
The application determines both the size of each pool and which pool is used for a given piece of work. If an attempt is made to use an undefined (or uninitialised) pool, then that work should fall back to the `vnode_worker_pool`.
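
The fallback rule can be sketched as follows. This is an illustration under stated assumptions, not the riak_core implementation: the module `pool_select` and the function `select_pool/2` are hypothetical. The sketch relies only on the pools being registered under their pool name, so that `whereis/1` returns `undefined` for a pool that has not been started.

```erlang
-module(pool_select).
-export([select_pool/2]).

%% Hypothetical helper: use the requested node-wide pool if a process
%% is registered under that name, otherwise fall back to the pid of
%% the vnode's own worker pool.
-spec select_pool(atom(), pid()) -> {node_pool, atom()} | {vnode_pool, pid()}.
select_pool(PoolName, VnodePoolPid) ->
    case whereis(PoolName) of
        undefined -> {vnode_pool, VnodePoolPid};
        _Pid -> {node_pool, PoolName}
    end.
```

Falling back rather than failing means an application can be deployed with no node-wide pools configured and still have its async work handled.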
32+
33+
34+
## Implementation
35+
36+
The `riak_core_vnode_worker_pool` is started and shut down by the vnode process, as that vnode process starts up and shuts down. The `node_worker_pool` cannot be tied to an individual vnode in the same way, so the `node_worker_pool`'s supervisor is started directly through the main riak_core supervision tree:
37+
38+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_sup.erl#L82
39+
40+
This does not start any pools. The responsibility for naming and starting pools lies with the application, which can start pools via:
41+
42+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_node_worker_pool_sup.erl#L44-L47
43+
44+
The arguments are the same as for the `vnode_worker_pool`, with the addition of `QueueType`, which is the name under which the pool will be registered. There are pre-defined types to use for the anticipated queueing strategies:
45+
46+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_node_worker_pool.erl#L29-L33
47+
48+
The following code snippet from `riak_kv_app.erl` shows how pools may be started:
49+
50+
```
51+
52+
WorkerPools =
53+
case app_helper:get_env(riak_kv, worker_pool_strategy, none) of
54+
none ->
55+
[];
56+
single ->
57+
NWPS = app_helper:get_env(riak_kv, node_worker_pool_size),
58+
[{node_worker_pool, {riak_kv_worker, NWPS, [], [], node_worker_pool}}];
59+
dscp ->
60+
AF1 = app_helper:get_env(riak_kv, af1_worker_pool_size),
61+
AF2 = app_helper:get_env(riak_kv, af2_worker_pool_size),
62+
AF3 = app_helper:get_env(riak_kv, af3_worker_pool_size),
63+
AF4 = app_helper:get_env(riak_kv, af4_worker_pool_size),
64+
BE = app_helper:get_env(riak_kv, be_worker_pool_size),
65+
[{dscp_worker_pool, {riak_kv_worker, AF1, [], [], af1_pool}},
66+
{dscp_worker_pool, {riak_kv_worker, AF2, [], [], af2_pool}},
67+
{dscp_worker_pool, {riak_kv_worker, AF3, [], [], af3_pool}},
68+
{dscp_worker_pool, {riak_kv_worker, AF4, [], [], af4_pool}},
69+
{dscp_worker_pool, {riak_kv_worker, BE, [], [], be_pool}}]
70+
end,
71+
72+
....
73+
74+
riak_core:register(riak_kv, [
75+
76+
....
77+
78+
]
79+
80+
++ WorkerPools),
81+
```
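
The environment keys read by the snippet above could be set, for example, in `advanced.config`. The fragment below is a sketch only: the pool sizes are illustrative assumptions, not defaults or recommendations.

```erlang
%% advanced.config fragment (sizes are hypothetical)
[
 {riak_kv, [
   %% one of: none | single | dscp
   {worker_pool_strategy, dscp},
   %% sizes of the four assured forwarding pools
   {af1_worker_pool_size, 2},
   {af2_worker_pool_size, 1},
   {af3_worker_pool_size, 2},
   {af4_worker_pool_size, 1},
   %% size of the best effort pool
   {be_worker_pool_size, 1}
 ]}
].
```

With the `single` strategy only `node_worker_pool_size` would be read, and with `none` no node-wide pools are registered.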
82+
83+
The implementation of both the `riak_core_node_worker_pool` and the `riak_core_vnode_worker_pool` is now based on a common behaviour - `riak_core_worker_pool`:
84+
85+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_worker_pool.erl
86+
87+
The primary difference in implementation is that `riak_core_node_worker_pool` must set `trap_exit` on initialisation, as there is no closing vnode process to call `shutdown_pool` and neatly terminate the pool (with a wait for work to finish).
88+
89+
A new function `queue_work/4` is added to `riak_core_vnode` to prompt work to be queued for a `node_worker_pool`:
90+
91+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_vnode.erl#L1092-L1105
92+
93+
This is triggered by a response to `Mod:handle_coverage/4` or `Mod:handle_command/3` of:
94+
95+
``{PoolName, Work, From, NewModState}``
96+
97+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_vnode.erl#L378-L386
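
A minimal sketch of an application callback choosing a pool per request type is shown below. The module name and the request terms (`list_buckets`, `count_keys`, `full_scan`) are hypothetical. Only the shape of the returned tuples and the pool names come from this change. The pool atoms are written literally so the sketch compiles on its own, and they match the values returned by `riak_core_node_worker_pool:af1/0` and `riak_core_node_worker_pool:be/0`.

```erlang
-module(example_coverage_vnode).
-export([handle_coverage/4]).

%% Cheap query: keep using the per-vnode worker pool.
handle_coverage({list_buckets, Filter}, _KeySpaces, Sender, State) ->
    {async, {buckets, Filter}, Sender, State};
%% Moderately expensive query: queue on an assured forwarding pool.
handle_coverage({count_keys, Bucket}, _KeySpaces, Sender, State) ->
    {af1_pool, {count, Bucket}, Sender, State};
%% Very expensive query that can tolerate delay: use the best effort pool.
handle_coverage({full_scan, Bucket}, _KeySpaces, Sender, State) ->
    {be_pool, {scan, Bucket}, Sender, State}.
```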
98+
99+
If there is a need for work to be queued directly from the application (e.g. by calling `riak_core_vnode:queue_work/4`), then the application needs to know the vnode pool `pid()` that `queue_work/4` uses as a fallback. To receive this information into ModState, the application may provide a `Mod:add_vnode_pool/2` function, which if present will be called by `riak_core_vnode` after the pool has been initialised:
100+
101+
https://github.com/martinsumner/riak_core/blob/mas-2.2.5-dscpworkerpool/src/riak_core_vnode.erl#L245-L254
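
An application module might implement this optional callback roughly as below. This is a sketch under assumptions: the argument order `(PoolPid, ModState)` and the return of an updated ModState are assumed rather than confirmed by the text above, and the module name, record and field name are hypothetical.

```erlang
-module(example_pool_state).
-export([add_vnode_pool/2]).

%% Hypothetical vnode state holding the pid of the vnode worker pool.
-record(state, {vnode_pool_pid :: pid() | undefined}).

%% Assumed signature: add_vnode_pool(PoolPid, ModState) -> NewModState.
%% Stores the pool pid so that later direct calls to queue_work/4 can
%% pass it as the fallback pool.
add_vnode_pool(PoolPid, State) when is_pid(PoolPid) ->
    State#state{vnode_pool_pid = PoolPid}.
```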
102+
103+
104+
## Snapshots Pre-Fold
105+
106+
Within `riak_kv`, fold functions returned from backends for queries directed towards a worker pool (such as 2i queries) were passed to the worker without a snapshot being taken. The snapshot would only be taken at the point the worker in the pool ran the `Fold()`.
107+
108+
This model works if there is unlimited capacity in the vnode worker pools, as it is likely that the fold functions across a coverage plan will be called reasonably close together, so as to present a roughly cluster-wide point-in-time view of the query. However, with a constrained pool, a subset of the folds in the coverage plan may be delayed behind other work. Therefore, it is preferable for async work which intends to use node_worker_pools to have had the snapshot taken prior to the fold function being returned from the vnode backend.
109+
110+
This is implemented within leveled as the `SnapPreFold` boolean, which can be passed into query requests. When `SnapPreFold` is `true`, the snapshot is taken at the point the backend receives the request, and when the fold is eventually called by the worker in the pool, it is based on that snapshot. Variation in worker availability across nodes will therefore not impact the "consistency" of the query results: the results will be based on a loosely correlated point in time (subject to race conditions to the head of the vnode message queue).
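
The difference between the two modes can be sketched with plain closures. This is not the leveled or eleveldb API: the module `snap_prefold_sketch`, the function `make_folder/3`, and the `SnapFun` and `FoldFun` arguments are all hypothetical stand-ins for "take a snapshot of the store" and "run the query over a snapshot".

```erlang
-module(snap_prefold_sketch).
-export([make_folder/3]).

%% SnapFun :: fun(() -> Snapshot)        takes a snapshot of the store
%% FoldFun :: fun((Snapshot) -> Result)  runs the query over a snapshot
-spec make_folder(boolean(), fun(() -> S), fun((S) -> R)) -> fun(() -> R).
make_folder(true, SnapFun, FoldFun) ->
    %% Snapshot is taken now, when the backend receives the request.
    Snapshot = SnapFun(),
    fun() -> FoldFun(Snapshot) end;
make_folder(false, SnapFun, FoldFun) ->
    %% Snapshot is deferred until a pool worker runs the returned fun.
    fun() -> FoldFun(SnapFun()) end.
```

In both cases the vnode returns a zero-arity fun for the worker to run later. With `true`, writes that arrive while the fun waits in a pool queue are not visible to the fold. With `false` they are.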
111+
112+
Some work has been done to implement prefold snapping in eleveldb, mainly by splitting up the existing fold API into two stages:
113+
114+
https://github.com/martinsumner/riak_kv/blob/mas-2.2.5-clusteraae/src/riak_kv_eleveldb_backend.erl#L388-L430
115+
116+
To implement snap_prefold in Bitcask would probably require generating file links at the point the fold is closed, but no work has been done on that backend at present.

rebar.config

Lines changed: 1 addition & 1 deletion
@@ -14,7 +14,7 @@
1414
{poolboy, ".*", {git, "git://github.com/basho/poolboy.git", {branch, "develop-2.2.5"}}},
1515
{basho_stats, ".*", {git, "git://github.com/basho/basho_stats.git", {branch, "master"}}},
1616
{riak_sysmon, ".*", {git, "https://github.com/basho/riak_sysmon.git", {tag, "2.1.5"}}},
17-
{eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "2.0"}}},
17+
{eleveldb, ".*", {git, "git://github.com/martinsumner/eleveldb.git", {branch, "mas-2.0.34-ee"}}},
1818
{riak_ensemble, ".*", {git, "https://github.com/basho/riak_ensemble", {branch, "develop-2.2"}}},
1919
{pbkdf2, ".*", {git, "git://github.com/basho/erlang-pbkdf2.git", {branch, "master"}}},
2020
{exometer_core, ".*", {git, "git://github.com/basho/exometer_core.git", {branch, "master"}}},

src/riak_core.erl

Lines changed: 17 additions & 0 deletions
@@ -379,6 +379,16 @@ register(App, [{permissions, Permissions}|T]) ->
379379
register(App, T);
380380
register(App, [{auth_mod, {AuthType, AuthMod}}|T]) ->
381381
register_proplist({AuthType, AuthMod}, auth_mods),
382+
register(App, T);
383+
register(App,
384+
[{node_worker_pool,
385+
{WorkerMod, PoolSize, WArgs, WProps, node_worker_pool}}|T]) ->
386+
register_pool(App, WorkerMod, PoolSize, WArgs, WProps, node_worker_pool),
387+
register(App, T);
388+
register(App,
389+
[{dscp_worker_pool,
390+
{WorkerMod, PoolSize, WArgs, WProps, PoolType}}|T]) ->
391+
register_pool(App, WorkerMod, PoolSize, WArgs, WProps, PoolType),
382392
register(App, T).
383393

384394
register_mod(App, Module, Type) when is_atom(Type) ->
@@ -398,6 +408,13 @@ register_mod(App, Module, Type) when is_atom(Type) ->
398408
lists:usort([{App,Module}|Mods]))
399409
end.
400410

411+
register_pool(_App, WorkerMod, PoolSize, WorkerArgs, WorkerProps, PoolType) ->
412+
ok = riak_core_node_worker_pool_sup:start_pool(WorkerMod,
413+
PoolSize,
414+
WorkerArgs,
415+
WorkerProps,
416+
PoolType).
417+
401418
register_metadata(App, Value, Type) ->
402419
case application:get_env(riak_core, Type) of
403420
undefined ->

src/riak_core_node_worker_pool.erl

Lines changed: 117 additions & 0 deletions
@@ -0,0 +1,117 @@
1+
%%
2+
%% Copyright (c) 2007-2011 Basho Technologies, Inc. All Rights Reserved.
3+
%%
4+
%% This file is provided to you under the Apache License,
5+
%% Version 2.0 (the "License"); you may not use this file
6+
%% except in compliance with the License. You may obtain
7+
%% a copy of the License at
8+
%%
9+
%% http://www.apache.org/licenses/LICENSE-2.0
10+
%%
11+
%% Unless required by applicable law or agreed to in writing,
12+
%% software distributed under the License is distributed on an
13+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
%% KIND, either express or implied. See the License for the
15+
%% specific language governing permissions and limitations
16+
%% under the License.
17+
%%
18+
%% -------------------------------------------------------------------
19+
20+
-module(riak_core_node_worker_pool).
21+
22+
-behaviour(riak_core_worker_pool).
23+
24+
-export([do_init/1, reply/2, do_work/3]).
25+
%% export the names of the pools as functions
26+
-export([af1/0, af2/0, af3/0, af4/0, be/0, nwp/0, dscp_pools/0, pools/0]).
27+
28+
%% API
29+
-export([start_link/5, stop/2, shutdown_pool/2, handle_work/3]).
30+
31+
-type worker_pool()
32+
% Allows you to set up a DSCP-style set of pools (assuming the
33+
% vnode_worker_pool counts as ef). Otherwise can just have a
34+
% single node_worker_pool
35+
:: be_pool|af1_pool|af2_pool|af3_pool|af4_pool|node_worker_pool.
36+
37+
-export_type([worker_pool/0]).
38+
39+
-spec af1() -> af1_pool.
40+
af1() -> af1_pool.
41+
42+
-spec af2() -> af2_pool.
43+
af2() -> af2_pool.
44+
45+
-spec af3() -> af3_pool.
46+
af3() -> af3_pool.
47+
48+
-spec af4() -> af4_pool.
49+
af4() -> af4_pool.
50+
51+
-spec be() -> be_pool.
52+
be() -> be_pool.
53+
54+
-spec nwp() -> node_worker_pool.
55+
nwp() -> node_worker_pool.
56+
57+
-spec pools() -> [worker_pool()].
58+
pools() ->
59+
[af1(), af2(), af3(), af4(), be(), nwp()].
60+
61+
-spec dscp_pools() -> [worker_pool()].
62+
dscp_pools() ->
63+
[af1(), af2(), af3(), af4(), be()].
64+
65+
-spec start_link(atom(), pos_integer(), list(), list(), worker_pool())
66+
-> {ok, pid()}.
67+
%% @doc
68+
%% Start a worker pool, and register under the name PoolType, which should be
69+
%% a recognised name from type worker_pool()
70+
start_link(WorkerMod, PoolSize, WorkerArgs, WorkerProps, PoolType)
71+
when PoolType == be_pool;
72+
PoolType == af1_pool;
73+
PoolType == af2_pool;
74+
PoolType == af3_pool;
75+
PoolType == af4_pool;
76+
PoolType == node_worker_pool ->
77+
{ok, Pid} =
78+
riak_core_worker_pool:start_link([WorkerMod,
79+
PoolSize,
80+
WorkerArgs,
81+
WorkerProps],
82+
?MODULE),
83+
register(PoolType, Pid),
84+
lager:info("Registered worker pool of type ~w and size ~w",
85+
[PoolType, PoolSize]),
86+
{ok, Pid}.
87+
88+
do_init([WorkerMod, PoolSize, WorkerArgs, WorkerProps]) ->
89+
process_flag(trap_exit, true),
90+
poolboy:start_link([{worker_module, riak_core_vnode_worker},
91+
{worker_args,
92+
[node, WorkerArgs, WorkerProps, self()]},
93+
{worker_callback_mod, WorkerMod},
94+
{size, PoolSize}, {max_overflow, 0}]).
95+
96+
handle_work(PoolName, Work, From) when
97+
PoolName == be_pool;
98+
PoolName == af1_pool;
99+
PoolName == af2_pool;
100+
PoolName == af3_pool;
101+
PoolName == af4_pool;
102+
PoolName == node_worker_pool ->
103+
riak_core_stat:update({worker_pool, PoolName}),
104+
riak_core_worker_pool:handle_work(PoolName, Work, From).
105+
106+
stop(Pid, Reason) ->
107+
riak_core_worker_pool:stop(Pid, Reason).
108+
109+
%% wait for all the workers to finish any current work
110+
shutdown_pool(Pid, Wait) ->
111+
riak_core_worker_pool:shutdown_pool(Pid, Wait).
112+
113+
reply(From, Msg) ->
114+
riak_core_vnode:reply(From, Msg).
115+
116+
do_work(Pid, Work, From) ->
117+
riak_core_vnode_worker:handle_work(Pid, Work, From).

src/riak_core_node_worker_pool_sup.erl

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
1+
%% -------------------------------------------------------------------
2+
%%
3+
%% This file is provided to you under the Apache License,
4+
%% Version 2.0 (the "License"); you may not use this file
5+
%% except in compliance with the License. You may obtain
6+
%% a copy of the License at
7+
%%
8+
%% http://www.apache.org/licenses/LICENSE-2.0
9+
%%
10+
%% Unless required by applicable law or agreed to in writing,
11+
%% software distributed under the License is distributed on an
12+
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
13+
%% KIND, either express or implied. See the License for the
14+
%% specific language governing permissions and limitations
15+
%% under the License.
16+
%%
17+
%% -------------------------------------------------------------------
18+
-module(riak_core_node_worker_pool_sup).
19+
-behaviour(supervisor).
20+
-export([start_link/0, init/1]).
21+
-export([start_pool/5]).
22+
23+
%% Helper macro for declaring children of supervisor
24+
-define(CHILD(I, PoolType, Args, Type, Timeout),
25+
{PoolType,
26+
{I, start_link, Args},
27+
permanent, Timeout, Type, [I]}).
28+
-define(CHILD(I, PoolType, Args, Type),
29+
?CHILD(I, PoolType, Args, Type, 5000)).
30+
31+
-type worker_pool() :: riak_core_node_worker_pool:worker_pool().
32+
33+
start_link() ->
34+
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
35+
36+
init([]) ->
37+
{ok, {{one_for_one, 5, 10}, []}}.
38+
39+
%% @doc
40+
%% Start a node_worker_pool - can be either assuredforwarding_pool or
41+
%% a besteffort_pool (which will also be registered as a node_worker_pool for
42+
%% backwards compatibility)
43+
-spec start_pool(atom(), pos_integer(), list(), list(), worker_pool()) ->
44+
ok | {error, Reason::term()}.
45+
start_pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
46+
Ref = pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType),
47+
case supervisor:start_child(?MODULE, Ref) of
48+
{ok, _} -> ok;
49+
{ok, _, _} -> ok;
50+
{error, already_present} -> ok;
51+
{error, {already_started, _}} -> ok;
52+
{error, OtherErr} -> {error, OtherErr}
53+
end.
54+
55+
pool(WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType) ->
56+
?CHILD(riak_core_node_worker_pool,
57+
QueueType,
58+
[WorkerMod, PoolSize, WorkerArgs, WorkerProps, QueueType],
59+
worker).
60+
