Skip to content

Commit df36463

Browse files
committed
Merge pull request #668 from basho/revert-665-bugfix/im/aae_fullsync_bucket_types
Revert "Update AAE fullsync to support bucket types" Reviewed-by: bsparrow435
2 parents e6cd9ca + 078f163 commit df36463

File tree

4 files changed

+31
-34
lines changed

4 files changed

+31
-34
lines changed

src/riak_repl2_fssink.erl

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ handle_info({Proto, Socket, Data},
148148
State=#state{socket=Socket,transport=Transport}) when Proto==tcp; Proto==ssl ->
149149
%% aae strategy will not receive messages here
150150
Transport:setopts(Socket, [{active, once}]),
151-
case riak_repl_util:decode_obj_msg(Data) of
151+
case decode_obj_msg(Data) of
152152
{fs_diff_obj, RObj} ->
153153
riak_repl_util:do_repl_put(RObj);
154154
Other ->
@@ -185,6 +185,18 @@ handle_info(init_ack, State=#state{socket=Socket,
185185
handle_info(_Msg, State) ->
186186
{noreply, State}.
187187

188+
decode_obj_msg(Data) ->
189+
Msg = binary_to_term(Data),
190+
case Msg of
191+
{fs_diff_obj, BObj} when is_binary(BObj) ->
192+
RObj = riak_repl_util:from_wire(BObj),
193+
{fs_diff_obj, RObj};
194+
{fs_diff_obj, _RObj} ->
195+
Msg;
196+
Other ->
197+
Other
198+
end.
199+
188200
terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir, strategy=Strategy}) ->
189201
%% TODO: define a fullsync worker behavior and call it's stop function
190202
case is_pid(FSW) andalso is_process_alive(FSW) of

src/riak_repl2_fssink_pool.erl

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% fullsyncs are running.
33

44
-module(riak_repl2_fssink_pool).
5-
-export([start_link/0, status/0, put/1, bin_put/1]).
5+
-export([start_link/0, status/0, bin_put/1]).
66

77
start_link() ->
88
MinPool = app_helper:get_env(riak_repl, fssink_min_workers, 5),
@@ -21,12 +21,6 @@ status() ->
2121
{overflow, Overflow},
2222
{num_monitors, NumMonitors}].
2323

24-
%% @doc Send an object or BT hash / object pair to the worker pool to
25-
%% run a put against. No guarantees of completion.
26-
put(Obj) ->
27-
Pid = poolboy:checkout(?MODULE, true, infinity),
28-
riak_repl_fullsync_worker:do_put(Pid, Obj, ?MODULE).
29-
3024
%% @doc Send a replication wire-encoded binary to the worker pool
3125
%% for running a put against. No guarantees of completion.
3226
bin_put(BinObj) ->

src/riak_repl_aae_sink.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ handle_info({Proto, _Socket, Data}, State=#state{transport=Transport,
8383
[MsgType|<<>>] ->
8484
process_msg(MsgType, State);
8585
[MsgType|MsgData] ->
86-
process_msg(MsgType, riak_repl_util:decode_obj_msg(MsgData), State)
86+
process_msg(MsgType, binary_to_term(MsgData), State)
8787
end;
8888

8989
handle_info({'DOWN', _, _, _, _}, State) ->
@@ -143,10 +143,10 @@ process_msg(?MSG_GET_AAE_SEGMENT, {SegmentNum,IndexN}, State=#state{tree_pid=Tre
143143
send_reply(ResponseMsg, State);
144144

145145
%% no reply
146-
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, Obj}, State) ->
146+
process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
147147
%% may block on worker pool, ok return means work was submitted
148148
%% to pool, not that put FSM completed successfully.
149-
ok = riak_repl2_fssink_pool:put(Obj),
149+
ok = riak_repl2_fssink_pool:bin_put(BObj),
150150
{noreply, State};
151151

152152
%% replies: ok | not_responsible

src/riak_repl_util.erl

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
sockname/2,
5656
deduce_wire_version_from_proto/1,
5757
encode_obj_msg/2,
58-
decode_obj_msg/1,
5958
make_pg_proxy_name/1,
6059
make_pg_name/1,
6160
mode_12_enabled/1,
@@ -868,35 +867,27 @@ encode_obj_msg(V, {Cmd, RObj}) ->
868867
encode_obj_msg(V, {Cmd, RObj}, riak_object:type(RObj)).
869868

870869
encode_obj_msg(V, {Cmd, RObj}, undefined) ->
871-
term_to_binary({Cmd, encode_obj(V, RObj)});
870+
case V of
871+
w0 ->
872+
term_to_binary({Cmd, RObj});
873+
_W ->
874+
BObj = riak_repl_util:to_wire(w1,RObj),
875+
term_to_binary({Cmd, BObj})
876+
end;
872877
encode_obj_msg(V, {Cmd, RObj}, T) ->
873878
BTHash = case riak_repl_bucket_type_util:property_hash(T) of
874879
undefined ->
875880
0;
876881
Hash ->
877882
Hash
878883
end,
879-
term_to_binary({Cmd, {BTHash, encode_obj(V, RObj)}}).
880-
881-
%% A wrapper around to_wire which leaves the object unencoded when using the w0 wire protocol.
882-
encode_obj(w0, RObj) ->
883-
RObj;
884-
encode_obj(W, RObj) ->
885-
to_wire(W, RObj).
886-
887-
decode_obj_msg(Data) ->
888-
Msg = binary_to_term(Data),
889-
case Msg of
890-
{fs_diff_obj, BObj} when is_binary(BObj) ->
891-
RObj = from_wire(BObj),
892-
{fs_diff_obj, RObj};
893-
{fs_diff_obj, {BTHash, BObj}} when is_binary(BObj) ->
894-
RObj = from_wire(BObj),
895-
{fs_diff_obj, {BTHash, RObj}};
896-
{fs_diff_obj, _} ->
897-
Msg;
898-
Other ->
899-
Other
884+
case V of
885+
w0 ->
886+
term_to_binary({Cmd, {BTHash, RObj}});
887+
888+
_W ->
889+
BObj = riak_repl_util:to_wire(w1,RObj),
890+
term_to_binary({Cmd, {BTHash, BObj}})
900891
end.
901892

902893
%% @doc Create binary wire formatted replication blob for riak 2.0+, complete with

0 commit comments

Comments
 (0)