Skip to content

Commit 21c8716

Browse files
authored
Cloudant worker rampup (#47)
* Add worker rampup logic
1 parent 346d9de commit 21c8716

File tree

4 files changed

+115
-18
lines changed

4 files changed

+115
-18
lines changed

src/basho_bench_duration.erl

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
-record(state, {
2929
ref,
3030
duration,
31-
start
31+
start,
32+
rampup_interval
3233
}).
3334

3435
-include("basho_bench.hrl").
@@ -50,7 +51,7 @@ start_link() ->
5051
init([]) ->
5152
run_hook(basho_bench_config:get(pre_hook, no_op)),
5253
Ref = erlang:monitor(process, whereis(basho_bench_run_sup)),
53-
{ok, #state{ref=Ref}}.
54+
{ok, maybe_add_rampup(#state{ref=Ref})}.
5455

5556

5657
handle_call(run, _From, State) ->
@@ -60,6 +61,9 @@ handle_call(run, _From, State) ->
6061
start=os:timestamp(),
6162
duration=DurationMins
6263
},
64+
if NewState#state.rampup_interval =:= undefined -> ok; true ->
65+
timer:send_interval(NewState#state.rampup_interval, rampup)
66+
end,
6367
maybe_end({reply, ok, NewState});
6468

6569
handle_call(remaining, _From, State) ->
@@ -90,13 +94,20 @@ handle_info({'DOWN', Ref, process, _Object, Info}, #state{ref=Ref}=State) ->
9094
handle_info(timeout, State) ->
9195
{stop, {shutdown, normal}, State};
9296

97+
handle_info(rampup, State) ->
98+
?INFO("Triggering worker rampup", []),
99+
add_worker(),
100+
maybe_end({noreply, State});
101+
93102
handle_info(Msg, State) ->
94103
?WARN("basho_bench_duration handled unexpected info message: ~p", [Msg]),
95-
{noreply, State}.
104+
maybe_end({noreply, State}).
96105

97106

98107
terminate(Reason, #state{duration=DurationMins}) ->
99108
case Reason of
109+
normal ->
110+
?CONSOLE("Test completed after ~p mins.\n", [DurationMins]);
100111
{shutdown, normal} ->
101112
?CONSOLE("Test completed after ~p mins.\n", [DurationMins]);
102113
{shutdown, Reason} ->
@@ -165,3 +176,25 @@ worker_stopping(WorkerPid) ->
165176
%% WorkerPid is basho_bench_worker's id, not the pid of actual driver
166177
gen_server:cast(?MODULE, {worker_stopping, WorkerPid}),
167178
ok.
179+
180+
maybe_add_rampup(State) ->
181+
case basho_bench_config:get(workers_rampup, undefined) of
182+
undefined ->
183+
State;
184+
Interval when is_integer(Interval) ->
185+
State#state{rampup_interval=Interval};
186+
%% TODO: should we support per type intervals?
187+
[{_Type, Interval} | _ ] when is_integer(Interval) ->
188+
State#state{rampup_interval=Interval};
189+
Else ->
190+
throw({unexpected_rampup, Else})
191+
end.
192+
193+
add_worker() ->
194+
case basho_bench_config:get(workers, undefined) of
195+
undefined ->
196+
basho_bench_worker_sup:add_worker();
197+
[_|_] = Workers ->
198+
WorkerTypes = [WT || {WT, _C} <- Workers],
199+
basho_bench_worker_sup:add_workers(WorkerTypes)
200+
end.

src/basho_bench_stats.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,10 @@ process_stats(Now, #state{stats_writer=Module}=State) ->
270270
%% Reset Ops
271271
[folsom_metrics_counter:dec({ops, Op}, OpsAmount) || {Op, _, OpsAmount} <- OkOpsRes],
272272

273+
Concurrency = basho_bench_worker_sup:worker_count(),
273274
%% Write summary
274275
Module:process_summary(State#state.stats_writer_data,
275-
Elapsed, Window, Ops, Oks, Errors),
276+
Elapsed, Window, Concurrency, Ops, Oks, Errors),
276277

277278
%% Dump current error counts to console
278279
case (State#state.errors_since_last_report) of
@@ -295,7 +296,8 @@ process_global_stats(#state{stats_writer=Module}=State) ->
295296
Errors = error_counter(Op),
296297
Units = folsom_metrics:get_metric_value({overall_units, Op}),
297298
Ops = folsom_metrics:get_metric_value({overall_ops, Op}),
298-
Module:report_global_stats(Op, Stats, Errors, Units, Ops)
299+
Concurrency = basho_bench_worker_sup:worker_count(),
300+
Module:report_global_stats(Op, Stats, Errors, Units, Ops, Concurrency)
299301
end, State#state.ops).
300302

301303
%%
@@ -307,10 +309,11 @@ report_latency(#state{stats_writer=Module}=State, Elapsed, Window, Op) ->
307309
Errors = error_counter(Op),
308310
Units = folsom_metrics:get_metric_value({units, Op}),
309311
Ops = folsom_metrics:get_metric_value({ops, Op}),
312+
Concurrency = basho_bench_worker_sup:worker_count(),
310313

311314
Module:report_latency({State#state.stats_writer,
312315
State#state.stats_writer_data},
313-
Elapsed, Window, Op,
316+
Elapsed, Window, Concurrency, Op,
314317
Stats, Errors, Units, Ops),
315318
{Ops, Units, Errors}.
316319

src/basho_bench_stats_writer_csv.erl

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@
3333

3434
-export([new/2,
3535
terminate/1,
36-
process_summary/6,
36+
process_summary/7,
3737
report_error/3,
38-
report_global_stats/5,
39-
report_latency/8]).
38+
report_global_stats/6,
39+
report_latency/9]).
4040

4141
-include("basho_bench.hrl").
4242

@@ -57,7 +57,7 @@ new(Ops, Measurements) ->
5757
filename:join([TestDir, "summary.csv"]),
5858
[raw, binary, write]
5959
),
60-
file:write(SummaryFile, <<"elapsed, window, total_operations, total_units, successful, failed\n">>),
60+
file:write(SummaryFile, <<"elapsed, window, concurrency, total_operations, total_units, successful, failed\n">>),
6161

6262
%% Setup errors file w/counters for each error. Embedded commas likely
6363
%% in the error messages so quote the columns.
@@ -82,11 +82,12 @@ terminate({SummaryFile, ErrorsFile}) ->
8282
ok.
8383

8484
process_summary({SummaryFile, _ErrorsFile},
85-
Elapsed, Window, Ops, Oks, Errors) ->
85+
Elapsed, Window, Concurrency, Ops, Oks, Errors) ->
8686
file:write(SummaryFile,
87-
io_lib:format("~w, ~w, ~w, ~w, ~w, ~w\n",
87+
io_lib:format("~w, ~w, ~w, ~w, ~w, ~w, ~w\n",
8888
[Elapsed,
8989
Window,
90+
Concurrency,
9091
Ops + Errors,
9192
Oks,
9293
Ops,
@@ -98,7 +99,7 @@ report_error({_SummaryFile, ErrorsFile},
9899
io_lib:format("\"~w\",\"~w\"\n",
99100
[Key, Count])).
100101

101-
report_global_stats({Op,_}, Stats, Errors, Units, Ops) ->
102+
report_global_stats({Op,_}, Stats, Errors, Units, Ops, Concurrency) ->
102103
%% Build up JSON structure representing statistics collected in folsom
103104
P = proplists:get_value(percentile, Stats),
104105
JsonElements0 = lists:foldl(fun(K, Acc) ->
@@ -136,20 +137,24 @@ report_global_stats({Op,_}, Stats, Errors, Units, Ops) ->
136137
%% insert Ops counts
137138
JsonElements3 = [{'ops', Ops} | JsonElements2],
138139

140+
%% insert concurrency
141+
JsonElements4 = [{concurrency, Concurrency} | JsonElements3],
142+
139143
JsonMetrics0 = erlang:get(run_metrics),
140-
JsonMetrics = lists:keyreplace(Op, 1, JsonMetrics0, {Op, {JsonElements3}}),
144+
JsonMetrics = lists:keyreplace(Op, 1, JsonMetrics0, {Op, {JsonElements4}}),
141145
%?DEBUG("Generated Json:\n~w",[JsonElements]),
142146
erlang:put(run_metrics, JsonMetrics).
143147

144148
report_latency({_SummaryFile, _ErrorsFile},
145-
Elapsed, Window, Op,
149+
Elapsed, Window, Concurrency, Op,
146150
Stats, Errors, Units, Ops) ->
147151
case proplists:get_value(n, Stats) > 0 of
148152
true ->
149153
P = proplists:get_value(percentile, Stats),
150-
Line = io_lib:format("~w, ~w, ~w, ~w, ~w, ~.1f, ~w, ~w, ~w, ~w, ~w, ~w\n",
154+
Line = io_lib:format("~w, ~w, ~w, ~w, ~w, ~w, ~.1f, ~w, ~w, ~w, ~w, ~w, ~w\n",
151155
[Elapsed,
152156
Window,
157+
Concurrency,
153158
Units,
154159
Ops,
155160
proplists:get_value(min, Stats),
@@ -177,7 +182,7 @@ op_csv_file({Label, _Op}) ->
177182
TestDir = basho_bench:get_test_dir(),
178183
Fname = filename:join([TestDir, normalize_label(Label) ++ "_latencies.csv"]),
179184
{ok, F} = file:open(Fname, [raw, binary, write]),
180-
ok = file:write(F, <<"elapsed, window, n, ops, min, mean, median, 95th, 99th, 99_9th, max, errors\n">>),
185+
ok = file:write(F, <<"elapsed, window, concurrency, n, ops, min, mean, median, 95th, 99th, 99_9th, max, errors\n">>),
181186
F.
182187

183188
measurement_csv_file({Label, _Op}) ->
@@ -199,7 +204,7 @@ stringify_stats(_RunStatsType=json, RunMetrics) ->
199204
[ jiffy:encode( {[{recordedMetrics, {RunMetrics}}]}, [pretty] ) ];
200205
stringify_stats(_RunStatsType=csv, RunMetrics) ->
201206
% Ordered output of fields
202-
OrderedFields = [n, ops, mean, geometric_mean, harmonic_mean,
207+
OrderedFields = [n, ops, concurrency, mean, geometric_mean, harmonic_mean,
203208
variance, standard_deviation, skewness, kurtosis,
204209
median, p75, p90, p95, p99, p999,
205210
min, max, basho_errors],

src/basho_bench_worker_sup.erl

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,12 @@
2525

2626
%% API
2727
-export([start_link/0,
28+
add_worker/0,
29+
add_workers/1,
30+
add_worker_spec/1,
2831
workers/0,
2932
workers/1,
33+
worker_count/0,
3034
remote_workers/1,
3135
stop_child/1,
3236
active_workers/0]).
@@ -58,6 +62,21 @@ stop_child(Id) ->
5862
active_workers() ->
5963
[X || X <- workers(), X =/= undefined].
6064

65+
worker_count() ->
66+
case whereis(?MODULE) of
67+
undefined ->
68+
case erlang:get(last_worker_count) of
69+
undefined ->
70+
0;
71+
WC ->
72+
WC
73+
end;
74+
_Pid ->
75+
WC = length(active_workers()),
76+
erlang:put(last_worker_count, WC),
77+
WC
78+
end.
79+
6180

6281
%% ===================================================================
6382
%% Supervisor callbacks
@@ -78,6 +97,43 @@ init([]) ->
7897
%% ===================================================================
7998

8099

100+
add_worker() ->
101+
WorkerCount = length(active_workers()),
102+
WorkerNum = WorkerCount + 1,
103+
Id = list_to_atom(lists:concat(['basho_bench_rampup_worker_', WorkerNum])),
104+
%% Use "single_worker" atom for original non-worker case
105+
Spec = {
106+
Id,
107+
{basho_bench_worker, start_link, [Id, {single_worker, WorkerNum, WorkerNum}, []]},
108+
transient, 5000, worker, [basho_bench_worker]
109+
},
110+
add_worker_spec(Spec).
111+
112+
113+
add_workers(WorkerTypes) ->
114+
add_workers(WorkerTypes, []).
115+
116+
117+
add_workers([], Acc) ->
118+
Acc;
119+
add_workers([WorkerType|Rest], Acc) when is_atom(WorkerType) ->
120+
WorkerTypes = basho_bench_config:get(worker_types),
121+
Conf0 = proplists:get_value(WorkerType, WorkerTypes, []),
122+
Conf = [{concurrent, 1} | proplists:delete(concurrent, Conf0)],
123+
WorkerCount = length(active_workers()),
124+
WorkerNum = WorkerCount + 1,
125+
Id = list_to_atom(lists:concat(['basho_bench_rampup_worker_', WorkerType, '_', WorkerNum])),
126+
Spec = {
127+
Id,
128+
{basho_bench_worker, start_link, [Id, {WorkerType, WorkerNum, WorkerNum}, Conf]},
129+
transient, 5000, worker, [basho_bench_worker]},
130+
add_workers(Rest, [add_worker_spec(Spec)|Acc]).
131+
132+
133+
add_worker_spec(Spec) ->
134+
{ok, _Pid} = supervisor:start_child(?MODULE, Spec).
135+
136+
81137
worker_specs([]) ->
82138
worker_specs_single(basho_bench_config:get(concurrent), []);
83139
worker_specs(Workers) ->

0 commit comments

Comments
 (0)