-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscheduler.py
More file actions
151 lines (127 loc) · 5.36 KB
/
scheduler.py
File metadata and controls
151 lines (127 loc) · 5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import logging
from typing import ClassVar
from django.core.cache import cache
from django.db import close_old_connections, transaction
from django.db.utils import DatabaseError, InterfaceError
from django_celery_beat.schedulers import ADD_ENTRY_ERROR, DatabaseScheduler, ModelEntry
from django_tenants.utils import get_public_schema_name, get_tenant_model, schema_context
from tenant_schemas_celery.scheduler import TenantAwareScheduleEntry
logger = logging.getLogger("celery-scheduler")
TenantModel = get_tenant_model()
class TenantModelEntry(ModelEntry, TenantAwareScheduleEntry):
def __init__(self, model, app=None, *args, **kwargs):
if args:
# Unpickled from database
self.tenant_schemas = args[-1]
else:
# Initialized from code
self.tenant_schemas = kwargs.pop("tenant_schemas", None)
if not self.tenant_schemas:
self.tenant_schemas = ["public"]
super().__init__(model, app)
def __next__(self):
self.model.last_run_at = self._default_now()
self.model.total_run_count += 1
self.model.no_changes = True
next_entry = self.__class__(self.model)
next_entry.tenant_schemas = self.tenant_schemas
return next_entry
def save(self):
with schema_context(self.tenant_schemas[0]):
super().save()
def _disable(self, model):
with schema_context(self.tenant_schemas[0]):
model.no_changes = True
model.enabled = False
model.save()
class TenantDatabaseScheduler(DatabaseScheduler):
Entry = TenantModelEntry
diffs: ClassVar[dict] = {}
@classmethod
def get_schemas_list(cls) -> list:
schemas_cache = cache.get("scheduler_schemas_list")
if schemas_cache:
return schemas_cache
schemas = list(TenantModel.objects.values_list("schema_name", flat=True))
schemas.append(get_public_schema_name())
cache.set("scheduler_schemas_list", schemas, 60) # one minute cache
return schemas
def all_as_schedule(self):
schedule = {}
schemas = self.get_schemas_list()
for schema in schemas:
with schema_context(schema):
for model in self.Model.objects.enabled():
try:
schedule[f"{schema}:{model.name}"] = self.Entry(model, app=self.app, tenant_schemas=(schema,))
except ValueError as e:
logger.error(e)
return schedule
def reserve(self, entry):
with schema_context(entry.tenant_schemas[0]):
new_entry = next(entry)
self._dirty.add(f"{new_entry.tenant_schemas[0]}:{new_entry.name}")
return new_entry
def is_due(self, entry):
with schema_context(entry.tenant_schemas[0]):
return entry.is_due()
def apply_entry(self, entry: TenantModelEntry, producer=None):
logger.info(
"TenantDatabaseScheduler: Sending due task %s (%s) to %s tenant",
entry.name,
entry.task,
str(len(entry.tenant_schemas)),
)
schema_name = entry.tenant_schemas[0] if entry.tenant_schemas else get_public_schema_name()
with schema_context(schema_name):
logger.debug(
"Sending due task %s (%s) to tenant %s",
entry.name,
entry.task,
schema_name,
)
try:
result = self.apply_async(entry, producer=producer, advance=False)
except Exception as exc:
logger.exception(exc)
else:
logger.debug("%s sent. id->%s", entry.task, result.id)
def schedule_changed(self):
schemas = self.get_schemas_list()
for schema in schemas:
with schema_context(schema):
try:
close_old_connections()
try:
transaction.commit()
except transaction.TransactionManagementError:
pass # not in transaction management.
ts = self.Changes.last_change()
except DatabaseError as exc:
logger.exception("Database gave error: %r", exc)
continue
except InterfaceError:
logger.warning(
"DatabaseScheduler: InterfaceError in schedule_changed(), " "waiting to retry in next call...",
)
continue
try:
last = self.diffs.get(schema)
if ts and ts > (last if last else ts):
return True
finally:
self.diffs[schema] = ts
return False
def update_from_dict(self, mapping):
schemas = self.get_schemas_list()
for schema in schemas:
with schema_context(schema):
s = {}
for name, entry_fields in mapping.items():
try:
entry = self.Entry.from_entry(name, app=self.app, **entry_fields)
if entry.model.enabled:
s[name] = entry
except Exception as exc:
logger.exception(ADD_ENTRY_ERROR, name, exc, entry_fields)
self.schedule.update(s)