|
395 | 395 |
|
396 | 396 | -define(GROUP_TABLE, gm_group). |
397 | 397 | -define(MAX_BUFFER_SIZE, 100000000). %% 100MB |
398 | | --define(HIBERNATE_AFTER_MIN, 1000). |
399 | | --define(DESIRED_HIBERNATE, 10000). |
400 | 398 | -define(BROADCAST_TIMER, 25). |
| 399 | +-define(FORCE_GC_TIMER, 250). |
401 | 400 | -define(VERSION_START, 0). |
402 | 401 | -define(SETS, ordsets). |
403 | 402 | -define(DICT, orddict). |
|
416 | 415 | broadcast_buffer, |
417 | 416 | broadcast_buffer_sz, |
418 | 417 | broadcast_timer, |
| 418 | + force_gc_timer, |
419 | 419 | txn_executor, |
420 | 420 | shutting_down |
421 | 421 | }). |
@@ -508,7 +508,8 @@ table_definitions() -> |
508 | 508 | [{Name, [?TABLE_MATCH | Attributes]}]. |
509 | 509 |
|
510 | 510 | start_link(GroupName, Module, Args, TxnFun) -> |
511 | | - gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], []). |
| 511 | + gen_server2:start_link(?MODULE, [GroupName, Module, Args, TxnFun], |
| 512 | + [{spawn_opt, [{fullsweep_after, 0}]}]). |
512 | 513 |
|
513 | 514 | leave(Server) -> |
514 | 515 | gen_server2:cast(Server, leave). |
@@ -551,9 +552,9 @@ init([GroupName, Module, Args, TxnFun]) -> |
551 | 552 | broadcast_buffer = [], |
552 | 553 | broadcast_buffer_sz = 0, |
553 | 554 | broadcast_timer = undefined, |
| 555 | + force_gc_timer = undefined, |
554 | 556 | txn_executor = TxnFun, |
555 | | - shutting_down = false }, hibernate, |
556 | | - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. |
| 557 | + shutting_down = false }}. |
557 | 558 |
|
558 | 559 |
|
559 | 560 | handle_call({confirmed_broadcast, _Msg}, _From, |
@@ -708,6 +709,10 @@ handle_cast(leave, State) -> |
708 | 709 | {stop, normal, State}. |
709 | 710 |
|
710 | 711 |
|
| 712 | +handle_info(force_gc, State) -> |
| 713 | + garbage_collect(), |
| 714 | + noreply(State #state { force_gc_timer = undefined }); |
| 715 | + |
711 | 716 | handle_info(flush, State) -> |
712 | 717 | noreply( |
713 | 718 | flush_broadcast_buffer(State #state { broadcast_timer = undefined })); |
@@ -883,14 +888,24 @@ handle_msg({activity, _NotLeft, _Activity}, State) -> |
883 | 888 |
|
884 | 889 |
|
885 | 890 | noreply(State) -> |
886 | | - {noreply, ensure_broadcast_timer(State), flush_timeout(State)}. |
| 891 | + {noreply, ensure_timers(State), flush_timeout(State)}. |
887 | 892 |
|
888 | 893 | reply(Reply, State) -> |
889 | | - {reply, Reply, ensure_broadcast_timer(State), flush_timeout(State)}. |
| 894 | + {reply, Reply, ensure_timers(State), flush_timeout(State)}. |
| 895 | + |
| 896 | +ensure_timers(State) -> |
| 897 | + ensure_force_gc_timer(ensure_broadcast_timer(State)). |
890 | 898 |
|
891 | | -flush_timeout(#state{broadcast_buffer = []}) -> hibernate; |
| 899 | +flush_timeout(#state{broadcast_buffer = []}) -> infinity; |
892 | 900 | flush_timeout(_) -> 0. |
893 | 901 |
|
| 902 | +ensure_force_gc_timer(State = #state { force_gc_timer = TRef }) |
| 903 | + when is_reference(TRef) -> |
| 904 | + State; |
| 905 | +ensure_force_gc_timer(State = #state { force_gc_timer = undefined }) -> |
| 906 | + TRef = erlang:send_after(?FORCE_GC_TIMER, self(), force_gc), |
| 907 | + State #state { force_gc_timer = TRef }. |
| 908 | + |
894 | 909 | ensure_broadcast_timer(State = #state { broadcast_buffer = [], |
895 | 910 | broadcast_timer = undefined }) -> |
896 | 911 | State; |
@@ -958,8 +973,7 @@ flush_broadcast_buffer(State = #state { self = Self, |
958 | 973 | end, Self, MembersState), |
959 | 974 | State #state { members_state = MembersState1, |
960 | 975 | broadcast_buffer = [], |
961 | | - broadcast_buffer_sz = 0}. |
962 | | - |
| 976 | + broadcast_buffer_sz = 0 }. |
963 | 977 |
|
964 | 978 | %% --------------------------------------------------------------------------- |
965 | 979 | %% View construction and inspection |
|
0 commit comments