15
15
from kombu .utils .json import dumps , loads
16
16
17
17
from django .conf import settings
18
- from django .db import transaction , close_old_connections
18
+ from django .db import transaction , close_old_connections , router , DEFAULT_DB_ALIAS
19
19
from django .db .utils import DatabaseError , InterfaceError
20
20
from django .core .exceptions import ObjectDoesNotExist
21
21
@@ -258,7 +258,7 @@ def schedule_changed(self):
258
258
# other transactions until the current transaction is
259
259
# committed (Issue #41).
260
260
try :
261
- transaction .commit ()
261
+ transaction .commit (using = self . target_db )
262
262
except transaction .TransactionManagementError :
263
263
pass # not in transaction management.
264
264
@@ -287,7 +287,17 @@ def reserve(self, entry):
287
287
self ._dirty .add (new_entry .name )
288
288
return new_entry
289
289
290
- def sync (self ):
290
+ @property
291
+ def target_db (self ):
292
+ """Determine if there is a django route"""
293
+ if not settings .DATABASE_ROUTERS :
294
+ return DEFAULT_DB_ALIAS
295
+ # If the project does not actually implement this method, DEFAULT_DB_ALIAS will be automatically returned.
296
+ # The exception will be located to the django routing section
297
+ db = router .db_for_write (self .Model )
298
+ return db
299
+
300
+ def _sync (self ):
291
301
if logger .isEnabledFor (logging .DEBUG ):
292
302
debug ('Writing entries...' )
293
303
_tried = set ()
@@ -313,6 +323,10 @@ def sync(self):
313
323
# retry later, only for the failed ones
314
324
self ._dirty |= _failed
315
325
326
+ def sync (self ):
327
+ with transaction .atomic (using = self .target_db ):
328
+ self ._sync ()
329
+
316
330
def update_from_dict (self , mapping ):
317
331
s = {}
318
332
for name , entry_fields in mapping .items ():
0 commit comments