Skip to content

Commit 0a84874

Browse files
authored
Merge pull request #42 from cloudant/clustering-with-bootstrap
Clustering with bootstrap
2 parents bf459b7 + 0d5d202 commit 0a84874

File tree

6 files changed

+133
-21
lines changed

6 files changed

+133
-21
lines changed

rebar.config

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{require_otp_vsn, "17|18|19|20|21"}.
1+
{require_otp_vsn, "17|18|19|20|21|22"}.
22

33
{deps,
44
[

src/basho_bench.erl

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@
2323

2424
-export([start/0]).
2525

26-
-export([setup_benchmark/1, run_benchmark/1, await_completion/1, main/1, md5/1, get_test_dir/0]).
26+
-export([setup_benchmark/2, run_benchmark/0, await_completion/1, main/1, md5/1, get_test_dir/0]).
27+
-export([master_node/0, node_count/0, is_master/0, is_worker/0, is_clustered/0]).
2728
-include("basho_bench.hrl").
2829

2930

31+
-define(AWAIT_TIMEOUT, 10000).
32+
33+
3034
start() ->
3135
application:ensure_all_started(basho_bench).
3236

3337
%% ====================================================================
3438
%% API
3539
%% ====================================================================
3640

37-
setup_benchmark(Opts) ->
41+
setup_benchmark(Opts, Configs) ->
3842
BenchName = bench_name(Opts),
3943
TestDir = test_dir(Opts, BenchName),
4044
application:set_env(basho_bench, test_dir, TestDir),
@@ -66,10 +70,7 @@ setup_benchmark(Opts) ->
6670
true -> setup_distributed_work();
6771
false -> ok
6872
end,
69-
ok.
7073

71-
72-
run_benchmark(Configs) ->
7374
TestDir = get_test_dir(),
7475
%% Init code path
7576
add_code_paths(basho_bench_config:get(code_paths, [])),
@@ -93,11 +94,35 @@ run_benchmark(Configs) ->
9394

9495
log_dimensions(),
9596

97+
ok.
98+
99+
100+
run_benchmark() ->
96101
basho_bench_sup:start_child(),
97-
ok = basho_bench_stats:run(),
98-
ok = basho_bench_measurement:run(),
99-
ok = basho_bench_duration:run(),
100-
ok = basho_bench_worker:run(basho_bench_worker_sup:workers()),
102+
case {master_node(), node_count()} of
103+
%% No master
104+
{undefined, _} ->
105+
bootstrap_bb(),
106+
ok = run_benchmarks(basho_bench_worker_sup:workers());
107+
%% Only one node, equivalent to no master
108+
{_, 1} ->
109+
bootstrap_bb(),
110+
ok = run_benchmarks(basho_bench_worker_sup:workers());
111+
%% Master and multiple workers
112+
{MasterNode, NodeCount} when NodeCount > 1 ->
113+
case MasterNode =:= node() of
114+
true ->
115+
await_nodes(NodeCount),
116+
bootstrap_bb(),
117+
ok = run_benchmarks();
118+
false ->
119+
?INFO("Worker node[~p] connecting to master node: ~p~n", [node(), MasterNode]),
120+
pong = net_adm:ping(MasterNode),
121+
true = length(nodes()) > 0,
122+
%% Let master node decide when to run benchmarks
123+
ok = basho_bench_duration:run()
124+
end
125+
end,
101126
application:set_env(basho_bench_app, is_running, true).
102127

103128

@@ -117,8 +142,8 @@ main(Args) ->
117142
ok = maybe_net_node(Opts),
118143
ok = maybe_join(Opts),
119144
{ok, _} = start(),
120-
setup_benchmark(Opts),
121-
run_benchmark(Configs),
145+
setup_benchmark(Opts, Configs),
146+
run_benchmark(),
122147
await_completion(infinity).
123148

124149

@@ -346,3 +371,71 @@ get_test_dir() ->
346371
undefined -> error(unset_test_dir);
347372
{ok, TestDir} -> TestDir
348373
end.
374+
375+
master_node() ->
376+
Default = case init:get_argument(master_node) of
377+
{ok, [[Node]]} -> list_to_atom(Node);
378+
_ -> node()
379+
end,
380+
case catch basho_bench_config:get(master_node, undefined) of
381+
{'EXIT', {noproc, _}} ->
382+
Default;
383+
undefined ->
384+
Default;
385+
Res ->
386+
Res
387+
end.
388+
389+
node_count() ->
390+
Default = case init:get_argument(node_count) of
391+
{ok, [[NodeCount]]} -> list_to_integer(NodeCount);
392+
_ -> 1
393+
end,
394+
case catch basho_bench_config:get(node_count, undefined) of
395+
{'EXIT', {noproc, _}} ->
396+
Default;
397+
undefined ->
398+
Default;
399+
Res ->
400+
Res
401+
end.
402+
403+
is_master() ->
404+
master_node() =:= node().
405+
406+
is_worker() ->
407+
not is_master() and is_clustered().
408+
409+
is_clustered() ->
410+
node_count() > 1.
411+
412+
await_nodes(NodeCount) ->
413+
await_nodes(NodeCount, 100).
414+
415+
await_nodes(NodeCount, SleepMS) ->
416+
await_nodes(NodeCount, SleepMS, ?AWAIT_TIMEOUT).
417+
418+
await_nodes(_, _, Timeout) when Timeout < 0 ->
419+
throw({error, await_node_timeout});
420+
await_nodes(NodeCount, SleepMS, Timeout) ->
421+
case NodeCount =:= length(nodes()) + 1 of
422+
true ->
423+
ok;
424+
false ->
425+
?INFO("Waiting on ~p nodes to connect~n", [NodeCount - length(nodes()) - 1]),
426+
timer:sleep(SleepMS),
427+
await_nodes(NodeCount, SleepMS * 2, Timeout - SleepMS)
428+
end.
429+
430+
bootstrap_bb() ->
431+
ok = basho_bench_stats:run(),
432+
ok = basho_bench_measurement:run(),
433+
ok = basho_bench_duration:run().
434+
435+
run_benchmarks() ->
436+
Pids0 = [basho_bench_worker_sup:remote_workers(Node) || Node <- nodes()],
437+
Pids = lists:flatten([basho_bench_worker_sup:workers() | Pids0]),
438+
run_benchmarks(Pids).
439+
440+
run_benchmarks(Pids) ->
441+
ok = basho_bench_worker:run(Pids).

src/basho_bench_run_sup.erl

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@ init([]) ->
1919
[] -> [];
2020
_Driver -> [?CHILD(basho_bench_measurement, worker, Timeout)]
2121
end,
22-
Spec = [
23-
?CHILD(basho_bench_duration, worker, Timeout),
24-
?CHILD(basho_bench_stats, worker, Timeout),
25-
?CHILD(basho_bench_worker_sup, supervisor, Timeout)
26-
] ++ MeasurementDriver,
27-
{ok, {{one_for_all, 0, 1}, Spec}}.
22+
Spec0 = [?CHILD(basho_bench_worker_sup, supervisor, Timeout)],
23+
Spec1 = case basho_bench:is_master() of
24+
true ->
25+
[?CHILD(basho_bench_stats, worker, Timeout) | Spec0];
26+
false ->
27+
Spec0
28+
end,
29+
Spec2 = [?CHILD(basho_bench_duration, worker, Timeout) | Spec1],
30+
{ok, {{one_for_all, 0, 1}, Spec2 ++ MeasurementDriver}}.

src/basho_bench_stats.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ op_complete(Op, ok, ElapsedUs) ->
6565
op_complete(Op, {ok, Units}, ElapsedUs) ->
6666
%% Update the histogram and units counter for the op in question
6767
% io:format("Get distributed: ~p~n", [get_distributed()]),
68-
case get_distributed() of
68+
case get_distributed() or basho_bench:is_worker() of
6969
true ->
7070
gen_server:cast({global, ?MODULE}, {Op, {ok, Units}, ElapsedUs});
7171
false ->

src/basho_bench_sup.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,12 @@ start_link() ->
4848
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
4949

5050
init([]) ->
51-
{ok, {{one_for_one, 5, 10}, [?CHILD(basho_bench_config, worker)]}}.
51+
Children = case basho_bench:is_master() of
52+
true ->
53+
[?CHILD(basho_bench_config, worker)];
54+
false ->
55+
pong = net_adm:ping(basho_bench:master_node()),
56+
global:sync(),
57+
[]
58+
end,
59+
{ok, {{one_for_one, 5, 10}, Children}}.

src/basho_bench_worker_sup.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
%% API
2727
-export([start_link/0,
2828
workers/0,
29+
workers/1,
30+
remote_workers/1,
2931
stop_child/1,
3032
active_workers/0]).
3133

@@ -42,7 +44,13 @@ start_link() ->
4244
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
4345

4446
workers() ->
45-
[Pid || {_Id, Pid, worker, [basho_bench_worker]} <- supervisor:which_children(?MODULE)].
47+
workers(?MODULE).
48+
49+
workers(Sup) ->
50+
[Pid || {_Id, Pid, worker, [basho_bench_worker]} <- supervisor:which_children(Sup)].
51+
52+
remote_workers(Node) ->
53+
workers({?MODULE, Node}).
4654

4755
stop_child(Id) ->
4856
supervisor:terminate_child(?MODULE, Id).

0 commit comments

Comments
 (0)