|
4 | 4 | import json
|
5 | 5 | import logging
|
6 | 6 | import uuid
|
| 7 | +from collections import defaultdict |
7 | 8 | from urllib.parse import unquote, urlencode, urlparse
|
8 | 9 |
|
9 | 10 | from django.conf import settings
|
@@ -1500,49 +1501,88 @@ class DocumentAccessViewSet(
|
1500 | 1501 | permission_classes = [permissions.IsAuthenticated, permissions.AccessPermission]
|
1501 | 1502 | queryset = models.DocumentAccess.objects.select_related("user").all()
|
1502 | 1503 | resource_field_name = "document"
|
1503 |
| - serializer_class = serializers.DocumentAccessSerializer |
1504 | 1504 |
|
1505 |
| - def list(self, request, *args, **kwargs): |
1506 |
| - """Return accesses for the current document with filters and annotations.""" |
1507 |
| - user = self.request.user |
| 1505 | + def __init__(self, *args, **kwargs): |
| 1506 | + """Initialize the viewset and define default value for contextual document.""" |
| 1507 | + super().__init__(*args, **kwargs) |
| 1508 | + self.document = None |
1508 | 1509 |
|
1509 |
| - try: |
1510 |
| - document = models.Document.objects.get(pk=self.kwargs["resource_id"]) |
1511 |
| - except models.Document.DoesNotExist: |
1512 |
| - return drf.response.Response([]) |
| 1510 | + def initial(self, request, *args, **kwargs): |
| 1511 | + """Retrieve self.document with annotated user roles.""" |
| 1512 | + super().initial(request, *args, **kwargs) |
1513 | 1513 |
|
1514 |
| - role = document.get_role(user) |
1515 |
| - if role is None: |
1516 |
| - return drf.response.Response([]) |
| 1514 | + try: |
| 1515 | + self.document = models.Document.objects.annotate_user_roles( |
| 1516 | + self.request.user |
| 1517 | + ).get(pk=self.kwargs["resource_id"]) |
| 1518 | + except models.Document.DoesNotExist as excpt: |
| 1519 | + raise Http404() from excpt |
1517 | 1520 |
|
1518 |
| - ancestors = ( |
1519 |
| - (document.get_ancestors() | models.Document.objects.filter(pk=document.pk)) |
1520 |
| - .filter(ancestors_deleted_at__isnull=True) |
1521 |
| - .order_by("path") |
| 1521 | + def get_serializer_class(self): |
| 1522 | + """Use light serializer for unprivileged users.""" |
| 1523 | + return ( |
| 1524 | + serializers.DocumentAccessSerializer |
| 1525 | + if self.document.get_role(self.request.user) in choices.PRIVILEGED_ROLES |
| 1526 | + else serializers.DocumentAccessLightSerializer |
1522 | 1527 | )
|
1523 |
| - highest_readable = ancestors.readable_per_se(user).only("depth").first() |
1524 | 1528 |
|
1525 |
| - if highest_readable is None: |
| 1529 | + def list(self, request, *args, **kwargs): |
| 1530 | + """Return accesses for the current document with filters and annotations.""" |
| 1531 | + user = request.user |
| 1532 | + |
| 1533 | + role = self.document.get_role(user) |
| 1534 | + if not role: |
1526 | 1535 | return drf.response.Response([])
|
1527 | 1536 |
|
1528 |
| - queryset = self.get_queryset() |
1529 |
| - queryset = queryset.filter( |
1530 |
| - document__in=ancestors.filter(depth__gte=highest_readable.depth) |
1531 |
| - ) |
| 1537 | + ancestors = ( |
| 1538 | + self.document.get_ancestors() |
| 1539 | + | models.Document.objects.filter(pk=self.document.pk) |
| 1540 | + ).filter(ancestors_deleted_at__isnull=True) |
1532 | 1541 |
|
1533 |
| - is_privileged = role in choices.PRIVILEGED_ROLES |
1534 |
| - if is_privileged: |
1535 |
| - serializer_class = serializers.DocumentAccessSerializer |
1536 |
| - else: |
1537 |
| - # Return only the document's privileged accesses |
| 1542 | + queryset = self.get_queryset().filter(document__in=ancestors) |
| 1543 | + |
| 1544 | + if role not in choices.PRIVILEGED_ROLES: |
1538 | 1545 | queryset = queryset.filter(role__in=choices.PRIVILEGED_ROLES)
|
1539 |
| - serializer_class = serializers.DocumentAccessLightSerializer |
1540 | 1546 |
|
1541 |
| - queryset = queryset.distinct() |
1542 |
| - serializer = serializer_class( |
1543 |
| - queryset, many=True, context=self.get_serializer_context() |
| 1547 | + accesses = list( |
| 1548 | + queryset.annotate(document_path=db.F("document__path")).order_by( |
| 1549 | + "document_path" |
| 1550 | + ) |
1544 | 1551 | )
|
1545 |
| - return drf.response.Response(serializer.data) |
| 1552 | + |
| 1553 | + # Annotate more information on roles |
| 1554 | + path_to_ancestors_roles = defaultdict(list) |
| 1555 | + path_to_role = defaultdict(lambda: None) |
| 1556 | + for access in accesses: |
| 1557 | + if access.user_id == user.id or access.team in user.teams: |
| 1558 | + parent_path = access.document_path[: -models.Document.steplen] |
| 1559 | + if parent_path: |
| 1560 | + path_to_ancestors_roles[access.document_path].extend( |
| 1561 | + path_to_ancestors_roles[parent_path] |
| 1562 | + ) |
| 1563 | + path_to_ancestors_roles[access.document_path].append( |
| 1564 | + path_to_role[parent_path] |
| 1565 | + ) |
| 1566 | + else: |
| 1567 | + path_to_ancestors_roles[access.document_path] = [] |
| 1568 | + |
| 1569 | + path_to_role[access.document_path] = choices.RoleChoices.max( |
| 1570 | + path_to_role[access.document_path], access.role |
| 1571 | + ) |
| 1572 | + |
| 1573 | + # serialize and return the response |
| 1574 | + context = self.get_serializer_context() |
| 1575 | + serializer_class = self.get_serializer_class() |
| 1576 | + serialized_data = [] |
| 1577 | + for access in accesses: |
| 1578 | + access.set_user_roles_tuple( |
| 1579 | + choices.RoleChoices.max(*path_to_ancestors_roles[access.document_path]), |
| 1580 | + path_to_role.get(access.document_path), |
| 1581 | + ) |
| 1582 | + serializer = serializer_class(access, context=context) |
| 1583 | + serialized_data.append(serializer.data) |
| 1584 | + |
| 1585 | + return drf.response.Response(serialized_data) |
1546 | 1586 |
|
1547 | 1587 | def perform_create(self, serializer):
|
1548 | 1588 | """Add a new access to the document and send an email to the new added user."""
|
|
0 commit comments