Skip to content

Commit df8ea04

Browse files
committed
[fix] Add org specific group and add tests
1 parent 9fcee51 commit df8ea04

File tree

2 files changed

+126
-43
lines changed

2 files changed

+126
-43
lines changed

openwisp_controller/geo/channels/consumers.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import swapper
2+
from asgiref.sync import async_to_sync
23
from django_loci.channels.base import BaseCommonLocationBroadcast, BaseLocationBroadcast
34

45
Location = swapper.load_model("geo", "Location")
@@ -22,12 +23,16 @@ def is_authorized(self, user, location):
2223
class CommonLocationBroadcast(BaseCommonLocationBroadcast):
2324
model = Location
2425

25-
def is_autherized(self, user, location):
26-
result = super().is_authorized(user, location)
27-
if (
28-
result
29-
and not user.is_superuser
30-
and not user.is_manager(location.organization)
31-
):
32-
return False
33-
return result
26+
def join_groups(self, user):
27+
"""
28+
Subscribe user to all organizations they manage or bypass if superuser.
29+
"""
30+
if user.is_superuser:
31+
super().join_groups(user)
32+
return
33+
34+
self.group_names = []
35+
for org in user.organizations_managed:
36+
group = f"loci.mobile-location.organization.{org}"
37+
self.group_names.append(group)
38+
async_to_sync(self.channel_layer.group_add)(group, self.channel_name)
Lines changed: 112 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,25 @@
1+
import asyncio
2+
import importlib
13
import os
4+
from contextlib import suppress
25
from unittest import skipIf
36

47
import pytest
58
from channels.db import database_sync_to_async
9+
from channels.layers import get_channel_layer
610
from channels.routing import ProtocolTypeRouter
711
from django.conf import settings
812
from django.contrib.auth import get_user_model
913
from django.contrib.auth.models import Permission
1014
from django.utils.module_loading import import_string
11-
from django_loci.tests.base.test_channels import BaseTestChannels
15+
from django_loci.tests import TestChannelsMixin
1216
from swapper import load_model
1317

18+
from openwisp_controller.geo.channels.consumers import (
19+
CommonLocationBroadcast,
20+
LocationBroadcast,
21+
)
22+
1423
from .utils import TestGeoMixin
1524

1625
Device = load_model("config", "Device")
@@ -21,7 +30,9 @@
2130

2231

2332
@skipIf(os.environ.get("SAMPLE_APP", False), "Running tests on SAMPLE_APP")
24-
class TestChannels(TestGeoMixin, BaseTestChannels):
33+
class TestChannels(TestGeoMixin, TestChannelsMixin):
34+
location_consumer = LocationBroadcast
35+
common_location_consumer = CommonLocationBroadcast
2536
application = import_string(getattr(settings, "ASGI_APPLICATION"))
2637
object_model = Device
2738
location_model = Location
@@ -37,35 +48,32 @@ async def test_consumer_staff_but_no_change_permission(self):
3748
location = await database_sync_to_async(self._create_location)(is_mobile=True)
3849
await database_sync_to_async(self._create_object_location)(location=location)
3950
pk = location.pk
40-
request_vars = await self._get_specific_location_request_dict(user=user, pk=pk)
51+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
4152
communicator = self._get_specific_location_communicator(request_vars, user)
4253
connected, _ = await communicator.connect()
4354
assert not connected
4455
await communicator.disconnect()
4556
# add permission to change location and repeat
46-
perm = await database_sync_to_async(
47-
(
48-
await database_sync_to_async(Permission.objects.filter)(
49-
name="Can change location"
50-
)
51-
).first
52-
)()
57+
perm = await Permission.objects.filter(
58+
codename=f"change_{self.location_model._meta.model_name}",
59+
content_type__app_label=self.location_model._meta.app_label,
60+
).afirst()
5361
await database_sync_to_async(user.user_permissions.add)(perm)
5462
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
55-
request_vars = await self._get_specific_location_request_dict(user=user, pk=pk)
63+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
5664
communicator = self._get_specific_location_communicator(request_vars, user)
5765
connected, _ = await communicator.connect()
5866
assert not connected
5967
await communicator.disconnect()
6068
# add user to organization
6169
await database_sync_to_async(OrganizationUser.objects.create)(
62-
organization=location.organization, user=user, is_admin=True
70+
organization=location.organization,
71+
user=user,
72+
is_admin=True,
6373
)
6474
await database_sync_to_async(location.organization.save)()
6575
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
66-
request_vars = await self._ge_get_specific_location_request_dictt_request_dict(
67-
user=user, pk=pk
68-
)
76+
request_vars = await self._get_specific_location_request_dict(pk=pk, user=user)
6977
communicator = self._get_specific_location_communicator(request_vars, user)
7078
connected, _ = await communicator.connect()
7179
assert connected
@@ -80,37 +88,107 @@ async def test_common_location_consumer_staff_but_no_change_permission(self):
8088
location = await database_sync_to_async(self._create_location)(is_mobile=True)
8189
await database_sync_to_async(self._create_object_location)(location=location)
8290
pk = location.pk
83-
request_vars = await self._get_common_location_request_dict(user=user, pk=pk)
91+
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
8492
communicator = self._get_common_location_communicator(request_vars, user)
8593
connected, _ = await communicator.connect()
8694
assert not connected
8795
await communicator.disconnect()
88-
# add permission to change location and repeat
89-
perm = await database_sync_to_async(
90-
(
91-
await database_sync_to_async(Permission.objects.filter)(
92-
name="Can change location"
93-
)
94-
).first
95-
)()
96+
# After granting change permission, the user can connect to the common
97+
# location endpoint, but must receive updates only for locations
98+
# belonging to their organization.
99+
perm = await Permission.objects.filter(
100+
codename=f"change_{self.location_model._meta.model_name}",
101+
content_type__app_label=self.location_model._meta.app_label,
102+
).afirst()
96103
await database_sync_to_async(user.user_permissions.add)(perm)
97104
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
98-
request_vars = await self._get_common_location_request_dict(user=user, pk=pk)
105+
request_vars = await self._get_common_location_request_dict(pk=pk, user=user)
99106
communicator = self._get_common_location_communicator(request_vars, user)
100107
connected, _ = await communicator.connect()
101-
assert not connected
108+
assert connected
102109
await communicator.disconnect()
103-
# add user to organization
110+
111+
@pytest.mark.asyncio
112+
@pytest.mark.django_db(transaction=True)
113+
async def test_common_location_org_isolation(self):
114+
org1 = await database_sync_to_async(self._create_organization)(name="test1")
115+
org2 = await database_sync_to_async(self._create_organization)(name="test2")
116+
location1 = await database_sync_to_async(self._create_location)(
117+
is_mobile=True, organization=org1
118+
)
119+
location2 = await database_sync_to_async(self._create_location)(
120+
is_mobile=True, organization=org2
121+
)
122+
user1 = await database_sync_to_async(User.objects.create_user)(
123+
username="user1", password="password", email="[email protected]", is_staff=True
124+
)
125+
user2 = await database_sync_to_async(User.objects.create_user)(
126+
username="user2", password="password", email="[email protected]", is_staff=True
127+
)
128+
perm = await Permission.objects.filter(
129+
codename=f"change_{self.location_model._meta.model_name}",
130+
content_type__app_label=self.location_model._meta.app_label,
131+
).afirst()
132+
await database_sync_to_async(user1.user_permissions.add)(perm)
133+
await database_sync_to_async(user2.user_permissions.add)(perm)
104134
await database_sync_to_async(OrganizationUser.objects.create)(
105-
organization=location.organization, user=user, is_admin=True
135+
organization=org1, user=user1, is_admin=True
106136
)
107-
await database_sync_to_async(location.organization.save)()
108-
user = await database_sync_to_async(User.objects.get)(pk=user.pk)
109-
request_vars = await self._get_common_location_request_dict(user=user, pk=pk)
110-
communicator = self._get_common_location_communicator(request_vars, user)
111-
connected, _ = await communicator.connect()
137+
await database_sync_to_async(OrganizationUser.objects.create)(
138+
organization=org2, user=user2, is_admin=True
139+
)
140+
user1 = await database_sync_to_async(User.objects.get)(pk=user1.pk)
141+
user2 = await database_sync_to_async(User.objects.get)(pk=user2.pk)
142+
channel_layer = get_channel_layer()
143+
communicator1 = self._get_common_location_communicator(
144+
await self._get_common_location_request_dict(pk=location1.pk, user=user1),
145+
user1,
146+
)
147+
communicator2 = self._get_common_location_communicator(
148+
await self._get_common_location_request_dict(pk=location2.pk, user=user2),
149+
user2,
150+
)
151+
connected, _ = await communicator1.connect()
112152
assert connected
113-
await communicator.disconnect()
153+
connected, _ = await communicator2.connect()
154+
assert connected
155+
await channel_layer.group_send(
156+
f"loci.mobile-location.organization.{org1.pk}",
157+
{"type": "send.message", "message": {"id": str(location1.pk)}},
158+
)
159+
response = await communicator1.receive_json_from(timeout=1)
160+
assert response["id"] == str(location1.pk)
161+
with pytest.raises(asyncio.TimeoutError):
162+
await communicator2.receive_json_from(timeout=1)
163+
await communicator1.disconnect()
164+
# The task is been cancelled if not completed in the given timeout
165+
with suppress(asyncio.CancelledError):
166+
await communicator2.disconnect()
167+
168+
# Similarly, A message sent to org2 must not be received by users of org1
169+
communicator1 = self._get_common_location_communicator(
170+
await self._get_common_location_request_dict(pk=location1.pk, user=user1),
171+
user1,
172+
)
173+
communicator2 = self._get_common_location_communicator(
174+
await self._get_common_location_request_dict(pk=location2.pk, user=user2),
175+
user2,
176+
)
177+
connected, _ = await communicator1.connect()
178+
assert connected
179+
connected, _ = await communicator2.connect()
180+
assert connected
181+
await channel_layer.group_send(
182+
f"loci.mobile-location.organization.{org2.pk}",
183+
{"type": "send.message", "message": {"id": str(location2.pk)}},
184+
)
185+
response = await communicator2.receive_json_from(timeout=1)
186+
assert response["id"] == str(location2.pk)
187+
with pytest.raises(asyncio.TimeoutError):
188+
await communicator1.receive_json_from(timeout=1)
189+
await communicator2.disconnect()
190+
with suppress(asyncio.CancelledError):
191+
await communicator1.disconnect()
114192

115193
def test_asgi_application_router(self):
116194
assert isinstance(self.application, ProtocolTypeRouter)

0 commit comments

Comments
 (0)