|
4 | 4 | import json
|
5 | 5 | import logging
|
6 | 6 | import uuid
|
| 7 | +from collections import defaultdict |
7 | 8 | from urllib.parse import unquote, urlencode, urlparse
|
8 | 9 |
|
9 | 10 | from django.conf import settings
|
@@ -1421,49 +1422,88 @@ class DocumentAccessViewSet(
|
1421 | 1422 | permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
|
1422 | 1423 | queryset = models.DocumentAccess.objects.select_related("user").all()
|
1423 | 1424 | resource_field_name = "document"
|
1424 |
| - serializer_class = serializers.DocumentAccessSerializer |
1425 | 1425 |
|
1426 |
| - def list(self, request, *args, **kwargs): |
1427 |
| - """Return accesses for the current document with filters and annotations.""" |
1428 |
| - user = self.request.user |
| 1426 | + def __init__(self, *args, **kwargs): |
| 1427 | + """Initialize the viewset and define default value for contextual document.""" |
| 1428 | + super().__init__(*args, **kwargs) |
| 1429 | + self.document = None |
1429 | 1430 |
|
1430 |
| - try: |
1431 |
| - document = models.Document.objects.get(pk=self.kwargs["resource_id"]) |
1432 |
| - except models.Document.DoesNotExist: |
1433 |
| - return drf.response.Response([]) |
| 1431 | + def initial(self, request, *args, **kwargs): |
| 1432 | + """Retrieve self.document with annotated user roles.""" |
| 1433 | + super().initial(request, *args, **kwargs) |
1434 | 1434 |
|
1435 |
| - role = document.get_role(user) |
1436 |
| - if role is None: |
1437 |
| - return drf.response.Response([]) |
| 1435 | + try: |
| 1436 | + self.document = models.Document.objects.annotate_user_roles( |
| 1437 | + self.request.user |
| 1438 | + ).get(pk=self.kwargs["resource_id"]) |
| 1439 | + except models.Document.DoesNotExist as excpt: |
| 1440 | + raise Http404() from excpt |
1438 | 1441 |
|
1439 |
| - ancestors = ( |
1440 |
| - (document.get_ancestors() | models.Document.objects.filter(pk=document.pk)) |
1441 |
| - .filter(ancestors_deleted_at__isnull=True) |
1442 |
| - .order_by("path") |
| 1442 | + def get_serializer_class(self): |
| 1443 | + """Use light serializer for unprivileged users.""" |
| 1444 | + return ( |
| 1445 | + serializers.DocumentAccessSerializer |
| 1446 | + if self.document.get_role(self.request.user) in choices.PRIVILEGED_ROLES |
| 1447 | + else serializers.DocumentAccessLightSerializer |
1443 | 1448 | )
|
1444 |
| - highest_readable = ancestors.readable_per_se(user).only("depth").first() |
1445 | 1449 |
|
1446 |
| - if highest_readable is None: |
| 1450 | + def list(self, request, *args, **kwargs): |
| 1451 | + """Return accesses for the current document with filters and annotations.""" |
| 1452 | + user = request.user |
| 1453 | + |
| 1454 | + role = self.document.get_role(user) |
| 1455 | + if not role: |
1447 | 1456 | return drf.response.Response([])
|
1448 | 1457 |
|
1449 |
| - queryset = self.get_queryset() |
1450 |
| - queryset = queryset.filter( |
1451 |
| - document__in=ancestors.filter(depth__gte=highest_readable.depth) |
1452 |
| - ) |
| 1458 | + ancestors = ( |
| 1459 | + self.document.get_ancestors() |
| 1460 | + | models.Document.objects.filter(pk=self.document.pk) |
| 1461 | + ).filter(ancestors_deleted_at__isnull=True) |
1453 | 1462 |
|
1454 |
| - is_privileged = role in choices.PRIVILEGED_ROLES |
1455 |
| - if is_privileged: |
1456 |
| - serializer_class = serializers.DocumentAccessSerializer |
1457 |
| - else: |
1458 |
| - # Return only the document's privileged accesses |
| 1463 | + queryset = self.get_queryset().filter(document__in=ancestors) |
| 1464 | + |
| 1465 | + if role not in choices.PRIVILEGED_ROLES: |
1459 | 1466 | queryset = queryset.filter(role__in=choices.PRIVILEGED_ROLES)
|
1460 |
| - serializer_class = serializers.DocumentAccessLightSerializer |
1461 | 1467 |
|
1462 |
| - queryset = queryset.distinct() |
1463 |
| - serializer = serializer_class( |
1464 |
| - queryset, many=True, context=self.get_serializer_context() |
| 1468 | + accesses = list( |
| 1469 | + queryset.annotate(document_path=db.F("document__path")).order_by( |
| 1470 | + "document_path" |
| 1471 | + ) |
1465 | 1472 | )
|
1466 |
| - return drf.response.Response(serializer.data) |
| 1473 | + |
| 1474 | + # Annotate more information on roles |
| 1475 | + path_to_ancestors_roles = defaultdict(list) |
| 1476 | + path_to_role = defaultdict(lambda: None) |
| 1477 | + for access in accesses: |
| 1478 | + if access.user_id == user.id or access.team in user.teams: |
| 1479 | + parent_path = access.document_path[: -models.Document.steplen] |
| 1480 | + if parent_path: |
| 1481 | + path_to_ancestors_roles[access.document_path].extend( |
| 1482 | + path_to_ancestors_roles[parent_path] |
| 1483 | + ) |
| 1484 | + path_to_ancestors_roles[access.document_path].append( |
| 1485 | + path_to_role[parent_path] |
| 1486 | + ) |
| 1487 | + else: |
| 1488 | + path_to_ancestors_roles[access.document_path] = [] |
| 1489 | + |
| 1490 | + path_to_role[access.document_path] = choices.RoleChoices.max( |
| 1491 | + path_to_role[access.document_path], access.role |
| 1492 | + ) |
| 1493 | + |
| 1494 | + # serialize and return the response |
| 1495 | + context = self.get_serializer_context() |
| 1496 | + serializer_class = self.get_serializer_class() |
| 1497 | + serialized_data = [] |
| 1498 | + for access in accesses: |
| 1499 | + access.set_user_roles_tuple( |
| 1500 | + choices.RoleChoices.max(*path_to_ancestors_roles[access.document_path]), |
| 1501 | + path_to_role.get(access.document_path), |
| 1502 | + ) |
| 1503 | + serializer = serializer_class(access, context=context) |
| 1504 | + serialized_data.append(serializer.data) |
| 1505 | + |
| 1506 | + return drf.response.Response(serialized_data) |
1467 | 1507 |
|
1468 | 1508 | def perform_create(self, serializer):
|
1469 | 1509 | """Add a new access to the document and send an email to the new added user."""
|
|
0 commit comments