diff --git a/openwisp_controller/connection/channels/consumers.py b/openwisp_controller/connection/channels/consumers.py index ef19d16fc..6c47cbcac 100644 --- a/openwisp_controller/connection/channels/consumers.py +++ b/openwisp_controller/connection/channels/consumers.py @@ -1,4 +1,5 @@ import json +from copy import deepcopy from swapper import load_model @@ -9,5 +10,6 @@ class CommandConsumer(BaseDeviceConsumer): def send_update(self, event): - event.pop('type') - self.send(json.dumps(event)) + data = deepcopy(event) + data.pop('type') + self.send(json.dumps(data)) diff --git a/openwisp_controller/connection/tests/pytest.py b/openwisp_controller/connection/tests/pytest.py index 63558d31e..4966e6337 100644 --- a/openwisp_controller/connection/tests/pytest.py +++ b/openwisp_controller/connection/tests/pytest.py @@ -36,11 +36,7 @@ async def _get_communicator(self, admin_client, device_id): assert connected is True return communicator - @mock.patch('paramiko.SSHClient.connect') - async def test_new_command_created(self, admin_user, admin_client): - device_conn = await database_sync_to_async(self._create_device_connection)() - communicator = await self._get_communicator(admin_client, device_conn.device_id) - + async def _create_command(self, device_conn): command = Command( device_id=device_conn.device_id, connection=device_conn, @@ -52,11 +48,12 @@ async def test_new_command_created(self, admin_user, admin_client): mocked_exec_command.return_value = self._exec_command_return_value( stdout='test' ) - await database_sync_to_async(command.save)() - await database_sync_to_async(command.refresh_from_db)() + await database_sync_to_async(command.save)() + await database_sync_to_async(command.refresh_from_db)() + return command - response = await communicator.receive_json_from() - expected_response = { + def _get_expected_response(self, command): + return { 'model': 'Command', 'data': { 'id': str(command.id), @@ -70,5 +67,41 @@ async def test_new_command_created(self, admin_user, admin_client): 'connection': str(command.connection_id), }, } + + @mock.patch('paramiko.SSHClient.connect') + async def test_new_command_created(self, admin_user, admin_client): + device_conn = await database_sync_to_async(self._create_device_connection)() + communicator = await self._get_communicator(admin_client, device_conn.device_id) + command = await self._create_command(device_conn) + response = await communicator.receive_json_from() + expected_response = self._get_expected_response(command) assert response == expected_response await communicator.disconnect() + + async def test_multiple_connections_receive_updates_with_redis( + self, admin_user, admin_client, settings + ): + settings.CHANNEL_LAYERS = { + 'default': { + 'BACKEND': 'channels_redis.core.RedisChannelLayer', + 'CONFIG': { + 'hosts': [('localhost', 6379)], + }, + }, + } + + device_conn = await database_sync_to_async(self._create_device_connection)() + communicator1 = await self._get_communicator( + admin_client, device_conn.device_id + ) + communicator2 = await self._get_communicator( + admin_client, device_conn.device_id + ) + command = await self._create_command(device_conn) + response1 = await communicator1.receive_json_from() + response2 = await communicator2.receive_json_from() + expected_response = self._get_expected_response(command) + assert response1 == expected_response + assert response2 == expected_response + await communicator1.disconnect() + await communicator2.disconnect()