Skip to content

Commit e4b76bc

Browse files
author
Ian Milligan
committed
Merge branch 'develop-2.2' into develop
2 parents 0d3bfb3 + df2a117 commit e4b76bc

16 files changed

+449
-266
lines changed

rebar.config

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
{xref_queries, [{"(XC - UC) || (XU - X - B - cluster_info : Mod)", []}]}.
1010

1111
{deps, [
12-
{lager, "2.0.3", {git, "git://github.com/basho/lager.git", {tag, "2.0.3"}}},
12+
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "3.2.2"}}},
1313
{ranch, "0.4.0-p1", {git, "git://github.com/basho/ranch.git", {tag, "0.4.0-p1"}}},
1414
{ebloom, ".*", {git, "git://github.com/basho/ebloom.git", {tag, "2.0.0"}}},
15-
{riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {branch, "develop"}}},
16-
{riak_repl_pb_api, ".*", {git, "git@github.com:basho/riak_repl_pb_api.git", {branch, "develop"}}}
15+
{riak_kv, ".*", {git, "git://github.com/basho/riak_kv.git", {tag, "2.1.3"}}},
16+
{riak_repl_pb_api, ".*", {git, "git@github.com:basho/riak_repl_pb_api.git", {tag, "2.4.0"}}}
1717
]}.
1818

1919
{edoc_opts, [{preprocess, true}]}.

src/riak_core_connection_mgr.erl

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -584,14 +584,17 @@ locate_endpoints({Type, Name}, Strategy, Locators) ->
584584
%% our book keeping for that endpoint. Black-list it, and
585585
%% adjust a backoff timer so that we wait a while before
586586
%% trying this endpoint again.
587+
588+
-spec fail_endpoint(ip_addr(), term(), proto_id(), #state{}) -> #state{}.
587589
fail_endpoint(Addr, Reason, ProtocolId, State) ->
588590
%% update the stats module
589-
Stat = {conn_error, Reason},
591+
Err = reason_to_atom(Reason),
592+
Stat = {conn_error, Err},
590593
riak_core_connection_mgr_stats:update(Stat, Addr, ProtocolId),
591594
%% update the endpoint
592595
Fun = fun(EP=#ep{backoff_delay = Backoff, failures = Failures}) ->
593596
erlang:send_after(Backoff, self(), {backoff_timer, Addr}),
594-
EP#ep{failures = orddict:update_counter(Reason, 1, Failures),
597+
EP#ep{failures = orddict:update_counter(Err, 1, Failures),
595598
nb_failures = EP#ep.nb_failures + 1,
596599
backoff_delay = increase_backoff(Backoff),
597600
last_fail_time = os:timestamp(),
@@ -600,6 +603,18 @@ fail_endpoint(Addr, Reason, ProtocolId, State) ->
600603
end,
601604
update_endpoint(Addr, Fun, State).
602605

606+
%% Attempt to extract atom from an error reason.
%% Returns the leading atom of a {{Err, Val}, Stack} or {Err, Stack} tuple,
%% the reason itself when it is already an atom, and unknown_reason for
%% any other term. fail_endpoint/4 uses the result as the stat name and as
%% the key of the per-endpoint failure counter.
%% NOTE(review): the second clause matches any 2-tuple whose first element
%% is an atom, so a reason such as {error, Detail} is reported as `error`.
607+
608+
-spec reason_to_atom(term()) -> atom().
609+
reason_to_atom({{Err, _Val}, _Stack}) when is_atom(Err) ->
610+
Err;
611+
reason_to_atom({Err, _Stack}) when is_atom(Err) ->
612+
Err;
613+
reason_to_atom(Reason) when is_atom(Reason) ->
614+
Reason;
615+
reason_to_atom(_Reason) ->
616+
unknown_reason.
617+
603618
connect_endpoint(Addr, State) ->
604619
update_endpoint(Addr, fun(EP) ->
605620
EP#ep{is_black_listed = false,

src/riak_repl2_fscoordinator.erl

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
socket,
5757
transport,
5858
largest_n,
59-
owners = [],
6059
connection_ref,
6160
partition_queue = queue:new(),
6261
retries = dict:new(),
@@ -361,7 +360,6 @@ handle_cast(start_fullsync, State) ->
361360
Partitions = sort_partitions(Ring),
362361
State2 = State#state{
363362
largest_n = N,
364-
owners = riak_core_ring:all_owners(Ring),
365363
partition_queue = queue:from_list(Partitions),
366364
retries = dict:new(),
367365
reserve_retries = dict:new(),
@@ -389,7 +387,6 @@ handle_cast(stop_fullsync, State) ->
389387
<- State#state.running_sources],
390388
State2 = State#state{
391389
largest_n = undefined,
392-
owners = [],
393390
partition_queue = queue:new(),
394391
retries = dict:new(),
395392
reserve_retries = dict:new(),
@@ -770,26 +767,26 @@ send_next_whereis_req(State) ->
770767
% two specs: is the local node available, and does our cache of remote nodes
771768
% say the remote node is available.
772769
%% Entry point of the partition search: takes the head of the partition
%% queue (queue:out/1) and passes it, together with the busy_nodes and
%% whereis_waiting fields of the state, to the worker clauses, starting
%% with an empty accumulator queue.
determine_best_partition(State) ->
773-
#state{partition_queue = Queue, busy_nodes = Busies, owners = Owners, whereis_waiting = Waiting} = State,
770+
#state{partition_queue = Queue, busy_nodes = Busies, whereis_waiting = Waiting} = State,
774771
SeedPart = queue:out(Queue),
775772
lager:debug("Starting partition search"),
776-
determine_best_partition(SeedPart, Busies, Owners, Waiting, queue:new()).
773+
determine_best_partition(SeedPart, Busies, Waiting, queue:new()).
777774

778-
determine_best_partition({empty, _Q}, _Business, _Owners, _Waiting, AccQ) ->
775+
determine_best_partition({empty, _Q}, _Business, _Waiting, AccQ) ->
779776
lager:debug("No partition in the queue that will not exceed a limit; will try again later."),
780777
% there is no best partition, try again later
781778
{undefined, AccQ};
782779

783-
determine_best_partition({{value, Part}, Queue}, Busies, Owners, Waiting, AccQ) ->
784-
case node_available(Part, Owners, Waiting) of
780+
determine_best_partition({{value, Part}, Queue}, Busies, Waiting, AccQ) ->
781+
case node_available(Part, Waiting) of
785782
false ->
786-
determine_best_partition(queue:out(Queue), Busies, Owners, Waiting, queue:in(Part, AccQ));
783+
determine_best_partition(queue:out(Queue), Busies, Waiting, queue:in(Part, AccQ));
787784
skip ->
788-
determine_best_partition(queue:out(Queue), Busies, Owners, Waiting, AccQ);
785+
determine_best_partition(queue:out(Queue), Busies, Waiting, AccQ);
789786
true ->
790787
case remote_node_available(Part, Busies) of
791788
false ->
792-
determine_best_partition(queue:out(Queue), Busies, Owners, Waiting, queue:in(Part, AccQ));
789+
determine_best_partition(queue:out(Queue), Busies, Waiting, queue:in(Part, AccQ));
793790
true ->
794791
{Part, queue:join(Queue, AccQ)}
795792
end
@@ -801,8 +798,10 @@ below_max_sources(State) ->
801798
Max = app_helper:get_env(riak_repl, max_fssource_cluster, ?DEFAULT_SOURCE_PER_CLUSTER),
802799
( length(State#state.running_sources) + length(State#state.whereis_waiting) ) < Max.
803800

804-
node_available(PartitionInfo, Owners, Waiting) ->
801+
node_available(PartitionInfo, Waiting) ->
805802
Partition = PartitionInfo#partition_info.index,
803+
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
804+
Owners = riak_core_ring:all_owners(Ring),
806805
LocalNode = proplists:get_value(Partition, Owners),
807806
Max = app_helper:get_env(riak_repl, max_fssource_node, ?DEFAULT_SOURCE_PER_NODE),
808807
try riak_repl2_fssource_sup:enabled(LocalNode) of
@@ -834,8 +833,8 @@ remote_node_available(Partition, Busies) ->
834833

835834
start_fssource(PartitionVal, Ip, Port, State) ->
836835
Partition = PartitionVal#partition_info.index,
837-
#state{owners = Owners} = State,
838-
LocalNode = proplists:get_value(Partition, Owners),
836+
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
837+
LocalNode = riak_core_ring:index_owner(Ring, Partition),
839838
case riak_repl2_fssource_sup:enable(LocalNode, Partition, {Ip, Port}) of
840839
{ok, Pid} ->
841840
link(Pid),

src/riak_repl2_fssink.erl

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,9 @@ handle_info({Proto, Socket, Data},
148148
State=#state{socket=Socket,transport=Transport}) when Proto==tcp; Proto==ssl ->
149149
%% aae strategy will not receive messages here
150150
Transport:setopts(Socket, [{active, once}]),
151-
case decode_obj_msg(Data) of
152-
{fs_diff_obj, RObj} ->
151+
case binary_to_term(Data) of
152+
{fs_diff_obj, BinObj} ->
153+
RObj = riak_repl_util:decode_bin_obj(BinObj),
153154
riak_repl_util:do_repl_put(RObj);
154155
Other ->
155156
gen_fsm:send_event(State#state.fullsync_worker, Other)
@@ -185,18 +186,6 @@ handle_info(init_ack, State=#state{socket=Socket,
185186
handle_info(_Msg, State) ->
186187
{noreply, State}.
187188

188-
decode_obj_msg(Data) ->
189-
Msg = binary_to_term(Data),
190-
case Msg of
191-
{fs_diff_obj, BObj} when is_binary(BObj) ->
192-
RObj = riak_repl_util:from_wire(BObj),
193-
{fs_diff_obj, RObj};
194-
{fs_diff_obj, _RObj} ->
195-
Msg;
196-
Other ->
197-
Other
198-
end.
199-
200189
terminate(_Reason, #state{fullsync_worker=FSW, work_dir=WorkDir, strategy=Strategy}) ->
201190
%% TODO: define a fullsync worker behavior and call its stop function
202191
case is_pid(FSW) andalso is_process_alive(FSW) of

src/riak_repl2_pg_block_provider.erl

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
terminate/2, code_change/3, status/1, status/2]).
1717

1818
%% send a message every KEEPALIVE milliseconds to make sure the service is running on the sink
19-
-define(KEEPALIVE, 1000).
19+
-define(KEEPALIVE, 60000).
2020

2121
-record(state,
2222
{
@@ -27,7 +27,6 @@
2727
connection_ref,
2828
worker,
2929
client,
30-
keepalive_timer,
3130
proxy_gets_provided = 0
3231
}).
3332

@@ -75,16 +74,16 @@ handle_call({connected, Socket, Transport, _Endpoint, _Proto, Props}, _From,
7574
lager:debug("Keeping stats for " ++ SocketTag),
7675
riak_core_tcp_mon:monitor(Socket, {?TCP_MON_PROXYGET_APP, source,
7776
SocketTag}, Transport),
78-
TRef = keepalive_timer(),
7977
Transport:setopts(Socket, [{active, once}]),
8078
{ok, Client} = riak:local_client(),
81-
{reply, ok, State#state{
79+
State2 = State#state{
8280
transport=Transport,
8381
socket=Socket,
8482
other_cluster=Cluster,
85-
client=Client,
86-
keepalive_timer=TRef
87-
}};
83+
client=Client
84+
},
85+
_ = keepalive_timer(),
86+
{reply, ok, State2};
8887
handle_call(status, _From, State=#state{socket=Socket,
8988
proxy_gets_provided=PGCount}) ->
9089
SocketStats = riak_core_tcp_mon:socket_status(Socket),
@@ -109,6 +108,7 @@ handle_cast(_Msg, State) ->
109108
handle_info(keepalive, State=#state{socket=Socket, transport=Transport}) ->
110109
Data = term_to_binary(stay_awake),
111110
Transport:send(Socket, Data),
111+
_ = keepalive_timer(),
112112
{noreply, State};
113113
handle_info({tcp_closed, Socket}, State=#state{socket=Socket}) ->
114114
lager:info("Connection for proxy_get ~p closed", [State#state.other_cluster]),
@@ -125,14 +125,11 @@ handle_info({ssl_error, _Socket, Reason}, State) ->
125125
[State#state.other_cluster, Reason]),
126126
{stop, socket_closed, State};
127127
handle_info({Proto, Socket, Data},
128-
State0=#state{socket=Socket,transport=Transport, keepalive_timer=TRef})
128+
State0=#state{socket=Socket,transport=Transport})
129129
when Proto==tcp; Proto==ssl ->
130130
Transport:setopts(Socket, [{active, once}]),
131-
_ = timer:cancel(TRef),
132131
Msg = binary_to_term(Data),
133-
%% restart the timer after each message has been processed
134-
State = State0#state{keepalive_timer=keepalive_timer()},
135-
handle_msg(Msg, State);
132+
handle_msg(Msg, State0);
136133
handle_info(_Msg, State) ->
137134
{noreply, State}.
138135

@@ -161,5 +158,4 @@ code_change(_OldVsn, State, _Extra) ->
161158
{ok, State}.
162159

163160
%% Schedules a single `keepalive` message to this process after ?KEEPALIVE
%% milliseconds. erlang:send_after/3 fires once, so the timer only repeats
%% because the handle_info(keepalive, ...) clause calls this function again
%% after sending its stay_awake message.
keepalive_timer() ->
164-
timer:send_interval(?KEEPALIVE, keepalive).
165-
161+
erlang:send_after(?KEEPALIVE, self(), keepalive).

src/riak_repl2_rtq.erl

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -453,12 +453,8 @@ terminate(Reason, #state{cs = Cs}) ->
453453
%% when started from tests, we may not be registered
454454
catch(erlang:unregister(?SERVER)),
455455
flush_pending_pushes(),
456-
_ = [case DeliverFun of
457-
undefined ->
458-
ok;
459-
_ ->
460-
catch(DeliverFun({error, {terminate, Reason}}))
461-
end || #c{deliver = DeliverFun} <- Cs],
456+
_ = [deliver_error(DeliverFun, {terminate, Reason}) ||
457+
#c{deliver = DeliverFun} <- Cs],
462458
ok.
463459

464460
%% @private
@@ -502,8 +498,8 @@ unregister_q(Name, State = #state{qtab = QTab, cs = Cs}) ->
502498
case C#c.deliver of
503499
undefined ->
504500
ok;
505-
Deliver ->
506-
Deliver({error, unregistered})
501+
DeliverFun ->
502+
deliver_error(DeliverFun, {error, unregistered})
507503
end,
508504
MinSeq = case Cs2 of
509505
[] ->
@@ -551,8 +547,8 @@ pull(Name, DeliverFun, State = #state{qtab = QTab, qseq = QSeq, cs = Cs}) ->
551547
{value, C, Cs2} ->
552548
[maybe_pull(QTab, QSeq, C, CsNames, DeliverFun) | Cs2];
553549
false ->
554-
lager:info("not_registered"),
555-
DeliverFun({error, not_registered})
550+
lager:error("Consumer ~p pulled from RTQ, but was not registered", [Name]),
551+
deliver_error(DeliverFun, not_registered)
556552
end,
557553
State#state{cs = UpdCs}.
558554

@@ -625,6 +621,13 @@ deliver_item(C, DeliverFun, {Seq,_NumItem, _Bin, _Meta} = QEntry) ->
625621
C#c{errs = C#c.errs + 1, deliver = undefined}
626622
end.
627623

624+
%% Deliver an error if a delivery function is registered.
%% When DeliverFun is a fun it is called with {error, Reason}; any exception
%% it raises is discarded by `catch`. Any other DeliverFun value (for
%% example undefined) is ignored. Always returns ok.
%% NOTE(review): Reason is wrapped here, so callers should pass the bare
%% reason. The unregister_q call site passes {error, unregistered}, which
%% is delivered as {error, {error, unregistered}} -- confirm intent.
%% NOTE(review): pull/3 appears to use this function's result as the value
%% of its not-registered branch; since that is always ok, verify it does
%% not end up stored in #state.cs.
625+
deliver_error(DeliverFun, Reason) when is_function(DeliverFun)->
626+
catch DeliverFun({error, Reason}),
627+
ok;
628+
deliver_error(_NotAFun, _Reason) ->
629+
ok.
630+
628631
% if nothing has been delivered, the sink assumes nothing was skipped
629632
% fulfill that expectation.
630633
set_skip_meta(QEntry, _Seq, _C = #c{delivered = false}) ->

src/riak_repl2_rtsink_conn.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ handle_call(status, _From, State = #state{remote = Remote,
135135
{hb_last, HBLast},
136136
%%{peer, peername(State)},
137137
{helper, riak_repl_util:safe_pid_to_list(Helper)},
138+
{helper_msgq_len, riak_repl_util:safe_get_msg_q_len(Helper)},
138139
{active, Active},
139140
{deactivated, Deactivated},
140141
{source_drops, SourceDrops},
@@ -242,8 +243,9 @@ handle_info(report_bt_drops, State=#state{bt_drops = DropDict}) ->
242243
Report = app_helper:get_env(riak_repl, bucket_type_drop_report_interval, ?DEFAULT_INTERVAL_MILLIS),
243244
{noreply, State#state{bt_drops = dict:new(), bt_timer = undefined, bt_interval = Report}}.
244245

245-
%% Shutdown callback: asks the sink helper recorded in the state to stop.
%% The call is wrapped in `catch`, so a failure of the stop call does not
%% change the outcome; the callback returns ok.
terminate(_Reason, _State) ->
246+
terminate(_Reason, State) ->
246247
%% TODO: Consider trying to do something graceful with poolboy?
248+
catch riak_repl2_rtsink_helper:stop(State#state.helper),
247249
ok.
248250

249251
code_change(_OldVsn, State, _Extra) ->

src/riak_repl2_rtsource_conn.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,9 @@ handle_info(Msg, State) ->
326326
lager:warning("Unhandled info: ~p", [Msg]),
327327
{noreply, State}.
328328

329-
%% Shutdown callback: disconnects the {rt_repl, Remote} connection from the
%% connection manager, then asks the source helper to stop. The stop call
%% is wrapped in `catch`, so its failure is ignored; the callback returns ok.
terminate(_Reason, #state{helper_pid=_HelperPid, remote=Remote}) ->
329+
terminate(_Reason, #state{helper_pid=HelperPid, remote=Remote}) ->
330330
riak_core_connection_mgr:disconnect({rt_repl, Remote}),
331+
catch riak_repl2_rtsource_helper:stop(HelperPid),
331332
ok.
332333

333334
code_change(_OldVsn, State, _Extra) ->

src/riak_repl_aae_sink.erl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,13 +152,19 @@ process_msg(?MSG_PUT_OBJ, {fs_diff_obj, BObj}, State) ->
152152
%% replies: ok | not_responsible
153153
process_msg(?MSG_UPDATE_TREE, IndexN, State=#state{tree_pid=TreePid}) ->
154154
ResponseMsg = riak_kv_index_hashtree:update(IndexN, TreePid),
155+
send_reply(ResponseMsg, State);
156+
157+
%% replies: ok | not_built | already_locked | bad_version
158+
process_msg(?MSG_LOCK_TREE, Version, State=#state{tree_pid=TreePid}) ->
159+
ResponseMsg = riak_kv_index_hashtree:get_lock(TreePid, fullsync_sink, Version),
155160
send_reply(ResponseMsg, State).
156161

157-
%% replies: ok | not_built | already_locked
158-
process_msg(?MSG_LOCK_TREE, State=#state{tree_pid=TreePid}) ->
162+
%% replies: ok | not_built | already_locked | bad_version
163+
process_msg(?MSG_LOCK_TREE, State) ->
159164
%% NOTE: be sure to die if tcp connection dies, to give back lock
160-
ResponseMsg = riak_kv_index_hashtree:get_lock(TreePid, fullsync_sink),
161-
send_reply(ResponseMsg, State);
165+
%% Message coming from an old aae source. Only allow lock if old undefined version
166+
%% is in use locally.
167+
process_msg(?MSG_LOCK_TREE, legacy, State);
162168

163169
%% no reply
164170
process_msg(?MSG_COMPLETE, State=#state{owner=Owner}) ->

0 commit comments

Comments
 (0)