@@ -223,11 +223,44 @@ _reclaim_dynamic_window(LogSource *self, gsize window_size)
223223 atomic_gssize_set (& self -> window_size_to_be_reclaimed , window_size );
224224}
225225
226+ static gboolean
227+ _process_reclaimed_window (LogSource * self )
228+ {
229+ //check pending_reclaimed
230+ gssize total_reclaim = atomic_gssize_set_and_get (& self -> pending_reclaimed , 0 );
231+ gssize to_be_reclaimed = atomic_gssize_get (& self -> window_size_to_be_reclaimed );
232+ gboolean reclaim_in_progress = (to_be_reclaimed > 0 );
233+
234+ if (total_reclaim > 0 )
235+ {
236+ self -> full_window_size -= total_reclaim ;
237+ stats_counter_sub (self -> metrics .window_capacity , total_reclaim );
238+ dynamic_window_release (& self -> dynamic_window , total_reclaim );
239+ }
240+ else
241+ {
242+ //to avoid underflow, we need to set a value <= 0
243+ if (to_be_reclaimed < 0 ) {
244+ atomic_gssize_set (& self -> window_size_to_be_reclaimed , 0 );
245+ }
246+ }
247+
248+ msg_trace ("Checking if reclaim is in progress..." ,
249+ log_pipe_location_tag (& self -> super ),
250+ evt_tag_printf ("connection" , "%p" , self ),
251+ evt_tag_printf ("in progress" , "%s" , reclaim_in_progress ? "yes" : "no" ),
252+ evt_tag_long ("total_reclaim" , total_reclaim ));
253+
254+ return reclaim_in_progress ;
255+ }
256+
226257static void
227258_release_dynamic_window (LogSource * self )
228259{
229260 g_assert (self -> ack_tracker == NULL );
230261
262+ _process_reclaimed_window (self );
263+
231264 gsize dynamic_part = self -> full_window_size - self -> initial_window_size ;
232265 msg_trace ("Releasing dynamic part of the window" , evt_tag_int ("dynamic_window_to_be_released" , dynamic_part ),
233266 log_pipe_location_tag (& self -> super ));
@@ -265,7 +298,6 @@ _inc_balanced(LogSource *self, gsize inc)
265298static void
266299_dec_balanced (LogSource * self , gsize dec )
267300{
268- gsize new_full_window_size = self -> full_window_size - dec ;
269301
270302 gsize empty_window = window_size_counter_get (& self -> window_size , NULL );
271303 gsize remaining_sub = 0 ;
@@ -274,21 +306,18 @@ _dec_balanced(LogSource *self, gsize dec)
274306 {
275307 remaining_sub = dec - empty_window ;
276308 if (empty_window == 0 )
277- {
278- dec = 0 ;
279- }
309+ dec = 0 ;
280310 else
281- {
282- dec = empty_window - 1 ;
283- }
311+ dec = empty_window - 1 ;
284312
285- new_full_window_size = self -> full_window_size - dec ;
286313 _reclaim_dynamic_window (self , remaining_sub );
287314 }
288315
289316 window_size_counter_sub (& self -> window_size , dec , NULL );
290317 stats_counter_sub (self -> metrics .window_available , dec );
291318
319+ gsize new_full_window_size = self -> full_window_size - dec ;
320+
292321 msg_trace ("Balance::decrease" ,
293322 log_pipe_location_tag (& self -> super ),
294323 evt_tag_printf ("connection" , "%p" , self ),
@@ -301,36 +330,6 @@ _dec_balanced(LogSource *self, gsize dec)
301330 dynamic_window_release (& self -> dynamic_window , dec );
302331}
303332
304- static gboolean
305- _reclaim_window_instead_of_rebalance (LogSource * self )
306- {
307- //check pending_reclaimed
308- gssize total_reclaim = atomic_gssize_set_and_get (& self -> pending_reclaimed , 0 );
309- gssize to_be_reclaimed = atomic_gssize_get (& self -> window_size_to_be_reclaimed );
310- gboolean reclaim_in_progress = (to_be_reclaimed > 0 );
311-
312- if (total_reclaim > 0 )
313- {
314- self -> full_window_size -= total_reclaim ;
315- stats_counter_sub (self -> metrics .window_capacity , total_reclaim );
316- dynamic_window_release (& self -> dynamic_window , total_reclaim );
317- }
318- else
319- {
320- //to avoid underflow, we need to set a value <= 0
321- if (to_be_reclaimed < 0 )
322- atomic_gssize_set (& self -> window_size_to_be_reclaimed , 0 );
323- }
324-
325- msg_trace ("Checking if reclaim is in progress..." ,
326- log_pipe_location_tag (& self -> super ),
327- evt_tag_printf ("connection" , "%p" , self ),
328- evt_tag_printf ("in progress" , "%s" , reclaim_in_progress ? "yes" : "no" ),
329- evt_tag_long ("total_reclaim" , total_reclaim ));
330-
331- return reclaim_in_progress ;
332- }
333-
334333static void
335334_dynamic_window_rebalance (LogSource * self )
336335{
@@ -359,7 +358,7 @@ log_source_dynamic_window_realloc(LogSource *self)
359358 /* it is safe to assume that the window size is not decremented while this function runs,
360359 * only incrementation is possible by destination threads */
361360
362- if (!_reclaim_window_instead_of_rebalance (self ))
361+ if (!_process_reclaimed_window (self ))
363362 _dynamic_window_rebalance (self );
364363
365364 dynamic_window_stat_reset (& self -> dynamic_window .stat );
0 commit comments