4949-define (LOG_BUNDLE_DELAY , 5 ).
5050-define (COMPLETE_BUNDLE_DELAY , 2 ).
5151
52+ -define (HIBERNATE_AFTER , 10000 ).
53+
5254-define (MAX_WRAP_ENTRIES , 500 ).
5355
5456-define (PERSISTER_LOG_FORMAT_VERSION , {2 , 4 }).
@@ -164,10 +166,8 @@ handle_call({transaction, Key, MessageList}, From, State) ->
164166 do_noreply (internal_commit (From , Key , NewState ));
165167handle_call ({commit_transaction , TxnKey }, From , State ) ->
166168 do_noreply (internal_commit (From , TxnKey , State ));
167- handle_call (force_snapshot , _From , State = # pstate {log_handle = LH ,
168- snapshot = Snapshot }) ->
169- ok = take_snapshot (LH , Snapshot ),
170- do_reply (ok , State );
169+ handle_call (force_snapshot , _From , State ) ->
170+ do_reply (ok , flush (true , State ));
171171handle_call (serial , _From ,
172172 State = # pstate {snapshot = # psnapshot {serial = Serial }}) ->
173173 do_reply (Serial , State );
@@ -183,8 +183,13 @@ handle_cast({extend_transaction, TxnKey, MessageList}, State) ->
183183handle_cast (_Msg , State ) ->
184184 {noreply , State }.
185185
186+ handle_info (timeout , State = # pstate {deadline = infinity }) ->
187+ State1 = flush (true , State ),
188+ % % TODO: Once we drop support for R11B-5, we can change this to
189+ % % {noreply, State1, hibernate};
190+ proc_lib :hibernate (gen_server2 , enter_loop , [? MODULE , [], State1 ]);
186191handle_info (timeout , State ) ->
187- { noreply , flush (State )} ;
192+ do_noreply ( flush (State )) ;
188193handle_info (_Info , State ) ->
189194 {noreply , State }.
190195
@@ -275,12 +280,13 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
275280 rabbit_log :info (" Saving persister log in ~p~n " , [OldFileName ]),
276281 ok = take_snapshot (LogHandle , OldFileName , Snapshot ).
277282
278- maybe_take_snapshot (State = # pstate {entry_count = EntryCount , log_handle = LH ,
279- snapshot = Snapshot })
280- when EntryCount >= ? MAX_WRAP_ENTRIES ->
283+ maybe_take_snapshot (Force , State = # pstate {entry_count = EntryCount ,
284+ log_handle = LH ,
285+ snapshot = Snapshot })
286+ when Force orelse EntryCount >= ? MAX_WRAP_ENTRIES ->
281287 ok = take_snapshot (LH , Snapshot ),
282288 State # pstate {entry_count = 0 };
283- maybe_take_snapshot (State ) ->
289+ maybe_take_snapshot (_Force , State ) ->
284290 State .
285291
286292later_ms (DeltaMilliSec ) ->
@@ -298,7 +304,7 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
298304 ExistingDeadline .
299305
300306compute_timeout (infinity ) ->
301- infinity ;
307+ ? HIBERNATE_AFTER ;
302308compute_timeout (Deadline ) ->
303309 DeltaMilliSec = time_diff (Deadline , now ()) * 1000.0 ,
304310 if
@@ -314,26 +320,26 @@ do_noreply(State = #pstate{deadline = Deadline}) ->
314320do_reply (Reply , State = # pstate {deadline = Deadline }) ->
315321 {reply , Reply , State , compute_timeout (Deadline )}.
316322
317- flush (State = # pstate {pending_logs = PendingLogs ,
318- pending_replies = Waiting ,
319- log_handle = LogHandle }) ->
320- State1 = if
321- PendingLogs /= [] ->
323+ flush (State ) -> flush (false , State ).
324+
325+ flush (ForceSnapshot , State = # pstate {pending_logs = PendingLogs ,
326+ pending_replies = Waiting ,
327+ log_handle = LogHandle }) ->
328+ State1 = if PendingLogs /= [] ->
322329 disk_log :alog (LogHandle , lists :reverse (PendingLogs )),
323- maybe_take_snapshot (
324- State # pstate {
325- entry_count = State # pstate .entry_count + 1 });
326- true ->
330+ State # pstate {entry_count = State # pstate .entry_count + 1 };
331+ true ->
327332 State
328333 end ,
334+ State2 = maybe_take_snapshot (ForceSnapshot , State1 ),
329335 if Waiting /= [] ->
330336 ok = disk_log :sync (LogHandle ),
331337 lists :foreach (fun (From ) -> gen_server :reply (From , ok ) end ,
332338 Waiting );
333339 true ->
334340 ok
335341 end ,
336- State1 # pstate {deadline = infinity ,
342+ State2 # pstate {deadline = infinity ,
337343 pending_logs = [],
338344 pending_replies = []}.
339345
0 commit comments