-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathbatch.py
More file actions
141 lines (117 loc) · 5.06 KB
/
batch.py
File metadata and controls
141 lines (117 loc) · 5.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Aggregator service.
This service periodically determines which stale_blocks need updating, and
enqueues tasks to perform those updates.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import collections
import logging
import time
from django.conf import settings
from django.core.cache import cache
from . import models, utils
from .tasks import aggregation_tasks
log = logging.getLogger(__name__)
EnrollmentTuple = collections.namedtuple('EnrollmentTuple', ['username', 'course_key'])
MAX_KEYS_PER_TASK = 16
def perform_aggregation(batch_size=10000, delay=0.0, limit=None, routing_key=None):
    """
    Enqueues tasks to reaggregate modified completions.

    When blocks are completed, they mark themselves as stale. This function
    collects all stale blocks for each enrollment, and enqueues a single
    recalculation of all aggregators containing those stale blocks.

    There is a locking mechanism that ensures that only one
    `perform_aggregation` is running at a time.  The lock is always released
    when this function exits, even if aggregation raises.

    batch_size (int|None) [default: 10000]:
        Maximum number of stale completions to fetch in a single query to the
        database.
    delay (float) [default: 0.0]:
        The amount of time to wait between sending batches of 1000 tasks to
        celery.
    limit (int|None) [default: None]:
        Maximum number of stale completions to process in a single run of this
        function. None means process all outstanding StaleCompletions.
    routing_key (str|None) [default None]:
        A routing key to pass to celery for the update_aggregators tasks. None
        means use the default routing key.
    """
    if not cache.add(
        settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK,
        True,
        settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK_TIMEOUT_SECONDS
    ):
        log.warning("Aggregation is already running. Exiting.")
        return
    # Release the lock on every exit path.  The previous version deleted the
    # lock manually on each return, which left it held until the cache timeout
    # whenever an exception escaped mid-run, blocking all later aggregations.
    try:
        _aggregate_stale_completions(batch_size, delay, limit, routing_key)
    finally:
        cache.delete(settings.COMPLETION_AGGREGATOR_AGGREGATION_LOCK)


def _aggregate_stale_completions(batch_size, delay, limit, routing_key):
    """
    Collect stale blocks per enrollment and enqueue update_aggregators tasks.

    Assumes the aggregation lock is already held by the caller
    (perform_aggregation).  Parameters have the same meaning as in
    perform_aggregation.
    """
    stale_queryset = models.StaleCompletion.objects.filter(resolved=False)
    task_options = {}
    if limit is None:
        # float('inf') lets the `enqueued >= limit` check below mean
        # "no limit" without a separate branch.
        limit = float('inf')
    try:
        min_id = stale_queryset.order_by('id')[0].id
        max_id = stale_queryset.order_by('-id')[0].id
    except IndexError:
        log.warning("No StaleCompletions to process. Exiting.")
        return
    if routing_key:
        task_options['routing_key'] = routing_key
    stale_blocks = collections.defaultdict(set)
    forced_updates = set()
    enqueued = 0
    # Walk the id range newest-to-oldest in batch_size windows so a single
    # query never pulls more than batch_size rows into memory.
    for idx in range(max_id, min([min_id + batch_size, max_id]) - 1, -1 * batch_size):
        if enqueued >= limit:
            break
        evaluated = stale_queryset.filter(id__gt=idx - batch_size, id__lte=idx)
        enqueued += len(evaluated)
        for stale in evaluated:
            enrollment = EnrollmentTuple(
                username=stale.username,
                course_key=stale.course_key,
            )
            if not stale.block_key:
                # No block_key on the StaleCompletion: replace the set with a
                # BagOfHolding so this enrollment is treated as "everything is
                # stale" from here on.
                stale_blocks[enrollment] = utils.BagOfHolding()
            blocks = stale_blocks[enrollment]
            if isinstance(blocks, utils.BagOfHolding) or len(blocks) <= MAX_KEYS_PER_TASK:
                # We can stop adding once we have exceeded the maximum number
                # of keys per task. This keeps the memory usage of this
                # function down, and limits the size of the task signature sent
                # to celery.
                stale_blocks[enrollment].add(stale.block_key)
            if stale.force:
                forced_updates.add(enrollment)
    log.info("Performing aggregation update for %s user enrollments", len(stale_blocks))
    for idx, enrollment in enumerate(stale_blocks):
        if isinstance(stale_blocks[enrollment], utils.BagOfHolding):
            blocks = []
        elif len(stale_blocks[enrollment]) > MAX_KEYS_PER_TASK:
            # Too many keys for one task signature; send an empty list instead
            # (presumably update_aggregators treats [] as "recalculate all" —
            # same as the BagOfHolding case above; confirm against the task).
            blocks = []
        else:
            blocks = [str(block_key) for block_key in stale_blocks[enrollment]]
        aggregation_tasks.update_aggregators.apply_async(
            kwargs={
                'username': enrollment.username,
                'course_key': str(enrollment.course_key),
                'block_keys': blocks,
                'force': enrollment in forced_updates,
            },
            **task_options
        )
        # Optionally pause after every 1000 tasks to avoid flooding the broker.
        if idx % 1000 == 999:
            if delay:
                time.sleep(delay)
    log.info("Finished aggregation update for %s user enrollments", len(stale_blocks))
def perform_cleanup():
    """
    Remove resolved StaleCompletion objects.

    Returns the result of the queryset ``.delete()`` call, or None if another
    cleanup run already holds the lock.  The cleanup lock is always released
    on exit, even if the delete raises.
    """
    if not cache.add(
        settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK,
        True,
        settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK_TIMEOUT_SECONDS
    ):
        log.warning("Cleanup is already running. Exiting.")
        return None
    # try/finally guarantees the lock is released even when delete() raises;
    # previously an exception left the lock held until its cache timeout.
    try:
        return models.StaleCompletion.objects.filter(resolved=True).delete()
    finally:
        cache.delete(settings.COMPLETION_AGGREGATOR_CLEANUP_LOCK)