Skip to content

Commit b8cfd20

Browse files
committed
Merge pull request #706 from basho/bugfix/im/aae_fullsync_bucket_types_2.0
Backport Bugfix/im/aae_fullsync_bucket_types to 2.0 Reviewed-by: jonmeredith
2 parents e5cc497 + c4d425f commit b8cfd20

File tree

3 files changed

+15
-29
lines changed

3 files changed

+15
-29
lines changed

src/riak_repl2_fssink.erl

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,9 @@ handle_info({Proto, Socket, Data},
148148
State=#state{socket=Socket,transport=Transport}) when Proto==tcp; Proto==ssl ->
149149
%% aae strategy will not receive messages here
150150
Transport:setopts(Socket, [{active, once}]),
151-
case decode_obj_msg(Data) of
152-
{fs_diff_obj, RObj} ->
151+
case binary_to_term(Data) of
152+
{fs_diff_obj, BinObj} ->
153+
RObj = riak_repl_util:decode_bin_obj(BinObj),
153154
riak_repl_util:do_repl_put(RObj);
154155
Other ->
155156
gen_fsm:send_event(State#state.fullsync_worker, Other)
@@ -185,18 +186,6 @@ handle_info(init_ack, State=#state{socket=Socket,
185186
handle_info(_Msg, State) ->
186187
{noreply, State}.
187188

188-
decode_obj_msg(Data) ->
189-
Msg = binary_to_term(Data),
190-
case Msg of
191-
{fs_diff_obj, BObj} when is_binary(BObj) ->
192-
RObj = riak_repl_util:from_wire(BObj),
193-
{fs_diff_obj, RObj};
194-
{fs_diff_obj, _RObj} ->
195-
Msg;
196-
Other ->
197-
Other
198-
end.
199-
200189
terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir, strategy=Strategy}) ->
201190
%% TODO: define a fullsync worker behavior and call it's stop function
202191
case is_pid(FSW) andalso is_process_alive(FSW) of

src/riak_repl_fullsync_worker.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ handle_cast({put, RObj, Pool}, State) ->
140140
poolboy:checkin(Pool, self()),
141141
{noreply, State};
142142
handle_cast({bin_put, BinObj, Pool}, State) ->
143-
RObj = riak_repl_util:from_wire(BinObj),
143+
RObj = riak_repl_util:decode_bin_obj(BinObj),
144144
riak_repl_util:do_repl_put(RObj),
145145
poolboy:checkin(Pool, self()), % resume work
146146
{noreply, State};

src/riak_repl_util.erl

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
sockname/2,
5757
deduce_wire_version_from_proto/1,
5858
encode_obj_msg/2,
59+
decode_bin_obj/1,
5960
make_pg_proxy_name/1,
6061
make_pg_name/1,
6162
mode_12_enabled/1,
@@ -875,28 +876,24 @@ encode_obj_msg(V, {Cmd, RObj}) ->
875876
encode_obj_msg(V, {Cmd, RObj}, riak_object:type(RObj)).
876877

877878
encode_obj_msg(V, {Cmd, RObj}, undefined) ->
878-
case V of
879-
w0 ->
880-
term_to_binary({Cmd, RObj});
881-
_W ->
882-
BObj = riak_repl_util:to_wire(w1,RObj),
883-
term_to_binary({Cmd, BObj})
884-
end;
879+
term_to_binary({Cmd, encode_obj(V, RObj)});
885880
encode_obj_msg(V, {Cmd, RObj}, T) ->
886881
BTHash = case riak_repl_bucket_type_util:property_hash(T) of
887882
undefined ->
888883
0;
889884
Hash ->
890885
Hash
891886
end,
892-
case V of
893-
w0 ->
894-
term_to_binary({Cmd, {BTHash, RObj}});
887+
term_to_binary({Cmd, {BTHash, encode_obj(V, RObj)}}).
895888

896-
_W ->
897-
BObj = riak_repl_util:to_wire(w1,RObj),
898-
term_to_binary({Cmd, {BTHash, BObj}})
899-
end.
889+
%% A wrapper around to_wire which leaves the object unencoded when using the w0 wire protocol.
890+
encode_obj(w0, RObj) ->
891+
RObj;
892+
encode_obj(W, RObj) ->
893+
to_wire(W, RObj).
894+
895+
decode_bin_obj({BTHash, BinObj}) -> {BTHash, from_wire(BinObj)};
896+
decode_bin_obj(BinObj) -> from_wire(BinObj).
900897

901898
%% @doc Create binary wire formatted replication blob for riak 2.0+, complete with
902899
%% possible type, bucket and key for reconstruction on the other end. BinObj should be

0 commit comments

Comments
 (0)