@@ -706,7 +706,6 @@ schedule_tick() ->
706706 erlang :send_after (Tick , ? MODULE , tick ).
707707
708708tick (PreFetchRing , RingID , State = # state {last_ring_id = LastID }) ->
709- maybe_enable_ensembles (),
710709 case RingID of
711710 LastID ->
712711 schedule_tick (),
@@ -725,7 +724,6 @@ tick(PreFetchRing, RingID, State=#state{last_ring_id=LastID}) ->
725724 ? LOG_INFO (" Ingoring fresh ring as shutting down" ),
726725 ok ;
727726 false ->
728- maybe_bootstrap_root_ensemble (Ring ),
729727 maybe_force_ring_update (Ring ),
730728 schedule_tick ()
731729 end ,
@@ -833,157 +831,6 @@ type_claimant(Props) ->
833831 false -> undefined
834832 end .
835833
836- % % The consensus subsystem must be enabled by exactly one node in a cluster
837- % % via a call to riak_ensemble_manager:enable(). We accomplished this by
838- % % having the claimant be that one node. Likewise, we require that the cluster
839- % % includes at least three nodes before we enable consensus. This prevents the
840- % % claimant in a 1-node cluster from enabling consensus before being joined to
841- % % another cluster.
842- maybe_enable_ensembles () ->
843- Desired = riak_core_sup :ensembles_enabled (),
844- Enabled = riak_ensemble_manager :enabled (),
845- case Enabled of
846- Desired ->
847- ok ;
848- _ ->
849- {ok , Ring } = riak_core_ring_manager :get_raw_ring (),
850- IsReady = riak_core_ring :ring_ready (Ring ),
851- IsClaimant = (riak_core_ring :claimant (Ring ) == node ()),
852- EnoughNodes = (length (riak_core_ring :ready_members (Ring )) >= 3 ),
853- case IsReady and IsClaimant and EnoughNodes of
854- true ->
855- enable_ensembles (Ring );
856- false ->
857- ok
858- end
859- end .
860-
861- % % We need to avoid a race where the current claimant enables consensus right
862- % % before going offline and being replaced by a new claimant. It could be
863- % % argued that this corner case is not important since changing the claimant
864- % % requires the user manually marking the current claimant as down. But, it's
865- % % better to be safe and handle things correctly.
866- % %
867- % % To solve this issue, the claimant first marks itself as the "ensemble
868- % % singleton" in the ring metadata. Once the ring has converged, the claimant
869- % % see that it previously marked itself as the singleton and will proceed to
870- % % enable the consensus subsystem. If the claimant goes offline after marking
871- % % itself the singleton, but before enabling consensus, then future claimants
872- % % will be unable to enable consensus. Consensus will be enabled once the
873- % % previous claimant comes back online.
874- % %
875- enable_ensembles (Ring ) ->
876- Node = node (),
877- case ensemble_singleton (Ring ) of
878- undefined ->
879- become_ensemble_singleton ();
880- Node ->
881- % % Ring update is required after enabling consensus to ensure
882- % % that ensembles are properly bootstrapped.
883- riak_ensemble_manager :enable (),
884- riak_core_ring_manager :force_update (),
885- ? LOG_INFO (" Activated consensus subsystem for cluster" );
886- _ ->
887- ok
888- end .
889-
890- ensemble_singleton (Ring ) ->
891- case riak_core_ring :get_meta ('$ensemble_singleton' , Ring ) of
892- undefined ->
893- undefined ;
894- {ok , Node } ->
895- Members = riak_core_ring :all_members (Ring ),
896- case lists :member (Node , Members ) of
897- true ->
898- Node ;
899- false ->
900- undefined
901- end
902- end .
903-
904- become_ensemble_singleton () ->
905- _ = riak_core_ring_manager :ring_trans (fun become_ensemble_singleton_trans /2 ,
906- undefined ),
907- ok .
908-
909- become_ensemble_singleton_trans (Ring , _ ) ->
910- IsClaimant = (riak_core_ring :claimant (Ring ) == node ()),
911- NoSingleton = (ensemble_singleton (Ring ) =:= undefined ),
912- case IsClaimant and NoSingleton of
913- true ->
914- Ring2 = riak_core_ring :update_meta ('$ensemble_singleton' , node (), Ring ),
915- {new_ring , Ring2 };
916- false ->
917- ignore
918- end .
919-
920- maybe_bootstrap_root_ensemble (Ring ) ->
921- IsEnabled = riak_ensemble_manager :enabled (),
922- IsClaimant = (riak_core_ring :claimant (Ring ) == node ()),
923- IsReady = riak_core_ring :ring_ready (Ring ),
924- case IsEnabled and IsClaimant and IsReady of
925- true ->
926- bootstrap_root_ensemble (Ring );
927- false ->
928- ok
929- end .
930-
931- bootstrap_root_ensemble (Ring ) ->
932- bootstrap_members (Ring ),
933- ok .
934-
935- bootstrap_members (Ring ) ->
936- Name = riak_core_ring :cluster_name (Ring ),
937- Members = riak_core_ring :ready_members (Ring ),
938- RootMembers = riak_ensemble_manager :get_members (root ),
939- Known = riak_ensemble_manager :cluster (),
940- Need = Members -- Known ,
941- L = [riak_core_util :proxy_spawn (
942- fun () -> riak_ensemble_manager :join (node (), Member ) end
943- ) || Member <- Need , Member =/= node ()],
944- _ = maybe_reset_ring_id (L ),
945-
946- RootNodes = [Node || {_ , Node } <- RootMembers ],
947- RootAdd = Members -- RootNodes ,
948- RootDel = RootNodes -- Members ,
949-
950- Res = [riak_core_util :proxy_spawn (
951- fun () -> riak_ensemble_manager :remove (node (), N ) end
952- ) || N <- RootDel , N =/= node ()],
953- _ = maybe_reset_ring_id (Res ),
954-
955- Changes =
956- [{add , {Name , Node }} || Node <- RootAdd ] ++
957- [{del , {Name , Node }} || Node <- RootDel ],
958- case Changes of
959- [] ->
960- ok ;
961- _ ->
962- Self = self (),
963- spawn_link (fun () ->
964- async_bootstrap_members (Self , Changes )
965- end ),
966- ok
967- end .
968-
969- async_bootstrap_members (Claimant , Changes ) ->
970- RootLeader = riak_ensemble_manager :rleader_pid (),
971- case riak_ensemble_peer :update_members (RootLeader , Changes , 10000 ) of
972- ok ->
973- ok ;
974- _ ->
975- reset_ring_id (Claimant ),
976- ok
977- end .
978-
979- maybe_reset_ring_id (Results ) ->
980- Failed = [R || R <- Results , R =/= ok ],
981- (Failed =:= []) orelse reset_ring_id (self ()).
982-
983- % % Reset last_ring_id, ensuring future tick re-examines the ring even if the
984- % % ring has not changed.
985- reset_ring_id (Pid ) ->
986- Pid ! reset_ring_id .
987834
988835% % =========================================================================
989836% % Claimant rebalance/reassign logic
@@ -1302,22 +1149,27 @@ maybe_remove_exiting(Node, CState) ->
13021149 Node ->
13031150 % % Change exiting nodes to invalid, skipping this node.
13041151 Exiting = riak_core_ring :members (CState , [exiting ]) -- [Node ],
1305- RootMembers = riak_ensemble_manager : get_members ( root ),
1152+ Changed = ( Exiting /= [] ),
13061153 CState2 =
1307- lists :foldl (fun (ENode , CState0 ) ->
1308- L = [N || {_ , N } <- RootMembers , N =:= ENode ],
1309- case L of
1310- [] ->
1311- ClearedCS =
1312- riak_core_ring :clear_member_meta (Node , CState0 , ENode ),
1313- riak_core_ring :set_member (Node , ClearedCS , ENode ,
1314- invalid , same_vclock );
1315- _ ->
1316- reset_ring_id (self ()),
1317- CState0
1318- end
1319- end , CState , Exiting ),
1320- Changed = (CState2 /= CState ),
1154+ lists :foldl (
1155+ fun (ENode , CState0 ) ->
1156+ ClearedCS =
1157+ riak_core_ring :clear_member_meta (
1158+ Node ,
1159+ CState0 ,
1160+ ENode
1161+ ),
1162+ riak_core_ring :set_member (
1163+ Node ,
1164+ ClearedCS ,
1165+ ENode ,
1166+ invalid ,
1167+ same_vclock
1168+ )
1169+ end ,
1170+ CState ,
1171+ Exiting
1172+ ),
13211173 {Changed , CState2 };
13221174 _ ->
13231175 {false , CState }
0 commit comments