@@ -43,6 +43,7 @@ def __init__(self):
4343 self .running = True
4444 self .engine = None
4545 self .session_maker = None
46+ self .last_sync_times : dict [int , datetime ] = {} # provider_id -> last sync time
4647
4748 async def initialize (self ):
4849 """Initialize database connection."""
@@ -70,37 +71,61 @@ async def run(self):
7071
7172 logger .info ("✅ Sync worker ready" )
7273
74+ # Check interval: check every 30 seconds which providers need syncing
75+ CHECK_INTERVAL = 30
76+
7377 while self .running :
7478 try :
75- # Grab config and provider IDs in a short-lived session
79+ # Grab config and providers in a short-lived session
7680 async with self .session_maker () as session :
7781 config_obj = await get_config (session )
78- provider_ids = [p .id for p in await get_all_providers (session )]
82+ all_providers = await get_all_providers (session )
83+
84+ # Create provider snapshots with intervals
85+ providers_info = []
86+ for p in all_providers :
87+ # Use provider-specific interval, or global if not set
88+ interval = p .sync_interval_seconds if p .sync_interval_seconds else config_obj .sync_interval_seconds
89+ providers_info .append ({
90+ "id" : p .id ,
91+ "name" : p .name ,
92+ "sync_enabled" : p .sync_enabled ,
93+ "sync_interval_seconds" : interval ,
94+ })
7995
8096 # Snapshot config values to avoid session-bound objects
81- sync_interval = config_obj .sync_interval_seconds
8297 config_snapshot = {
8398 "litellm_base_url" : config_obj .litellm_base_url ,
8499 "litellm_api_key" : config_obj .litellm_api_key ,
85100 "default_pricing_profile" : config_obj .default_pricing_profile ,
86101 "default_pricing_override" : config_obj .default_pricing_override_dict ,
87102 }
88103
89- # Check if sync is enabled
90- if sync_interval == 0 :
91- logger .debug ("Sync disabled (interval=0), sleeping for 60s..." )
92- await asyncio .sleep (60 )
93- continue
104+ # Check which providers need syncing
105+ now = datetime .now (UTC )
106+ providers_to_sync = []
107+ for provider_info in providers_info :
108+ if not provider_info ["sync_enabled" ]:
109+ continue
94110
95- logger .info ("⏰ Starting sync cycle (interval: %ds)" , sync_interval )
111+ interval = provider_info ["sync_interval_seconds" ]
112+ if interval == 0 :
113+ continue # Sync disabled for this provider
96114
97- # Sync all enabled providers
98- await self .sync_all_providers ( provider_ids , config_snapshot )
115+ provider_id = provider_info [ "id" ]
116+ last_sync = self .last_sync_times . get ( provider_id )
99117
100- logger .info ("✓ Sync cycle complete, waiting %ds for next cycle" , sync_interval )
118+ # Sync if never synced or interval has passed
119+ if last_sync is None or (now - last_sync ).total_seconds () >= interval :
120+ providers_to_sync .append (provider_id )
101121
102- # Wait for next interval
103- await asyncio .sleep (sync_interval )
122+ # Sync providers that are due
123+ if providers_to_sync :
124+ logger .info ("⏰ Syncing %d provider(s)..." , len (providers_to_sync ))
125+ await self .sync_providers (providers_to_sync , config_snapshot )
126+
127+ # Wait before next check
128+ await asyncio .sleep (CHECK_INTERVAL )
104129
105130 except Exception as e :
106131 logger .exception ("❌ Error in sync loop: %s" , e )
@@ -112,8 +137,8 @@ async def run(self):
112137 if self .engine :
113138 await self .engine .dispose ()
114139
115- async def sync_all_providers (self , provider_ids : list [int ], config_snapshot : dict [str , str | None ]):
116- """Sync models from all enabled providers."""
140+ async def sync_providers (self , provider_ids : list [int ], config_snapshot : dict [str , str | None ]):
141+ """Sync models from specified providers and update their last sync times ."""
117142 if not provider_ids :
118143 logger .info ("No providers configured" )
119144 return
@@ -158,6 +183,8 @@ async def sync_all_providers(self, provider_ids: list[int], config_snapshot: dic
158183 stats .get ("updated" , 0 ) if stats else 0 ,
159184 stats .get ("deleted" , 0 ) if stats else 0
160185 )
186+ # Mark successful sync time
187+ self .last_sync_times [provider_id ] = datetime .now (UTC )
161188 else :
162189 logger .debug ("LiteLLM not configured, skipping push for %s" , provider .name )
163190 continue
@@ -177,6 +204,9 @@ async def sync_all_providers(self, provider_ids: list[int], config_snapshot: dic
177204 stats .get ("orphaned" , 0 )
178205 )
179206
207+ # Mark successful sync time
208+ self .last_sync_times [provider_id ] = datetime .now (UTC )
209+
180210 except Exception as e :
181211 logger .error ("❌ Failed to sync provider %s: %s" , provider .name , e , exc_info = True )
182212 await session .rollback ()
0 commit comments