Skip to content

Commit 5f2da45

Browse files
[feature] Run large batch user creation operations asynchronously #608
Closes #608 --------- Co-authored-by: Federico Capoano <f.capoano@openwisp.io>
1 parent 16eb418 commit 5f2da45

File tree

23 files changed

+578
-75
lines changed

23 files changed

+578
-75
lines changed

docs/user/settings.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ The default encryption format for storing radius check values.
111111
A list of disabled encryption formats, by default all formats are enabled
112112
in order to keep backward compatibility with legacy systems.
113113

114+
``OPENWISP_RADIUS_BATCH_ASYNC_THRESHOLD``
115+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
116+
117+
**Default**: ``15``
118+
119+
When the number of users to be generated in batch user creation is greater
120+
than or equal to this value, the operation will be executed as a
121+
background task (asynchronously) using Celery. This prevents timeouts and
122+
keeps the user interface responsive when creating a large number of users.
123+
For batches smaller than the threshold, users will be created immediately
124+
(synchronously).
125+
114126
``OPENWISP_RADIUS_BATCH_DEFAULT_PASSWORD_LENGTH``
115127
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
116128

openwisp_radius/admin.py

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from django.contrib.admin.utils import model_ngettext
88
from django.contrib.auth import get_user_model
99
from django.core.exceptions import PermissionDenied
10+
from django.http import HttpResponseRedirect
1011
from django.templatetags.static import static
1112
from django.urls import reverse
1213
from django.utils.safestring import mark_safe
@@ -338,13 +339,15 @@ class RadiusBatchAdmin(MultitenantAdminMixin, TimeStampedEditableAdmin):
338339
"name",
339340
"organization",
340341
"strategy",
342+
"status",
341343
"expiration_date",
342344
"created",
343345
"modified",
344346
]
345347
fields = [
346348
"strategy",
347349
"organization",
350+
"status",
348351
"name",
349352
"csvfile",
350353
"prefix",
@@ -393,25 +396,21 @@ def number_of_users(self, obj):
393396
number_of_users.short_description = _("number of users")
394397

395398
def get_fields(self, request, obj=None):
396-
fields = super().get_fields(request, obj)[:]
399+
fields = super().get_fields(request, obj)
397400
if not obj:
401+
fields = fields[:]
398402
fields.remove("users")
403+
fields.remove("status")
399404
return fields
400405

401406
def save_model(self, request, obj, form, change):
402-
data = form.cleaned_data
403-
strategy = data.get("strategy")
404-
if not change:
405-
if strategy == "csv":
406-
if data.get("csvfile", False):
407-
csvfile = data.get("csvfile")
408-
obj.csvfile_upload(csvfile)
409-
elif strategy == "prefix":
410-
prefix = data.get("prefix")
411-
n = data.get("number_of_users")
412-
obj.prefix_add(prefix, n)
413-
else:
414-
obj.save()
407+
if change:
408+
super().save_model(request, obj, form, change)
409+
return
410+
# Save the object initially to get a PK
411+
super().save_model(request, obj, form, change)
412+
num_users = form.cleaned_data.get("number_of_users", 0)
413+
obj.schedule_processing(number_of_users=num_users)
415414

416415
def delete_model(self, request, obj):
417416
obj.users.all().delete()
@@ -466,16 +465,46 @@ def get_readonly_fields(self, request, obj=None):
466465
readonly_fields = super(RadiusBatchAdmin, self).get_readonly_fields(
467466
request, obj
468467
)
469-
if obj:
468+
if obj and obj.status != "pending":
470469
return (
471470
"strategy",
472471
"prefix",
473472
"csvfile",
474473
"number_of_users",
475474
"users",
476475
"expiration_date",
476+
"name",
477+
"organization",
478+
"status",
477479
) + readonly_fields
478-
return readonly_fields
480+
elif obj:
481+
return ("status",) + readonly_fields
482+
return ("status",) + readonly_fields
483+
484+
def has_delete_permission(self, request, obj=None):
485+
if obj and obj.status == "processing":
486+
return False
487+
return super().has_delete_permission(request, obj)
488+
489+
def response_add(self, request, obj, post_url_continue=None):
490+
if obj.status != "pending":
491+
return super().response_add(request, obj, post_url_continue)
492+
opts = self.model._meta
493+
msg = _(
494+
'The batch user creation "{obj}" was added successfully '
495+
"and is now being processed in the background."
496+
).format(obj=obj)
497+
if "_continue" in request.POST:
498+
self.message_user(request, msg, messages.SUCCESS)
499+
post_url_continue = reverse(
500+
f"admin:{opts.app_label}_{opts.model_name}_change",
501+
args=(obj.pk,),
502+
current_app=self.admin_site.name,
503+
)
504+
return HttpResponseRedirect(post_url_continue)
505+
else:
506+
self.message_user(request, msg, messages.SUCCESS)
507+
return self.response_post_save_add(request, obj)
479508

480509

481510
# Inlines for UserAdmin & OrganizationAdmin

openwisp_radius/api/serializers.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,7 @@ class RadiusBatchSerializer(serializers.ModelSerializer):
390390
)
391391
pdf_link = serializers.SerializerMethodField(
392392
help_text=(
393-
"Downlaod link to PDF file containing user credentials. "
393+
"Download link to the PDF file containing user credentials. "
394394
"Provided only for `prefix` strategy.`"
395395
),
396396
required=False,
@@ -405,9 +405,19 @@ class RadiusBatchSerializer(serializers.ModelSerializer):
405405
write_only=True,
406406
min_value=1,
407407
)
408+
status = serializers.CharField(read_only=True)
409+
410+
def create(self, validated_data):
411+
validated_data.pop("organization_slug", None)
412+
validated_data.pop("number_of_users", None)
413+
return super().create(validated_data)
408414

409415
def get_pdf_link(self, obj):
410-
if isinstance(obj, RadiusBatch) and obj.strategy == "prefix":
416+
if (
417+
isinstance(obj, RadiusBatch)
418+
and obj.strategy == "prefix"
419+
and obj.status == RadiusBatch.COMPLETED
420+
):
411421
request = self.context.get("request")
412422
return request.build_absolute_uri(
413423
reverse(
@@ -434,7 +444,7 @@ def validate(self, data):
434444
class Meta:
435445
model = RadiusBatch
436446
fields = "__all__"
437-
read_only_fields = ("created", "modified", "user_credentials")
447+
read_only_fields = ("status", "user_credentials", "created", "modified")
438448

439449

440450
class PasswordResetSerializer(BasePasswordResetSerializer):

openwisp_radius/api/views.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,17 @@ def post(self, request, *args, **kwargs):
108108
serializer = self.get_serializer(data=request.data)
109109
if serializer.is_valid():
110110
valid_data = serializer.validated_data.copy()
111-
num_of_users = valid_data.pop("number_of_users", None)
112-
valid_data["organization"] = valid_data.pop("organization_slug", None)
113-
batch = serializer.create(valid_data)
114-
strategy = valid_data.get("strategy")
115-
if strategy == "csv":
116-
batch.csvfile_upload()
117-
response = RadiusBatchSerializer(batch, context={"request": request})
118-
else:
119-
batch.prefix_add(valid_data.get("prefix"), num_of_users)
120-
response = RadiusBatchSerializer(batch, context={"request": request})
121-
return Response(response.data, status=status.HTTP_201_CREATED)
111+
num_of_users = valid_data.get("number_of_users", 0)
112+
organization = valid_data.pop("organization_slug", None)
113+
valid_data.pop("number_of_users", None)
114+
batch = serializer.save(organization=organization)
115+
is_async = batch.schedule_processing(number_of_users=num_of_users)
116+
batch.refresh_from_db()
117+
response_serializer = self.get_serializer(batch)
118+
status_code = (
119+
status.HTTP_202_ACCEPTED if is_async else status.HTTP_201_CREATED
120+
)
121+
return Response(response_serializer.data, status=status_code)
122122
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
123123

124124

openwisp_radius/base/models.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import django
1111
import phonenumbers
1212
import swapper
13+
from asgiref.sync import async_to_sync
14+
from channels.layers import get_channel_layer
1315
from django.conf import settings
1416
from django.contrib.auth import get_user_model
1517
from django.core.cache import cache
@@ -23,13 +25,15 @@
2325
from django.utils.translation import gettext_lazy as _
2426
from jsonfield import JSONField
2527
from model_utils.fields import AutoLastModifiedField
28+
from openwisp_notifications.signals import notify
2629
from phonenumber_field.modelfields import PhoneNumberField
2730
from private_storage.fields import PrivateFileField
2831

2932
from openwisp_radius.registration import (
3033
REGISTRATION_METHOD_CHOICES,
3134
get_registration_choices,
3235
)
36+
from openwisp_radius.tasks import process_radius_batch
3337
from openwisp_users.mixins import OrgMixin
3438
from openwisp_utils.base import KeyField, TimeStampedEditableModel, UUIDModel
3539
from openwisp_utils.fields import (
@@ -865,13 +869,31 @@ def _get_csv_file_location(instance, filename):
865869

866870

867871
class AbstractRadiusBatch(OrgMixin, TimeStampedEditableModel):
872+
PENDING = "pending"
873+
PROCESSING = "processing"
874+
COMPLETED = "completed"
875+
FAILED = "failed"
876+
877+
BATCH_STATUS_CHOICES = (
878+
(PENDING, _("Pending")),
879+
(PROCESSING, _("Processing")),
880+
(COMPLETED, _("Completed")),
881+
(FAILED, _("Failed")),
882+
)
883+
868884
strategy = models.CharField(
869885
_("strategy"),
870886
max_length=16,
871887
choices=_STRATEGIES,
872888
db_index=True,
873889
help_text=_("Import users from a CSV or generate using a prefix"),
874890
)
891+
status = models.CharField(
892+
max_length=16,
893+
choices=BATCH_STATUS_CHOICES,
894+
default=PENDING,
895+
db_index=True,
896+
)
875897
name = models.CharField(
876898
verbose_name=_("name"),
877899
max_length=128,
@@ -1064,6 +1086,71 @@ def _remove_files(self):
10641086
if self.csvfile:
10651087
self.csvfile.storage.delete(self.csvfile.name)
10661088

1089+
def schedule_processing(self, number_of_users=0):
1090+
items_to_process = 0
1091+
if self.strategy == "prefix":
1092+
items_to_process = number_of_users
1093+
elif self.strategy == "csv" and self.csvfile:
1094+
try:
1095+
csv_data = self.csvfile.read()
1096+
decoded_data = decode_byte_data(csv_data)
1097+
items_to_process = sum(1 for row in csv.reader(StringIO(decoded_data)))
1098+
self.csvfile.seek(0)
1099+
except Exception as e:
1100+
logger.error(f"Could not count rows in CSV for batch {self.pk}: {e}")
1101+
items_to_process = app_settings.BATCH_ASYNC_THRESHOLD
1102+
is_async = items_to_process >= app_settings.BATCH_ASYNC_THRESHOLD
1103+
if is_async:
1104+
process_radius_batch.delay(self.pk, number_of_users=number_of_users)
1105+
else:
1106+
self.process(number_of_users=number_of_users)
1107+
return is_async
1108+
1109+
def process(self, number_of_users=0, is_async=False):
1110+
channel_layer = get_channel_layer()
1111+
group_name = f"radius_batch_{self.pk}"
1112+
try:
1113+
self.status = self.PROCESSING
1114+
self.save(update_fields=["status"])
1115+
if self.strategy == "prefix":
1116+
self.prefix_add(self.prefix, number_of_users)
1117+
elif self.strategy == "csv":
1118+
self.csvfile_upload()
1119+
self.status = self.COMPLETED
1120+
self.save(update_fields=["status"])
1121+
if is_async:
1122+
notify.send(
1123+
type="generic_message",
1124+
level="success",
1125+
message=_(
1126+
f'The batch creation operation for "{self.name}" '
1127+
"has completed successfully."
1128+
),
1129+
sender=self.organization,
1130+
target=self,
1131+
description=_(f"Number of users processed: {self.users.count()}."),
1132+
)
1133+
except Exception as e:
1134+
logger.error(
1135+
"RadiusBatch %s failed during processing:", self.pk, exc_info=True
1136+
)
1137+
self.status = self.FAILED
1138+
self.save(update_fields=["status"])
1139+
notify.send(
1140+
type="generic_message",
1141+
level="error",
1142+
message=_(f'The batch creation operation for "{self.name}" failed.'),
1143+
sender=self.organization,
1144+
target=self,
1145+
description=_(
1146+
f"An error occurred while processing the batch.\n\n" f"Error: {e}"
1147+
),
1148+
)
1149+
finally:
1150+
async_to_sync(channel_layer.group_send)(
1151+
group_name, {"type": "batch_status_update", "status": self.status}
1152+
)
1153+
10671154

10681155
class AbstractRadiusToken(OrgMixin, TimeStampedEditableModel, models.Model):
10691156
# key field is a primary key so additional id field will be redundant

openwisp_radius/consumers.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from asgiref.sync import sync_to_async
2+
from channels.generic.websocket import AsyncJsonWebsocketConsumer
3+
from django.core.exceptions import ObjectDoesNotExist
4+
5+
from .utils import load_model
6+
7+
8+
class RadiusBatchConsumer(AsyncJsonWebsocketConsumer):
9+
def _user_can_access_batch(self, user, batch_id):
10+
RadiusBatch = load_model("RadiusBatch")
11+
# Superusers have access to everything,
12+
if user.is_superuser:
13+
return RadiusBatch.objects.filter(pk=batch_id).exists()
14+
# For non-superusers, check their managed organizations
15+
try:
16+
RadiusBatch.objects.filter(
17+
pk=batch_id, organization__in=user.organizations_managed
18+
).exists()
19+
return True
20+
except ObjectDoesNotExist:
21+
return False
22+
23+
async def connect(self):
24+
self.batch_id = self.scope["url_route"]["kwargs"]["batch_id"]
25+
self.user = self.scope["user"]
26+
self.group_name = f"radius_batch_{self.batch_id}"
27+
28+
if not self.user.is_authenticated or not self.user.is_staff:
29+
await self.close()
30+
return
31+
32+
has_permission = await sync_to_async(self._user_can_access_batch)(
33+
self.user, self.batch_id
34+
)
35+
36+
if not has_permission:
37+
await self.close()
38+
return
39+
40+
await self.channel_layer.group_add(self.group_name, self.channel_name)
41+
await self.accept()
42+
43+
async def disconnect(self, close_code):
44+
await self.channel_layer.group_discard(self.group_name, self.channel_name)
45+
46+
async def batch_status_update(self, event):
47+
await self.send_json({"status": event["status"]})

0 commit comments

Comments
 (0)