147147-export ([start_link /0 , start_link /2 , init /1 , handle_call /3 , handle_cast /2 ,
148148 handle_info /2 , terminate /2 , code_change /3 , prioritise_cast /3 ]).
149149
150+ -export ([clear_metrics_of /1 , list_elders /0 , list_clients /0 , get_client_state /1 ]).
151+
150152-define (SERVER , ? MODULE ).
151153% % Reserve 3 handles for ra usage: wal, segment writer and a dets table
152154-define (RESERVED_FOR_OTHERS , 100 + 3 ).
158160-define (CLIENT_ETS_TABLE , file_handle_cache_client ).
159161-define (ELDERS_ETS_TABLE , file_handle_cache_elders ).
160162
163+ -import (rabbit_misc , [safe_ets_update_counter /3 , safe_ets_update_counter /4 ,
164+ safe_ets_update_element /3 , safe_ets_update_element /4 ]).
165+
161166% %----------------------------------------------------------------------------
162167
163168-record (file ,
@@ -621,6 +626,34 @@ clear_process_read_cache() ->
621626 size (Handle # handle .read_buffer ) > 0
622627 ].
623628
629+ % % Only used for testing
630+ clear_metrics_of (Pid ) ->
631+ case whereis (? SERVER ) of
632+ undefined -> ok ;
633+ _ -> gen_server2 :cast (? SERVER , {clear_metrics_of , Pid })
634+ end .
635+
636+ % % Only used for testing
637+ list_elders () ->
638+ case whereis (? SERVER ) of
639+ undefined -> ok ;
640+ _ -> gen_server2 :call (? SERVER , list_elders )
641+ end .
642+
643+ % % Only used for testing
644+ list_clients () ->
645+ case whereis (? SERVER ) of
646+ undefined -> ok ;
647+ _ -> gen_server2 :call (? SERVER , list_clients )
648+ end .
649+
650+ % % Only used for testing
651+ get_client_state (Pid ) ->
652+ case whereis (? SERVER ) of
653+ undefined -> ok ;
654+ _ -> gen_server2 :call (? SERVER , {get_client_state , Pid })
655+ end .
656+
624657% %----------------------------------------------------------------------------
625658% % Internal functions
626659% %----------------------------------------------------------------------------
@@ -1124,13 +1157,13 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
11241157 case needs_reduce (State # fhc_state { open_count = Count + Requested }) of
11251158 true -> case ets :lookup (Clients , Pid ) of
11261159 [# cstate { opened = 0 }] ->
1127- true = ets : update_element (
1160+ _ = safe_ets_update_element (
11281161 Clients , Pid , {# cstate .blocked , true }),
11291162 {noreply ,
11301163 reduce (State # fhc_state {
11311164 open_pending = pending_in (Item , Pending ) })};
11321165 [# cstate { opened = Opened }] ->
1133- true = ets : update_element (
1166+ _ = safe_ets_update_element (
11341167 Clients , Pid ,
11351168 {# cstate .pending_closes , Opened }),
11361169 {reply , close , State }
@@ -1146,7 +1179,7 @@ handle_call({obtain, N, Type, Pid}, From,
11461179 Item = # pending { kind = {obtain , Type }, pid = Pid ,
11471180 requested = N , from = From },
11481181 Enqueue = fun () ->
1149- true = ets : update_element (Clients , Pid ,
1182+ _ = safe_ets_update_element (Clients , Pid ,
11501183 {# cstate .blocked , true }),
11511184 set_obtain_state (Type , pending ,
11521185 pending_in (Item , Pending ), State )
@@ -1174,12 +1207,21 @@ handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
11741207 {reply , Limit , State };
11751208
11761209handle_call ({info , Items }, _From , State ) ->
1177- {reply , infos (Items , State ), State }.
1210+ {reply , infos (Items , State ), State };
1211+
1212+ handle_call (list_elders , _From , State = # fhc_state { elders = Elders }) ->
1213+ {reply , ets :tab2list (Elders ), State };
1214+
1215+ handle_call (list_clients , _From , State = # fhc_state { clients = Clients }) ->
1216+ {reply , ets :tab2list (Clients ), State };
1217+
1218+ handle_call ({get_client_state , ID }, _From , State = # fhc_state { clients = Clients }) ->
1219+ {reply , ets :lookup (Clients , ID ), State }.
11781220
11791221handle_cast ({register_callback , Pid , MFA },
11801222 State = # fhc_state { clients = Clients }) ->
11811223 ok = track_client (Pid , Clients ),
1182- true = ets : update_element (Clients , Pid , {# cstate .callback , MFA }),
1224+ _ = safe_ets_update_element (Clients , Pid , {# cstate .callback , MFA }),
11831225 {noreply , State };
11841226
11851227handle_cast ({update , Pid , EldestUnusedSince },
@@ -1200,7 +1242,7 @@ handle_cast({close, Pid, EldestUnusedSince},
12001242 undefined -> ets :delete (Elders , Pid );
12011243 _ -> ets :insert (Elders , {Pid , EldestUnusedSince })
12021244 end ,
1203- ets : update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
1245+ safe_ets_update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
12041246 {noreply , adjust_alarm (State , process_pending (
12051247 update_counts (open , Pid , - 1 , State )))};
12061248
@@ -1226,7 +1268,15 @@ handle_cast({set_reservation, N, Type, Pid},
12261268 {noreply , case needs_reduce (NewState ) of
12271269 true -> reduce (NewState );
12281270 false -> adjust_alarm (State , NewState )
1229- end }.
1271+ end };
1272+
1273+ handle_cast ({clear_metrics_of , Pid },
1274+ State = # fhc_state { elders = Elders , clients = Clients }) ->
1275+ ets :delete (Elders , Pid ),
1276+ ets :delete (Clients , Pid ),
1277+ safe_ets_update_counter (Clients , Pid , {# cstate .pending_closes , - 1 , 0 , 0 }),
1278+ {noreply , adjust_alarm (State , process_pending (
1279+ update_counts (open , Pid , - 1 , State )))}.
12301280
12311281handle_info (check_counts , State ) ->
12321282 {noreply , maybe_reduce (State # fhc_state { timer_ref = undefined })};
@@ -1399,37 +1449,42 @@ run_pending_item(#pending { kind = Kind,
13991449 from = From },
14001450 State = # fhc_state { clients = Clients }) ->
14011451 gen_server2 :reply (From , ok ),
1402- true = ets : update_element (Clients , Pid , {# cstate .blocked , false }),
1452+ safe_ets_update_element (Clients , Pid , {# cstate .blocked , false }),
14031453 update_counts (Kind , Pid , Requested , State ).
14041454
14051455update_counts (open , Pid , Delta ,
14061456 State = # fhc_state { open_count = OpenCount ,
14071457 clients = Clients }) ->
1408- ets :update_counter (Clients , Pid , {# cstate .opened , Delta }),
1458+ safe_ets_update_counter (Clients , Pid , {# cstate .opened , Delta },
1459+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'opened', client pid: ~p " , [Pid ]) end ),
14091460 State # fhc_state { open_count = OpenCount + Delta };
14101461update_counts ({obtain , file }, Pid , Delta ,
14111462 State = # fhc_state {obtain_count_file = ObtainCountF ,
14121463 clients = Clients }) ->
1413- ets :update_counter (Clients , Pid , {# cstate .obtained_file , Delta }),
1464+ safe_ets_update_counter (Clients , Pid , {# cstate .obtained_file , Delta },
1465+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'obtained_file', client pid: ~p " , [Pid ]) end ),
14141466 State # fhc_state { obtain_count_file = ObtainCountF + Delta };
14151467update_counts ({obtain , socket }, Pid , Delta ,
14161468 State = # fhc_state {obtain_count_socket = ObtainCountS ,
14171469 clients = Clients }) ->
1418- ets :update_counter (Clients , Pid , {# cstate .obtained_socket , Delta }),
1470+ safe_ets_update_counter (Clients , Pid , {# cstate .obtained_socket , Delta },
1471+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'obtained_socket', client pid: ~p " , [Pid ]) end ),
14191472 State # fhc_state { obtain_count_socket = ObtainCountS + Delta };
14201473update_counts ({reserve , file }, Pid , NewReservation ,
14211474 State = # fhc_state {reserve_count_file = ReserveCountF ,
14221475 clients = Clients }) ->
14231476 [# cstate {reserved_file = R }] = ets :lookup (Clients , Pid ),
14241477 Delta = NewReservation - R ,
1425- ets :update_counter (Clients , Pid , {# cstate .reserved_file , Delta }),
1478+ safe_ets_update_counter (Clients , Pid , {# cstate .reserved_file , Delta },
1479+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'reserved_file', client pid: ~p " , [Pid ]) end ),
14261480 State # fhc_state { reserve_count_file = ReserveCountF + Delta };
14271481update_counts ({reserve , socket }, Pid , NewReservation ,
14281482 State = # fhc_state {reserve_count_socket = ReserveCountS ,
14291483 clients = Clients }) ->
14301484 [# cstate {reserved_file = R }] = ets :lookup (Clients , Pid ),
14311485 Delta = NewReservation - R ,
1432- ets :update_counter (Clients , Pid , {# cstate .reserved_socket , Delta }),
1486+ safe_ets_update_counter (Clients , Pid , {# cstate .reserved_socket , Delta },
1487+ fun () -> rabbit_log :warning (" FHC: failed to update counter 'reserved_socket', client pid: ~p " , [Pid ]) end ),
14331488 State # fhc_state { reserve_count_socket = ReserveCountS + Delta }.
14341489
14351490maybe_reduce (State ) ->
@@ -1520,7 +1575,7 @@ notify(Clients, Required, [#cstate{ pid = Pid,
15201575 callback = {M , F , A },
15211576 opened = Opened } | Notifications ]) ->
15221577 apply (M , F , A ++ [0 ]),
1523- ets : update_element (Clients , Pid , {# cstate .pending_closes , Opened }),
1578+ safe_ets_update_element (Clients , Pid , {# cstate .pending_closes , Opened }),
15241579 notify (Clients , Required - Opened , Notifications ).
15251580
15261581track_client (Pid , Clients ) ->
0 commit comments