Skip to content

Commit d04f9a4

Browse files
committed
Horrible hacks to work with Django 1.6 and still be compatible
1 parent d9af2df commit d04f9a4

File tree

5 files changed

+26
-28
lines changed

5 files changed

+26
-28
lines changed

djcelery/loaders.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,10 @@ def _close_database(self):
7272
try:
7373
funs = [conn.close for conn in db.connections]
7474
except AttributeError:
75-
funs = [db.close_connection] # pre multidb
75+
if hasattr(db, 'close_old_connections'): # Django 1.6+
76+
funs = [db.close_old_connections]
77+
else:
78+
funs = [db.close_connection] # pre multidb
7679

7780
for close in funs:
7881
try:

djcelery/managers.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
from celery.utils.timeutils import maybe_timedelta
1919

20+
from .db import (
21+
commit_on_success, commit_unless_managed, rollback_unless_managed,
22+
)
2023
from .utils import now
2124

2225

@@ -47,8 +50,10 @@ def _inner(*args, **kwargs):
4750
# the transaction.
4851
if retries >= _max_retries:
4952
raise
50-
transaction.rollback_unless_managed()
51-
53+
try:
54+
rollback_unless_managed()
55+
except Exception:
56+
pass
5257
return _inner
5358

5459
return _outer
@@ -105,22 +110,16 @@ def get_all_expired(self, expires):
105110
"""Get all expired task results."""
106111
return self.filter(date_done__lt=now() - maybe_timedelta(expires))
107112

108-
@transaction.commit_manually
109113
def delete_expired(self, expires):
110114
"""Delete all expired taskset results."""
111115
meta = self.model._meta
112-
try:
116+
with commit_on_success():
113117
self.get_all_expired(expires).update(hidden=True)
114118
cursor = self.connection_for_write().cursor()
115119
cursor.execute(
116120
'DELETE FROM {0.db_table} WHERE hidden=%s'.format(meta),
117121
(True, ),
118122
)
119-
except:
120-
transaction.rollback()
121-
raise
122-
else:
123-
transaction.commit()
124123

125124

126125
class PeriodicTaskManager(ExtendedManager):
@@ -244,4 +243,7 @@ def purge(self):
244243
'DELETE FROM {0.db_table} WHERE hidden=%s'.format(meta),
245244
(True, ),
246245
)
247-
transaction.commit_unless_managed()
246+
try:
247+
commit_unless_managed()
248+
except transaction.TransactionManagementError:
249+
pass

djcelery/schedulers.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from django.db import transaction
1616
from django.core.exceptions import ObjectDoesNotExist
1717

18+
from .db import commit_on_success
1819
from .models import (PeriodicTask, PeriodicTasks,
1920
CrontabSchedule, IntervalSchedule)
2021
from .utils import DATABASE_ERRORS, make_aware
@@ -195,24 +196,18 @@ def reserve(self, entry):
195196
self._dirty.add(new_entry.name)
196197
return new_entry
197198

198-
@transaction.commit_manually
199199
def sync(self):
200200
info('Writing entries...')
201201
_tried = set()
202202
try:
203-
try:
203+
with commit_on_success():
204204
while self._dirty:
205205
try:
206206
name = self._dirty.pop()
207207
_tried.add(name)
208208
self.schedule[name].save()
209209
except (KeyError, ObjectDoesNotExist):
210210
pass
211-
except:
212-
transaction.rollback()
213-
raise
214-
else:
215-
transaction.commit()
216211
except DATABASE_ERRORS as exc:
217212
# retry later
218213
self._dirty |= _tried

djcelery/snapshot.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from collections import defaultdict
44
from datetime import datetime, timedelta
55

6-
from django.db import transaction
76
from django.conf import settings
87

98
from celery import states
@@ -13,6 +12,7 @@
1312
from celery.utils.log import get_logger
1413
from celery.utils.timeutils import maybe_iso8601
1514

15+
from .db import commit_on_success
1616
from .models import WorkerState, TaskState
1717
from .utils import maybe_make_aware
1818

@@ -129,23 +129,15 @@ def update_task(self, state, **kwargs):
129129

130130
return obj
131131

132-
@transaction.commit_manually
133132
def on_shutter(self, state, commit_every=100):
134-
if not state.event_count and transaction.is_dirty():
135-
transaction.commit()
136-
return
137133

138134
def _handle_tasks():
139135
for i, task in enumerate(state.tasks.items()):
140136
self.handle_task(task)
141-
if not i % commit_every:
142-
transaction.commit()
143137

144138
for worker in state.workers.items():
145139
self.handle_worker(worker)
146140
_handle_tasks()
147-
if transaction.is_dirty():
148-
transaction.commit()
149141

150142
def on_cleanup(self):
151143
expired = (self.TaskState.objects.expire_by_states(states, expires)

djcelery/tests/test_snapshot.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from itertools import count
55
from time import time
66

7-
from celery.events import Event
7+
from celery.events import Event as _Event
88
from celery.events.state import State, Worker, Task
99
from celery.utils import gen_unique_id
1010

@@ -15,6 +15,12 @@
1515
from djcelery.tests.utils import unittest
1616

1717
_next_id = count(0).next
18+
_next_clock = count(1).next
19+
20+
def Event(*args, **kwargs):
21+
kwargs.setdefault('clock', _next_clock())
22+
kwargs.setdefault('local_received', time())
23+
return _Event(*args, **kwargs)
1824

1925

2026
def create_task(worker, **kwargs):

0 commit comments

Comments
 (0)