2626 get_aggregated_progress /1 ,
2727 get_stage_info /0 ,
2828 update_stage_info /2 ,
29- update_progress /2 ]).
29+ update_progress /2 ,
30+ submit_master_event /1 ]).
3031
3132% % gen_server callbacks
3233-export ([code_change /3 , init /1 , handle_call /3 , handle_cast /2 , handle_info /2 ,
@@ -102,35 +103,41 @@ update_progress(Stage, StageProgress) ->
102103update_stage_info (Stage , StageInfo ) ->
103104 gen_server :cast (? SERVER , {update_stage_info , Stage , StageInfo }).
104105
105- is_interesting_master_event ({_ , bucket_rebalance_started , _Bucket , _Pid }) ->
106+ get_registered_local_name () ->
107+ ? MODULE .
108+
109+ submit_master_event (Event ) ->
110+ gen_server :cast (get_registered_local_name (), {note , Event }).
111+
112+ is_interesting_master_event ({bucket_rebalance_started , _Bucket , _Pid }) ->
106113 fun handle_bucket_rebalance_started /2 ;
107- is_interesting_master_event ({_ , set_ff_map , _BucketName , _Diff }) ->
114+ is_interesting_master_event ({set_ff_map , _BucketName , _FFMap }) ->
108115 fun handle_set_ff_map /2 ;
109- is_interesting_master_event ({_ , vbucket_move_start , _Pid , _BucketName , _Node , _VBucketId , _ , _ }) ->
116+ is_interesting_master_event ({vbucket_move_start , _Pid , _BucketName , _Node , _VBucketId , _ , _ }) ->
110117 fun handle_vbucket_move_start /2 ;
111- is_interesting_master_event ({_ , vbucket_move_done , _BucketName , _VBucketId }) ->
118+ is_interesting_master_event ({vbucket_move_done , _BucketName , _VBucketId }) ->
112119 fun handle_vbucket_move_done /2 ;
113- is_interesting_master_event ({_ , rebalance_stage_started , _Stage }) ->
120+ is_interesting_master_event ({rebalance_stage_started , _Stage }) ->
114121 fun handle_rebalance_stage_started /2 ;
115- is_interesting_master_event ({_ , rebalance_stage_completed , _Stage }) ->
122+ is_interesting_master_event ({rebalance_stage_completed , _Stage }) ->
116123 fun handle_rebalance_stage_completed /2 ;
117- is_interesting_master_event ({_ , rebalance_stage_event , _Stage , _Event }) ->
124+ is_interesting_master_event ({rebalance_stage_event , _Stage , _Event }) ->
118125 fun handle_rebalance_stage_event /2 ;
119- is_interesting_master_event ({_ , compaction_uninhibit_started , _BucketName , _ }) ->
126+ is_interesting_master_event ({compaction_uninhibit_started , _BucketName , _ }) ->
120127 fun handle_compaction_uninhibit /2 ;
121- is_interesting_master_event ({_ , compaction_uninhibit_done , _BucketName , _ }) ->
128+ is_interesting_master_event ({compaction_uninhibit_done , _BucketName , _ }) ->
122129 fun handle_compaction_uninhibit /2 ;
123- is_interesting_master_event ({_ , takeover_started , _BucketName , _VBucketId , _ , _ }) ->
130+ is_interesting_master_event ({takeover_started , _BucketName , _VBucketId , _ , _ }) ->
124131 fun handle_takeover /2 ;
125- is_interesting_master_event ({_ , takeover_ended , _BucketName , _VBucketId , _ , _ }) ->
132+ is_interesting_master_event ({takeover_ended , _BucketName , _VBucketId , _ , _ }) ->
126133 fun handle_takeover /2 ;
127- is_interesting_master_event ({_ , backfill_phase_started , _BucketName , _VBucketId }) ->
134+ is_interesting_master_event ({backfill_phase_started , _BucketName , _VBucketId }) ->
128135 fun handle_generic_vb_stat_event /2 ;
129- is_interesting_master_event ({_ , backfill_phase_ended , _BucketName , _VBucketId }) ->
136+ is_interesting_master_event ({backfill_phase_ended , _BucketName , _VBucketId }) ->
130137 fun handle_generic_vb_stat_event /2 ;
131- is_interesting_master_event ({_ , seqno_waiting_started , _BucketName , _VBucketId , _ , _ }) ->
138+ is_interesting_master_event ({seqno_waiting_started , _BucketName , _VBucketId , _ , _ }) ->
132139 fun handle_persistence /2 ;
133- is_interesting_master_event ({_ , seqno_waiting_ended , _BucketName , _VBucketId , _ , _ }) ->
140+ is_interesting_master_event ({seqno_waiting_ended , _BucketName , _VBucketId , _ , _ }) ->
134141 fun handle_persistence /2 ;
135142is_interesting_master_event (_ ) ->
136143 undefined .
@@ -160,23 +167,14 @@ get_stage_nodes(Services, NodesInfo) ->
160167
161168init ({Services , NodesInfo , Type }) ->
162169 Self = self (),
163- ns_pubsub :subscribe_link (master_activity_events ,
164- fun (Event , _Ignored ) ->
165- case is_interesting_master_event (Event ) of
166- undefined ->
167- [];
168- Fun ->
169- gen_server :cast (Self , {note , Fun , Event })
170- end
171- end , []),
172-
173170 StageInfo = rebalance_stage_info :init (get_stage_nodes (Services , NodesInfo )),
174171 Buckets = ns_bucket :get_bucket_names (),
175172 BucketsCount = length (Buckets ),
176173 BucketLevelInfo = dict :from_list ([{BN ,
177174 # bucket_level_info {bucket_name = BN }} ||
178175 BN <- Buckets ]),
179176 proc_lib :spawn_link (erlang , apply , [fun docs_left_updater_init /1 , [Self ]]),
177+ erlang :register (get_registered_local_name (), self ()),
180178
181179 {ok , # state {bucket = undefined ,
182180 buckets_count = BucketsCount ,
@@ -202,9 +200,14 @@ handle_call(Req, From, State) ->
202200 ? log_error (" Got unknown request: ~p from ~p " , [Req , From ]),
203201 {reply , unknown_request , State }.
204202
205- handle_cast ({note , Fun , Ev }, State ) ->
206- {noreply , NewState } = Fun (Ev , State ),
207- {noreply , NewState };
203+ handle_cast ({note , Event }, State ) ->
204+ case is_interesting_master_event (Event ) of
205+ undefined ->
206+ {noreply , State };
207+ Fun ->
208+ StampedEvent = list_to_tuple ([os :timestamp () | tuple_to_list (Event )]),
209+ Fun (StampedEvent , State )
210+ end ;
208211
209212handle_cast ({update_stats , BucketName , VBucket , NodeToDocsLeft }, State ) ->
210213 ? log_debug (" Got update_stats: ~p , ~p " , [VBucket , NodeToDocsLeft ]),
@@ -265,20 +268,11 @@ handle_cast(Req, _State) ->
265268 ? log_error (" Got unknown cast: ~p " , [Req ]),
266269 erlang :error ({unknown_cast , Req }).
267270
268- initiate_bucket_rebalance (BucketName , OldState ) when OldState # state .bucket =:= BucketName ->
271+ initiate_bucket_rebalance (BucketName , _FFMap , OldState ) when OldState # state .bucket =:= BucketName ->
269272 OldState ;
270- initiate_bucket_rebalance (BucketName , OldState ) ->
273+ initiate_bucket_rebalance (BucketName , FFMap , OldState ) ->
271274 {ok , BucketConfig } = ns_bucket :get_bucket (BucketName ),
272275 Map = proplists :get_value (map , BucketConfig ),
273- FFMap = case proplists :get_value (fastForwardMap , BucketConfig ) of
274- undefined ->
275- % % yes this is possible if rebalance completes
276- % % faster than we can start observing it's
277- % % progress
278- Map ;
279- FFMap0 ->
280- FFMap0
281- end ,
282276 VBCount = length (Map ),
283277 Diff = [Triple
284278 || {_ , [MasterNode |_ ] = ChainBefore , ChainAfter } = Triple <- lists :zip3 (lists :seq (0 , VBCount - 1 ),
@@ -367,8 +361,8 @@ handle_bucket_rebalance_started({_, bucket_rebalance_started, _BucketName, _Pid}
367361 NewState = State # state {bucket_number = Number + 1 },
368362 {noreply , NewState }.
369363
370- handle_set_ff_map ({_ , set_ff_map , BucketName , _Diff }, State ) ->
371- {noreply , initiate_bucket_rebalance (BucketName , State )}.
364+ handle_set_ff_map ({_ , set_ff_map , BucketName , Map }, State ) ->
365+ {noreply , initiate_bucket_rebalance (BucketName , Map , State )}.
372366
373367handle_vbucket_move_start ({TS , vbucket_move_start , _Pid , BucketName ,
374368 _Node , VBucketId , _ , _ },
0 commit comments