Skip to content

Commit d6b8602

Browse files
committed
Merge pull request #658 from basho/develop-2.0-merge
Develop 2.0 merge. Reviewed-by: seancribbs
2 parents c0e6735 + 8c6b159 commit d6b8602

29 files changed

+701
-363
lines changed

dialyzer.ignore-warnings

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ riak_repl_keylist_client.erl:267: The call application:unset_env('riak_repl',{'p
44
riak_repl_keylist_client.erl:120: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
55
riak_repl_keylist_client.erl:132: The call application:set_env('riak_repl',{'progress',_},nonempty_maybe_improper_list()) breaks the contract (Application,Par,Val) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom()), is_subtype(Val,term())
66
riak_core_connection.erl:108: Function exchange_handshakes_with/4 has no local return
7-
riak_core_connection.erl:171: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
8-
riak_core_connection.erl:189: Function try_ssl/4 will never be called
9-
riak_core_connection.erl:225: Function negotiate_proto_with_server/3 will never be called
7+
riak_core_connection.erl:172: The call ranch_tcp:send(Socket::port(),Hello::binary()) breaks the contract (inet:socket(),iolist()) -> 'ok' | {'error',atom()}
108
riak_repl_keylist_client.erl:106: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
119
riak_repl_keylist_client.erl:216: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())
1210
riak_repl_keylist_client.erl:265: The call application:unset_env('riak_repl',{'progress',_}) breaks the contract (Application,Par) -> 'ok' when is_subtype(Application,atom()), is_subtype(Par,atom())

include/riak_core_connection.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
-define(CTRL_ACK, <<"riak-ctrl:ack">>).
2929
-define(CTRL_ASK_NAME, <<"riak-ctrl:ask_name">>).
3030
-define(CTRL_ASK_MEMBERS, <<"riak-ctrl:ask_members">>).
31+
-define(CTRL_ALL_MEMBERS, <<"riak-ctrl:all_members">>).
3132

3233

3334
-define(CONNECTION_SETUP_TIMEOUT, 10000).

src/riak_core_cluster_conn.erl

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -75,19 +75,21 @@
7575

7676
-type remote() :: {cluster_by_name, clustername()} | {cluster_by_addr, ip_addr()}.
7777
-type peer_address() :: {string(), pos_integer()}.
78+
-type node_address() :: {atom(), peer_address()}.
7879
-type ranch_transport_messages() :: {atom(), atom(), atom()}.
7980
-record(state, {mode :: atom(),
8081
remote :: remote(),
8182
socket :: port(),
8283
name :: clustername(),
8384
previous_name="undefined" :: clustername(),
84-
members=[] :: [peer_address()],
85+
members=[] :: [peer_address() | node_address()],
8586
connection_ref :: reference(),
8687
connection_timeout :: timeout(),
8788
transport :: atom(),
8889
address :: peer_address(),
8990
connection_props :: proplists:proplist(),
90-
transport_msgs :: ranch_transport_messages()}).
91+
transport_msgs :: ranch_transport_messages(),
92+
proto_version :: {non_neg_integer(), non_neg_integer()} }).
9193
-type state() :: #state{}.
9294

9395
%%%===================================================================
@@ -121,14 +123,17 @@ status(Ref, Timeout) ->
121123
connected(Socket,
122124
Transport,
123125
Addr,
124-
{?REMOTE_CLUSTER_PROTO_ID, _MyVer, _RemoteVer},
126+
{?REMOTE_CLUSTER_PROTO_ID,
127+
_MyVer ={CommonMajor,LocalMinor},
128+
_RemoteVer={CommonMajor,RemoteMinor}},
125129
{_Remote, Client},
126130
Props) ->
127131
%% give control over the socket to the `Client' process.
128132
%% tell client we're connected and to whom
129133
Transport:controlling_process(Socket, Client),
130134
gen_fsm:send_event(Client,
131-
{connected_to_remote, Socket, Transport, Addr, Props}).
135+
{connected_to_remote, Socket, Transport, Addr, Props,
136+
{CommonMajor, min(LocalMinor,RemoteMinor)}}).
132137

133138
-spec connect_failed({term(), term()}, {error, term()}, {_, atom() | pid() | port() | {atom(), _} | {via, _, _}}) -> ok.
134139
connect_failed({_Proto, _Vers}, {error, _}=Error, {_Remote, Client}) ->
@@ -189,7 +194,7 @@ connecting({connect_failed, Error}, State=#state{remote=Remote}) ->
189194
%% This is fatal! We are being supervised by conn_sup and if we
190195
%% die, it will restart us.
191196
{stop, Error, State};
192-
connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
197+
connecting({connected_to_remote, Socket, Transport, Addr, Props, ProtoVersion}, State) ->
193198
RemoteName = proplists:get_value(clustername, Props),
194199
_ = lager:debug("Cluster Manager control channel client connected to"
195200
" remote ~p at ~p named ~p",
@@ -202,7 +207,9 @@ connecting({connected_to_remote, Socket, Transport, Addr, Props}, State) ->
202207
transport=Transport,
203208
address=Addr,
204209
connection_props=Props,
205-
transport_msgs = TransportMsgs},
210+
transport_msgs = TransportMsgs,
211+
proto_version=ProtoVersion
212+
},
206213
_ = request_cluster_name(UpdState),
207214
{next_state, waiting_for_cluster_name, UpdState, ?CONNECTION_SETUP_TIMEOUT};
208215
connecting(poll_cluster, State) ->
@@ -240,13 +247,21 @@ waiting_for_cluster_name(_, _From, _State) ->
240247
{reply, ok, waiting_for_cluster_name, _State}.
241248

242249
%% Async message handling for the `waiting_for_cluster_members' state
243-
waiting_for_cluster_members({cluster_members, Members}, State) ->
250+
waiting_for_cluster_members({cluster_members, NewMembers}, State = #state{ proto_version={1,0} }) ->
244251
#state{address=Addr,
245252
name=Name,
246253
previous_name=PreviousName,
254+
members=OldMembers,
247255
remote=Remote} = State,
248-
%% This is the first time we're updating the cluster manager
249-
%% with the name of this cluster, so it's old name is undefined.
256+
%% this is 1.0 code. NewMembers is list of {IP,Port}
257+
258+
SortedNew = ordsets:from_list(NewMembers),
259+
Members =
260+
NewMembers ++ lists:filter(fun(Mem) ->
261+
not ordsets:is_element(Mem, SortedNew)
262+
end,
263+
OldMembers),
264+
250265
ClusterUpdatedMsg = {cluster_updated,
251266
PreviousName,
252267
Name,
@@ -255,14 +270,48 @@ waiting_for_cluster_members({cluster_members, Members}, State) ->
255270
Remote},
256271
gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
257272
{next_state, connected, State#state{members=Members}};
273+
waiting_for_cluster_members({all_cluster_members, NewMembers}, State) ->
274+
#state{address=Addr,
275+
name=Name,
276+
previous_name=PreviousName,
277+
members=OldMembers,
278+
remote=Remote} = State,
279+
280+
%% this is 1.1+ code. Members is list of {node,{IP,Port}}
281+
282+
Members =
283+
lists:foldl(fun(Elm={_Node,{_Ip,Port}}, Acc) when is_integer(Port) ->
284+
[Elm|Acc];
285+
({Node,_}, Acc) ->
286+
case lists:keyfind(Node, 1, OldMembers) of
287+
Elm={Node,{_IP,Port}} when is_integer(Port) ->
288+
[Elm|Acc];
289+
_ ->
290+
Acc
291+
end
292+
end,
293+
[],
294+
NewMembers ),
295+
296+
ClusterUpdatedMsg = {cluster_updated,
297+
PreviousName,
298+
Name,
299+
[Member || {_Node,Member} <- Members],
300+
Addr,
301+
Remote},
302+
gen_server:cast(?CLUSTER_MANAGER_SERVER, ClusterUpdatedMsg),
303+
{next_state, connected, State#state{members=Members}};
258304
waiting_for_cluster_members(_, _State) ->
259305
{next_state, waiting_for_cluster_members, _State}.
260306

261307
%% Sync message handling for the `waiting_for_cluster_members' state
262308
waiting_for_cluster_members(status, _From, State) ->
263309
{reply, {waiting_for_cluster_members, State#state.name}, waiting_for_cluster_members, State};
264-
waiting_for_cluster_members(_, _From, _State) ->
265-
{reply, ok, waiting_for_cluster_members, _State}.
310+
waiting_for_cluster_members(Other, _From, State) ->
311+
_ = lager:error("cluster_conn: client got unexpected "
312+
"msg from remote: ~p, ~p",
313+
[State#state.remote, Other]),
314+
{reply, ok, waiting_for_cluster_members, State}.
266315

267316
%% Async message handling for the `connected' state
268317
connected(poll_cluster, State) ->
@@ -307,11 +356,18 @@ handle_info({TransOK, Socket, Name},
307356
{next_state, waiting_for_cluster_name, State};
308357
handle_info({TransOK, Socket, Members},
309358
waiting_for_cluster_members,
310-
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
359+
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}, proto_version={1,0}}) ->
311360
Transport = State#state.transport,
312361
gen_fsm:send_event(self(), {cluster_members, binary_to_term(Members)}),
313362
_ = Transport:setopts(Socket, [{active, once}]),
314363
{next_state, waiting_for_cluster_members, State};
364+
handle_info({TransOK, Socket, Members},
365+
waiting_for_cluster_members,
366+
State=#state{socket=Socket, transport_msgs = {TransOK, _, _}}) ->
367+
Transport = State#state.transport,
368+
gen_fsm:send_event(self(), {all_cluster_members, binary_to_term(Members)}),
369+
_ = Transport:setopts(Socket, [{active, once}]),
370+
{next_state, waiting_for_cluster_members, State};
315371
handle_info({TransOK, Socket, Data},
316372
StateName,
317373
State=#state{address=Addr,
@@ -362,18 +418,25 @@ code_change(_OldVsn, StateName, State, _Extra) ->
362418
request_cluster_name(#state{mode=test}) ->
363419
ok;
364420
request_cluster_name(#state{socket=Socket, transport=Transport}) ->
365-
_ = inet:setopts(Socket, [{active, once}]),
421+
_ = Transport:setopts(Socket, [{active, once}]),
366422
Transport:send(Socket, ?CTRL_ASK_NAME).
367423

368424
-spec request_member_ips(state()) -> ok | {error, term()}.
369425
request_member_ips(#state{mode=test}) ->
370426
ok;
371-
request_member_ips(#state{socket=Socket, transport=Transport}) ->
427+
request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,0}}) ->
372428
Transport:send(Socket, ?CTRL_ASK_MEMBERS),
373429
%% get the IP we think we've connected to
374430
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
375431
%% make it a string
376432
PeerIPStr = inet_parse:ntoa(PeerIP),
433+
Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort}));
434+
request_member_ips(#state{socket=Socket, transport=Transport, proto_version={1,1}}) ->
435+
Transport:send(Socket, ?CTRL_ALL_MEMBERS),
436+
%% get the IP we think we've connected to
437+
{ok, {PeerIP, PeerPort}} = Transport:peername(Socket),
438+
%% make it a string
439+
PeerIPStr = inet_parse:ntoa(PeerIP),
377440
Transport:send(Socket, term_to_binary({PeerIPStr, PeerPort})).
378441

379442
initiate_connection(State=#state{mode=test}) ->
@@ -383,7 +446,7 @@ initiate_connection(State=#state{remote=Remote}) ->
383446
%% `riak_core_connection_mgr::connect/4' is incorrect.
384447
{ok, Ref} = riak_core_connection_mgr:connect(
385448
Remote,
386-
{{?REMOTE_CLUSTER_PROTO_ID, [{1,0}]},
449+
{{?REMOTE_CLUSTER_PROTO_ID, [{1,1},{1,0}]},
387450
{?CTRL_OPTIONS, ?MODULE, {Remote, self()}}},
388451
default),
389452
State#state{connection_ref=Ref}.

0 commit comments

Comments
 (0)