Skip to content

Commit 919fd0c

Browse files
committed
Merge branch '2.0' into 2.1
2 parents 154ad91 + e5cc497 commit 919fd0c

File tree

2 files changed

+146
-4
lines changed

2 files changed

+146
-4
lines changed

src/riak_repl2_rtq.erl

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
ack_sync/2,
3434
status/0,
3535
dumpq/0,
36+
summarize/0,
37+
evict/1,
38+
evict/2,
3639
is_empty/1,
3740
all_queues_empty/0,
3841
shutdown/0,
@@ -85,6 +88,7 @@
8588
}).
8689

8790
-type name() :: term().
91+
-type seq() :: non_neg_integer().
8892

8993
%% API
9094
%% @doc Start linked, registered to module name.
@@ -233,6 +237,31 @@ status() ->
233237
dumpq() ->
234238
gen_server:call(?SERVER, dumpq, infinity).
235239

240+
%% @doc Return summary data for the objects currently in the queue.
241+
%% The return value is a list of tuples of the form {SequenceNum, Key, Size}.
242+
-spec summarize() -> [{seq(), riak_object:key(), non_neg_integer()}].
243+
summarize() ->
244+
gen_server:call(?SERVER, summarize, infinity).
245+
246+
%% @doc If an object with the given Seq number is currently in the queue,
247+
%% evict it and return ok.
248+
-spec evict(Seq :: seq()) -> 'ok'.
249+
evict(Seq) ->
250+
gen_server:call(?SERVER, {evict, Seq}, infinity).
251+
252+
%% @doc If an object with the given Seq number is currently in the queue and it
253+
%% also matches the given Key, then evict it and return ok. This is a safer
254+
%% alternative to evict/1 since `Seq' numbers can potentially be recycled.
255+
%% It also provides a more meaningful return value in the case that the object
256+
%% was not present. Specifically, if there is no object in the queue with the
257+
%% given `Seq' number, then {not_found, Seq} is returned, whereas if the
258+
%% object with the given `Seq' number is present but does not match the
259+
%% provided `Key', then {wrong_key, Seq, Key} is returned.
260+
-spec evict(Seq :: seq(), Key :: riak_object:key()) ->
261+
'ok' | {'not_found', integer()} | {'wrong_key', integer(), riak_object:key()}.
262+
evict(Seq, Key) ->
263+
gen_server:call(?SERVER, {evict, Seq, Key}, infinity).
264+
236265
%% @doc Signal that this node is doing down, and so a proxy process needs to
237266
%% start to avoid dropping, or aborting unacked results.
238267
-spec shutdown() -> 'ok'.
@@ -334,6 +363,32 @@ handle_call({set_max_bytes, MaxBytes}, _From, State) ->
334363
handle_call(dumpq, _From, State = #state{qtab = QTab}) ->
335364
{reply, ets:tab2list(QTab), State};
336365

366+
handle_call(summarize, _From, State = #state{qtab = QTab}) ->
367+
Fun = fun({Seq, _NumItems, Bin, _Meta}, Acc) ->
368+
Obj = riak_repl_util:from_wire(Bin),
369+
{Key, Size} = summarize_object(Obj),
370+
Acc ++ [{Seq, Key, Size}]
371+
end,
372+
{reply, ets:foldl(Fun, [], QTab), State};
373+
374+
handle_call({evict, Seq}, _From, State = #state{qtab = QTab}) ->
375+
ets:delete(QTab, Seq),
376+
{reply, ok, State};
377+
handle_call({evict, Seq, Key}, _From, State = #state{qtab = QTab}) ->
378+
case ets:lookup(QTab, Seq) of
379+
[{Seq, _, Bin, _}] ->
380+
Obj = riak_repl_util:from_wire(Bin),
381+
case Key =:= riak_object:key(Obj) of
382+
true ->
383+
ets:delete(QTab, Seq),
384+
{reply, ok, State};
385+
false ->
386+
{reply, {wrong_key, Seq, Key}, State}
387+
end;
388+
_ ->
389+
{reply, {not_found, Seq}, State}
390+
end;
391+
337392
handle_call({pull_with_ack, Name, DeliverFun}, _From, State) ->
338393
{reply, ok, pull(Name, DeliverFun, State)};
339394

@@ -711,3 +766,7 @@ minseq(QTab, QSeq) ->
711766
MinSeq ->
712767
MinSeq - 1
713768
end.
769+
770+
summarize_object(Obj) ->
771+
ObjFmt = riak_core_capability:get({riak_kv, object_format}, v0),
772+
{riak_object:key(Obj), riak_object:approximate_size(ObjFmt, Obj)}.

test/riak_repl2_rtq_tests.erl

Lines changed: 87 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22
-compile(export_all).
33
-include_lib("eunit/include/eunit.hrl").
44

5+
-define(SETUP_ENV, application:set_env(riak_repl, rtq_max_bytes, 10*1024*1024)).
6+
-define(CLEAN_ENV, application:unset_env(riak_repl, rtq_max_bytes)).
7+
58
rtq_trim_test() ->
69
%% make sure the queue is 10mb
7-
application:set_env(riak_repl, rtq_max_bytes, 10*1024*1024),
10+
?SETUP_ENV,
811
{ok, Pid} = riak_repl2_rtq:start_test(),
912
try
1013
gen_server:call(Pid, {register, rtq_test}),
@@ -19,7 +22,7 @@ rtq_trim_test() ->
1922
%% the queue is now empty
2023
?assert(gen_server:call(Pid, {is_empty, rtq_test}))
2124
after
22-
application:unset_env(riak_repl, rtq_max_bytes),
25+
?CLEAN_ENV,
2326
exit(Pid, kill)
2427
end.
2528

@@ -45,12 +48,12 @@ accumulate(Pid, Acc, C) ->
4548

4649
status_test_() ->
4750
{setup, fun() ->
48-
application:set_env(riak_repl, rtq_max_bytes, 10 * 1024 * 1024),
51+
?SETUP_ENV,
4952
{ok, QPid} = riak_repl2_rtq:start_link(),
5053
QPid
5154
end,
5255
fun(QPid) ->
53-
application:unset_env(riak_repl, rtq_max_bytes),
56+
?CLEAN_ENV,
5457
riak_repl_test_util:kill_and_wait(QPid)
5558
end,
5659
fun(_QPid) -> [
@@ -68,6 +71,64 @@ status_test_() ->
6871

6972
] end}.
7073

74+
summarize_test_() ->
75+
{setup,
76+
fun start_rtq/0,
77+
fun kill_rtq/1,
78+
fun(_QPid) -> [
79+
{"includes sequence number, object ID, and size",
80+
fun() ->
81+
Objects = push_objects(<<"BucketsOfRain">>, [<<"obj1">>, <<"obj2">>]),
82+
Summarized = riak_repl2_rtq:summarize(),
83+
Zipped = lists:zip(Objects, Summarized),
84+
lists:foreach(
85+
fun({Obj, Summary}) ->
86+
{Seq, _, _} = Summary,
87+
ExpectedSummary = {Seq, riak_object:key(Obj), get_approximate_size(Obj)},
88+
?assertMatch(ExpectedSummary, Summary)
89+
end,
90+
Zipped)
91+
end
92+
}
93+
]
94+
end
95+
}.
96+
97+
evict_test_() ->
98+
{foreach,
99+
fun start_rtq/0,
100+
fun kill_rtq/1,
101+
[
102+
fun(_QPid) ->
103+
{"evicts object by sequence if present",
104+
fun() ->
105+
Objects = push_objects(<<"TwoPeasInABucket">>, [<<"obj1">>, <<"obj2">>]),
106+
[KeyToEvict, RemainingKey] = [riak_object:key(O) || O <- Objects],
107+
[{SeqToEvict, KeyToEvict, _}, {RemainingSeq, RemainingKey, _}] = riak_repl2_rtq:summarize(),
108+
ok = riak_repl2_rtq:evict(SeqToEvict),
109+
?assertMatch([{RemainingSeq, RemainingKey, _}], riak_repl2_rtq:summarize()),
110+
ok = riak_repl2_rtq:evict(RemainingSeq + 1),
111+
?assertMatch([{RemainingSeq, RemainingKey, _}], riak_repl2_rtq:summarize())
112+
end
113+
}
114+
end,
115+
fun(_QPid) ->
116+
{"evicts object by sequence if present and key matches",
117+
fun() ->
118+
Objects = push_objects(<<"TwoPeasInABucket">>, [<<"obj1">>, <<"obj2">>]),
119+
[KeyToEvict, RemainingKey] = [riak_object:key(O) || O <- Objects],
120+
[{SeqToEvict, KeyToEvict, _}, {RemainingSeq, RemainingKey, _}] = riak_repl2_rtq:summarize(),
121+
?assertMatch({wrong_key, _, _}, riak_repl2_rtq:evict(SeqToEvict, RemainingKey)),
122+
?assertMatch({not_found, _}, riak_repl2_rtq:evict(RemainingSeq + 1, RemainingKey)),
123+
?assertEqual(2, length(riak_repl2_rtq:summarize())),
124+
ok = riak_repl2_rtq:evict(SeqToEvict, KeyToEvict),
125+
?assertMatch([{RemainingSeq, RemainingKey, _}], riak_repl2_rtq:summarize())
126+
end
127+
}
128+
end
129+
]
130+
}.
131+
71132
overload_protection_start_test_() ->
72133
[
73134
{"able to start after a crash without ets errors", fun() ->
@@ -213,6 +274,28 @@ overload_test_() ->
213274

214275
]}.
215276

277+
start_rtq() ->
278+
?SETUP_ENV,
279+
{ok, Pid} = riak_repl2_rtq:start_link(),
280+
gen_server:call(Pid, {register, rtq_test}),
281+
Pid.
282+
283+
kill_rtq(QPid) ->
284+
?CLEAN_ENV,
285+
riak_repl_test_util:kill_and_wait(QPid).
286+
287+
object_format() -> riak_core_capability:get({riak_kv, object_format}, v0).
288+
289+
get_approximate_size(O) -> riak_object:approximate_size(object_format(), O).
290+
291+
push_objects(Bucket, Keys) -> [push_object(Bucket, O) || O <- Keys].
292+
293+
push_object(Bucket, Key) ->
294+
RandomData = crypto:rand_bytes(1024 * 1024),
295+
Obj = riak_object:new(Bucket, Key, RandomData),
296+
riak_repl2_rtq:push(1, Obj),
297+
Obj.
298+
216299
pull(N) ->
217300
lists:foldl(fun(_Nth, _LastSeq) ->
218301
pull()

0 commit comments

Comments
 (0)