Skip to content

Commit f0f5305

Browse files
committed
[fix] Send message to organization specific channel and fixed test
1 parent 9697b8b commit f0f5305

File tree

2 files changed

+112
-41
lines changed

2 files changed

+112
-41
lines changed

openwisp_controller/geo/apps.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
1+
import json
2+
3+
import channels.layers
14
import swapper
5+
from asgiref.sync import async_to_sync
26
from django.conf import settings
37
from django.db.models import Case, Count, Sum, When
8+
from django.db.models.signals import post_save
49
from django.utils.translation import gettext_lazy as _
510
from django_loci.apps import LociConfig
611
from swapper import get_model_name
@@ -115,5 +120,37 @@ def register_menu_groups(self):
115120
},
116121
)
117122

123+
def _load_receivers(self):
124+
super()._load_receivers()
125+
post_save.connect(
126+
self._location_post_save_websocket_receiver,
127+
sender=self.location_model,
128+
dispatch_uid="geo_ws_update_mobile_location",
129+
)
130+
131+
def _location_post_save_websocket_receiver(
132+
self, sender, instance, created, **kwargs
133+
):
134+
"""
135+
Sends location updates over websockets to organization specific channel group.
136+
"""
137+
if created or not instance.geometry:
138+
return
139+
channel_layer = channels.layers.get_channel_layer()
140+
async_to_sync(channel_layer.group_send)(
141+
f"loci.mobile-location.organization.{instance.organization_id}",
142+
{
143+
"type": "send_message",
144+
"message": {
145+
"id": str(instance.pk),
146+
"geometry": json.loads(instance.geometry.geojson),
147+
"address": instance.address,
148+
"name": instance.name,
149+
"type": instance.type,
150+
"is_mobile": instance.is_mobile,
151+
},
152+
},
153+
)
154+
118155

119156
del LociConfig

openwisp_controller/geo/tests/pytest.py

Lines changed: 75 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55

66
import pytest
77
from channels.db import database_sync_to_async
8-
from channels.layers import get_channel_layer
98
from channels.routing import ProtocolTypeRouter
109
from django.conf import settings
11-
from django.contrib.auth import get_user_model
10+
from django.contrib.auth import get_permission_codename, get_user_model
1211
from django.contrib.auth.models import Permission
1312
from django.utils.module_loading import import_string
1413
from django_loci.tests import TestChannelsMixin
@@ -18,6 +17,7 @@
1817
CommonLocationBroadcast,
1918
LocationBroadcast,
2019
)
20+
from openwisp_users.tests.utils import TestOrganizationMixin
2121

2222
from .utils import TestGeoMixin
2323

@@ -26,10 +26,11 @@
2626
DeviceLocation = load_model("geo", "DeviceLocation")
2727
User = get_user_model()
2828
OrganizationUser = load_model("openwisp_users", "OrganizationUser")
29+
Group = load_model("openwisp_users", "Group")
2930

3031

3132
@skipIf(os.environ.get("SAMPLE_APP", False), "Running tests on SAMPLE_APP")
32-
class TestChannels(TestGeoMixin, TestChannelsMixin):
33+
class TestChannels(TestGeoMixin, TestChannelsMixin, TestOrganizationMixin):
3334
location_consumer = LocationBroadcast
3435
common_location_consumer = CommonLocationBroadcast
3536
application = import_string(getattr(settings, "ASGI_APPLICATION"))
@@ -110,59 +111,92 @@ async def test_common_location_consumer_staff_but_no_change_permission(self):
110111
@pytest.mark.asyncio
111112
@pytest.mark.django_db(transaction=True)
112113
async def test_common_location_org_isolation(self):
113-
org1 = await database_sync_to_async(self._create_organization)(name="test1")
114-
org2 = await database_sync_to_async(self._create_organization)(name="test2")
115-
location1 = await database_sync_to_async(self._create_location)(
114+
administrator = await Group.objects.acreate(name="Administrator")
115+
perm = await Permission.objects.filter(
116+
codename=get_permission_codename("change", self.location_model._meta),
117+
).afirst()
118+
await administrator.permissions.aadd(perm)
119+
org1 = await database_sync_to_async(self._get_org)(org_name="test1")
120+
org2 = await database_sync_to_async(self._get_org)(org_name="test2")
121+
org1_location = await database_sync_to_async(self._create_location)(
116122
is_mobile=True, organization=org1
117123
)
118-
location2 = await database_sync_to_async(self._create_location)(
124+
org2_location = await database_sync_to_async(self._create_location)(
119125
is_mobile=True, organization=org2
120126
)
121-
user1 = await database_sync_to_async(User.objects.create_user)(
122-
username="user1", password="password", email="[email protected]", is_staff=True
127+
org1_user = await database_sync_to_async(self._create_administrator)(
128+
organizations=[org1],
129+
username="user1",
130+
password="password",
131+
123132
)
124-
user2 = await database_sync_to_async(User.objects.create_user)(
125-
username="user2", password="password", email="[email protected]", is_staff=True
133+
org2_user = await database_sync_to_async(self._create_administrator)(
134+
organizations=[org2],
135+
username="user2",
136+
password="password",
137+
126138
)
127-
perm = await Permission.objects.filter(
128-
codename=f"change_{self.location_model._meta.model_name}",
129-
content_type__app_label=self.location_model._meta.app_label,
130-
).afirst()
131-
await database_sync_to_async(user1.user_permissions.add)(perm)
132-
await database_sync_to_async(user2.user_permissions.add)(perm)
133-
await database_sync_to_async(OrganizationUser.objects.create)(
134-
organization=org1, user=user1, is_admin=True
135-
)
136-
await database_sync_to_async(OrganizationUser.objects.create)(
137-
organization=org2, user=user2, is_admin=True
139+
admin = await database_sync_to_async(self._get_admin)()
140+
org1_communicator = self._get_common_location_communicator(
141+
await self._get_common_location_request_dict(
142+
pk=org1_location.pk, user=org1_user
143+
),
144+
org1_user,
138145
)
139-
user1 = await database_sync_to_async(User.objects.get)(pk=user1.pk)
140-
user2 = await database_sync_to_async(User.objects.get)(pk=user2.pk)
141-
channel_layer = get_channel_layer()
142-
communicator1 = self._get_common_location_communicator(
143-
await self._get_common_location_request_dict(pk=location1.pk, user=user1),
144-
user1,
146+
org2_communicator = self._get_common_location_communicator(
147+
await self._get_common_location_request_dict(
148+
pk=org2_location.pk, user=org2_user
149+
),
150+
org2_user,
145151
)
146-
communicator2 = self._get_common_location_communicator(
147-
await self._get_common_location_request_dict(pk=location2.pk, user=user2),
148-
user2,
152+
admin_communicator = self._get_common_location_communicator(
153+
await self._get_common_location_request_dict(
154+
pk=org1_location.pk, user=admin
155+
),
156+
admin,
149157
)
150-
connected, _ = await communicator1.connect()
158+
connected, _ = await org1_communicator.connect()
151159
assert connected
152-
connected, _ = await communicator2.connect()
160+
connected, _ = await org2_communicator.connect()
153161
assert connected
154-
await channel_layer.group_send(
155-
f"loci.mobile-location.organization.{org1.pk}",
156-
{"type": "send.message", "message": {"id": str(location1.pk)}},
162+
connected, _ = await admin_communicator.connect()
163+
assert connected
164+
165+
# Updating co-ordinates for org1_location should notify org1_user and admin,
166+
await self._save_location(str(org1_location.pk))
167+
org1_response = await org1_communicator.receive_json_from(timeout=1)
168+
assert org1_response["id"] == str(org1_location.pk)
169+
admin_response = await admin_communicator.receive_json_from(timeout=1)
170+
assert admin_response["id"] == str(org1_location.pk)
171+
with pytest.raises(asyncio.TimeoutError):
172+
await org2_communicator.receive_json_from(timeout=1)
173+
174+
with suppress(asyncio.CancelledError):
175+
await org2_communicator.disconnect()
176+
177+
org2_communicator = self._get_common_location_communicator(
178+
await self._get_common_location_request_dict(
179+
pk=org2_location.pk, user=org2_user
180+
),
181+
org2_user,
157182
)
158-
response = await communicator1.receive_json_from(timeout=1)
159-
assert response["id"] == str(location1.pk)
183+
connected, _ = await org2_communicator.connect()
184+
assert connected
185+
186+
# Updating co-ordinates for org2_location should notify org2_user and admin,
187+
await self._save_location(str(org2_location.pk))
188+
org2_response = await org2_communicator.receive_json_from(timeout=1)
189+
assert org2_response["id"] == str(org2_location.pk)
190+
admin_response = await admin_communicator.receive_json_from(timeout=1)
191+
assert admin_response["id"] == str(org2_location.pk)
160192
with pytest.raises(asyncio.TimeoutError):
161-
await communicator2.receive_json_from(timeout=1)
193+
await org1_communicator.receive_json_from(timeout=1)
194+
162195
# The task is been cancelled if not completed in the given timeout
163-
await communicator1.disconnect()
164196
with suppress(asyncio.CancelledError):
165-
await communicator2.disconnect()
197+
await org1_communicator.disconnect()
198+
await org2_communicator.disconnect()
199+
await admin_communicator.disconnect()
166200

167201
def test_asgi_application_router(self):
168202
assert isinstance(self.application, ProtocolTypeRouter)

0 commit comments

Comments
 (0)