@@ -240,7 +240,7 @@ def auto_schedule() -> List[Rebalancer]:
240240 # TLDR: willing to pay 1 sat for every value_per_fee sats moved
241241 if Rebalancer .objects .filter (last_hop_pubkey = target .remote_pubkey ).exclude (status = 0 ).exists ():
242242 last_rebalance = Rebalancer .objects .filter (last_hop_pubkey = target .remote_pubkey ).exclude (status = 0 ).order_by ('-id' )[0 ]
243- if not (last_rebalance .status == 2 or (last_rebalance .status in [ 3 , 4 , 5 , 6 , 7 , 400 , 408 , 499 ] and (int ((datetime .now () - last_rebalance .stop ).total_seconds () / 60 ) > wait_period )) or (last_rebalance .status == 1 and (int ((datetime .now () - last_rebalance .start ).total_seconds () / 60 ) > wait_period ))):
243+ if not (last_rebalance .status == 2 or (last_rebalance .status > 2 and (int ((datetime .now () - last_rebalance .stop ).total_seconds () / 60 ) > wait_period )) or (last_rebalance .status == 1 and (( int ((datetime .now () - last_rebalance .start ).total_seconds () / 60 ) - last_rebalance . duration ) > wait_period ))):
244244 continue
245245 print (f"{ datetime .now ().strftime ('%c' )} : Creating Auto Rebalance Request for: { target .chan_id } " )
246246 print (f"{ datetime .now ().strftime ('%c' )} : Request routing through: { outbound_cans } " )
@@ -320,25 +320,29 @@ def get_pending_rebals():
320320 print (f"{ datetime .now ().strftime ('%c' )} : Error getting pending rebalances: { str (e )} " )
321321
322322shutdown_rebalancer = False
323+ scheduled_rebalances = []
323324active_rebalances = []
324325async def async_queue_manager (rebalancer_queue ):
325- global shutdown_rebalancer
326+ global scheduled_rebalances , active_rebalances , shutdown_rebalancer
326327 print (f"{ datetime .now ().strftime ('%c' )} : Queue manager is starting..." )
327- pending_rebalances , rebal_count = await get_pending_rebals ()
328- if rebal_count > 0 :
329- for rebalance in pending_rebalances :
330- await rebalancer_queue .put (rebalance )
331328 try :
332329 while True :
333- global active_rebalances
334330 print (f"{ datetime .now ().strftime ('%c' )} : Queue currently has { rebalancer_queue .qsize ()} items..." )
335331 print (f"{ datetime .now ().strftime ('%c' )} : There are currently { len (active_rebalances )} tasks in progress..." )
336332 print (f"{ datetime .now ().strftime ('%c' )} : Queue manager is checking for more work..." )
333+ pending_rebalances , rebal_count = await get_pending_rebals ()
334+ if rebal_count > 0 :
335+ for rebalance in pending_rebalances :
336+ if rebalance .id not in (scheduled_rebalances + active_rebalances ):
337+ print (f"{ datetime .now ().strftime ('%c' )} : Found a pending job to schedule with id: { rebalance .id } " )
338+ scheduled_rebalances .append (rebalance .id )
339+ await rebalancer_queue .put (rebalance )
337340 await auto_enable ()
338341 scheduled = await auto_schedule ()
339342 if len (scheduled ) > 0 :
340343 print (f"{ datetime .now ().strftime ('%c' )} : Scheduling { len (scheduled )} more jobs..." )
341344 for rebalance in scheduled :
345+ scheduled_rebalances .append (rebalance .id )
342346 await rebalancer_queue .put (rebalance )
343347 elif rebalancer_queue .qsize () == 0 and len (active_rebalances ) == 0 :
344348 print (f"{ datetime .now ().strftime ('%c' )} : Queue is still empty, stoping the rebalancer..." )
@@ -353,14 +357,15 @@ async def async_queue_manager(rebalancer_queue):
353357
354358async def async_run_rebalancer (worker , rebalancer_queue ):
355359 while True :
356- global active_rebalances , shutdown_rebalancer
360+ global scheduled_rebalances , active_rebalances , shutdown_rebalancer
357361 if not rebalancer_queue .empty ():
358362 rebalance = await rebalancer_queue .get ()
359363 print (f"{ datetime .now ().strftime ('%c' )} : { worker } is starting a new request..." )
360364 active_rebalance_id = None
361365 if rebalance != None :
362366 active_rebalance_id = rebalance .id
363367 active_rebalances .append (active_rebalance_id )
368+ scheduled_rebalances .remove (active_rebalance_id )
364369 while rebalance != None :
365370 rebalance = await run_rebalancer (rebalance , worker )
366371 if active_rebalance_id != None :
0 commit comments