Skip to content

Commit e6cd9ca

Browse files
committed
Merge pull request #665 from basho/bugfix/im/aae_fullsync_bucket_types
Update AAE fullsync to support bucket types
2 parents 0489c7c + bffb7df commit e6cd9ca

File tree

4 files changed

+34
-31
lines changed

4 files changed

+34
-31
lines changed

src/riak_repl2_fssink.erl

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ handle_info({Proto, Socket, Data},
148148
State=#state{socket=Socket,transport=Transport}) when Proto==tcp; Proto==ssl ->
149149
%% aae strategy will not receive messages here
150150
Transport:setopts(Socket, [{active, once}]),
151-
case decode_obj_msg(Data) of
151+
case riak_repl_util:decode_obj_msg(Data) of
152152
{fs_diff_obj, RObj} ->
153153
riak_repl_util:do_repl_put(RObj);
154154
Other ->
@@ -185,18 +185,6 @@ handle_info(init_ack, State=#state{socket=Socket,
185185
handle_info(_Msg, State) ->
186186
{noreply, State}.
187187

188-
decode_obj_msg(Data) ->
189-
Msg = binary_to_term(Data),
190-
case Msg of
191-
{fs_diff_obj, BObj} when is_binary(BObj) ->
192-
RObj = riak_repl_util:from_wire(BObj),
193-
{fs_diff_obj, RObj};
194-
{fs_diff_obj, _RObj} ->
195-
Msg;
196-
Other ->
197-
Other
198-
end.
199-
200188
terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir, strategy=Strategy}) ->
201189
%% TODO: define a fullsync worker behavior and call it's stop function
202190
case is_pid(FSW) andalso is_process_alive(FSW) of

src/riak_repl2_fssink_pool.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% fullsyncs are running.
33

44
-module(riak_repl2_fssink_pool).
5-
-export([start_link/0, status/0, bin_put/1]).
5+
-export([start_link/0, status/0, put/1, bin_put/1]).
66

77
start_link() ->
88
MinPool = app_helper:get_env(riak_repl, fssink_min_workers, 5),
@@ -21,6 +21,12 @@ status() ->
2121
{overflow, Overflow},
2222
{num_monitors, NumMonitors}].
2323

24+
%% @doc Send an object or BT hash / object pair to the worker pool to
25+
%% run a put against. No guarantees of completion.
26+
put(Obj) ->
27+
Pid = poolboy:checkout(?MODULE, true, infinity),
28+
riak_repl_fullsync_worker:do_put(Pid, Obj, ?MODULE).
29+
2430
%% @doc Send a replication wire-encoded binary to the worker pool
2531
%% for running a put against. No guarantees of completion.
2632
bin_put(BinObj) ->

src/riak_repl_aae_sink.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ handle_info({Proto, _Socket, Data}, State=#state{transport=Transport,
8383
[MsgType|<<>>] ->
8484
process_msg(MsgType, State);
8585
[MsgType|MsgData] ->
86-
process_msg(MsgType, binary_to_term(MsgData), State)
86+
process_msg(MsgType, riak_repl_util:decode_obj_msg(MsgData), State)
8787
end;
8888

8989
handle_info({'DOWN', _, _, _, _}, State) ->
@@ -143,10 +143,10 @@ process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=Tre
143143
send_reply(ResponseMsg, State);
144144

145145
%% no reply
146-
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
146+
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, Obj}, State) ->
147147
%% may block on worker pool, ok return means work was submitted
148148
%% to pool, not that put FSM completed successfully.
149-
ok = riak_repl2_fssink_pool:bin_put(BObj),
149+
ok = riak_repl2_fssink_pool:put(Obj),
150150
{noreply, State};
151151

152152
%% replies: ok | not_responsible

src/riak_repl_util.erl

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
sockname/2,
5656
deduce_wire_version_from_proto/1,
5757
encode_obj_msg/2,
58+
decode_obj_msg/1,
5859
make_pg_proxy_name/1,
5960
make_pg_name/1,
6061
mode_12_enabled/1,
@@ -867,27 +868,35 @@ encode_obj_msg(V, {Cmd, RObj}) ->
867868
encode_obj_msg(V, {Cmd, RObj}, riak_object:type(RObj)).
868869

869870
encode_obj_msg(V, {Cmd, RObj}, undefined) ->
870-
case V of
871-
w0 ->
872-
term_to_binary({Cmd, RObj});
873-
_W ->
874-
BObj = riak_repl_util:to_wire(w1,RObj),
875-
term_to_binary({Cmd, BObj})
876-
end;
871+
term_to_binary({Cmd, encode_obj(V, RObj)});
877872
encode_obj_msg(V, {Cmd, RObj}, T) ->
878873
BTHash = case riak_repl_bucket_type_util:property_hash(T) of
879874
undefined ->
880875
0;
881876
Hash ->
882877
Hash
883878
end,
884-
case V of
885-
w0 ->
886-
term_to_binary({Cmd, {BTHash, RObj}});
887-
888-
_W ->
889-
BObj = riak_repl_util:to_wire(w1,RObj),
890-
term_to_binary({Cmd, {BTHash, BObj}})
879+
term_to_binary({Cmd, {BTHash, encode_obj(V, RObj)}}).
880+
881+
%% A wrapper around to_wire which leaves the object unencoded when using the w0 wire protocol.
882+
encode_obj(w0, RObj) ->
883+
RObj;
884+
encode_obj(W, RObj) ->
885+
to_wire(W, RObj).
886+
887+
decode_obj_msg(Data) ->
888+
Msg = binary_to_term(Data),
889+
case Msg of
890+
{fs_diff_obj, BObj} when is_binary(BObj) ->
891+
RObj = from_wire(BObj),
892+
{fs_diff_obj, RObj};
893+
{fs_diff_obj, {BTHash, BObj}} when is_binary(BObj) ->
894+
RObj = from_wire(BObj),
895+
{fs_diff_obj, {BTHash, RObj}};
896+
{fs_diff_obj, _} ->
897+
Msg;
898+
Other ->
899+
Other
891900
end.
892901

893902
%% @doc Create binary wire formatted replication blob for riak 2.0+, complete with

0 commit comments

Comments
 (0)