diff --git a/openwisp_controller/config/admin.py b/openwisp_controller/config/admin.py index 339560b8c..cd38b5796 100644 --- a/openwisp_controller/config/admin.py +++ b/openwisp_controller/config/admin.py @@ -136,6 +136,11 @@ def get_extra_context(self, pk=None): if not issubclass(self.model, AbstractVpn): ctx['CONFIG_BACKEND_FIELD_SHOWN'] = app_settings.CONFIG_BACKEND_FIELD_SHOWN if pk: + UUID_PATTERN = re.compile( + '^[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}$' + ) + if not UUID_PATTERN.match(str(pk)): + raise Http404() ctx['download_url'] = reverse('{0}_download'.format(prefix), args=[pk]) try: has_config = True @@ -165,9 +170,10 @@ def change_view(self, request, object_id, form_url='', extra_context=None): def get_urls(self): options = getattr(self.model, '_meta') url_prefix = '{0}_{1}'.format(options.app_label, options.model_name) + UUID_PATTERN = r'([a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12})' return [ re_path( - r'^download/(?P[^/]+)/$', + r'^download/{0}/$'.format(UUID_PATTERN), self.admin_site.admin_view(self.download_view), name='{0}_download'.format(url_prefix), ), @@ -177,7 +183,7 @@ def get_urls(self): name='{0}_preview'.format(url_prefix), ), re_path( - r'^(?P[^/]+)/context\.json$', + r'^{0}/context\.json$'.format(UUID_PATTERN), self.admin_site.admin_view(self.context_view), name='{0}_context'.format(url_prefix), ), diff --git a/openwisp_controller/config/static/config/css/admin.css b/openwisp_controller/config/static/config/css/admin.css index 3f42bcc9c..ff065e2f2 100644 --- a/openwisp_controller/config/static/config/css/admin.css +++ b/openwisp_controller/config/static/config/css/admin.css @@ -6,8 +6,9 @@ a.jsoneditor-exit:focus { .field-config .vLargeTextField, .jsoneditor .vLargeTextField { font-size: 1em; - font-family: "Bitstream Vera Sans Mono", "Monaco", "Droid Sans Mono", - "DejaVu Sans Mono", "Ubuntu Mono", "Courier New", Courier, monospace; + font-family: + "Bitstream Vera Sans Mono", "Monaco", "Droid Sans Mono", "DejaVu Sans Mono", + "Ubuntu Mono", "Courier New", Courier, monospace; color: #f9f9f9; overflow: auto; background-color: #222; diff --git a/openwisp_controller/config/tests/pytest.py b/openwisp_controller/config/tests/pytest.py index 4d0987e7d..d8b234dcd 100644 --- a/openwisp_controller/config/tests/pytest.py +++ b/openwisp_controller/config/tests/pytest.py @@ -11,21 +11,22 @@ from ..base.channels_consumer import BaseDeviceConsumer from .utils import CreateDeviceMixin -Device = load_model('config', 'Device') +Device = load_model("config", "Device") @pytest.mark.asyncio @pytest.mark.django_db(transaction=True) class TestDeviceConsumer(CreateDeviceMixin): model = Device + UUID_PATTERN = "[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}" application = ProtocolTypeRouter( { - 'websocket': AllowedHostsOriginValidator( + "websocket": AllowedHostsOriginValidator( AuthMiddlewareStack( URLRouter( [ re_path( - r'^ws/controller/device/(?P[^/]+)/$', + f"^ws/controller/device/(?P{UUID_PATTERN})/$", BaseDeviceConsumer.as_asgi(), ) ] @@ -36,14 +37,14 @@ class TestDeviceConsumer(CreateDeviceMixin): ) async def _get_communicator(self, admin_client, device_id): - session_id = admin_client.cookies['sessionid'].value + session_id = admin_client.cookies["sessionid"].value communicator = WebsocketCommunicator( self.application, - path=f'ws/controller/device/{device_id}/', + path=f"ws/controller/device/{device_id}/", headers=[ ( - b'cookie', - f'sessionid={session_id}'.encode('ascii'), + b"cookie", + f"sessionid={session_id}".encode("ascii"), ) ], ) @@ -55,15 +56,15 @@ async def _get_communicator(self, admin_client, device_id): def _add_model_permissions(self, user, add=True, change=True, delete=True): permissions = [] if add: - permissions.append(f'add_{self.model._meta.model_name}') + permissions.append(f"add_{self.model._meta.model_name}") if change: - permissions.append(f'change_{self.model._meta.model_name}') + permissions.append(f"change_{self.model._meta.model_name}") if delete: - permissions.append(f'delete_{self.model._meta.model_name}') + permissions.append(f"delete_{self.model._meta.model_name}") user.user_permissions.set(Permission.objects.filter(codename__in=permissions)) async def test_unauthenticated_user(self, client): - client.cookies['sessionid'] = 'random' + client.cookies["sessionid"] = "random" device = await database_sync_to_async(self._create_device)() with pytest.raises(AssertionError): await self._get_communicator(client, device.id) @@ -91,14 +92,14 @@ async def test_user_authorization(self, client, django_user_model): async def test_silent_disconnection(self, admin_user, admin_client): device = await database_sync_to_async(self._create_device)() - session_id = admin_client.cookies['sessionid'].value + session_id = admin_client.cookies["sessionid"].value communicator = WebsocketCommunicator( self.application, - path=f'ws/controller/device/{device.pk}/', + path=f"ws/controller/device/{device.pk}/", headers=[ ( - b'cookie', - f'sessionid={session_id}'.encode('ascii'), + b"cookie", + f"sessionid={session_id}".encode("ascii"), ) ], ) diff --git a/openwisp_controller/config/tests/test_admin.py b/openwisp_controller/config/tests/test_admin.py index 62c28b313..1c5fafe14 100644 --- a/openwisp_controller/config/tests/test_admin.py +++ b/openwisp_controller/config/tests/test_admin.py @@ -1,6 +1,8 @@ import io import json import os +import re +import uuid from unittest.mock import patch from uuid import uuid4 @@ -11,9 +13,10 @@ from django.db import IntegrityError from django.db.models.signals import post_save from django.test import TestCase, TransactionTestCase -from django.urls import reverse +from django.urls import NoReverseMatch, reverse from swapper import load_model +from openwisp_controller.config.utils import UUID_PATTERN from openwisp_utils.tests import AdminActionPermTestMixin, catch_signal from ...geo.tests.utils import TestGeoMixin @@ -47,6 +50,125 @@ Group = load_model('openwisp_users', 'Group') +class TestUuidPattern(TestCase): + """ + Test that the UUID_PATTERN correctly validates UUIDs + and prevents 500 errors in admin URLs + """ + + def test_uuid_pattern_validation(self): + """ + Test that the UUID_PATTERN correctly matches valid UUIDs + and rejects invalid ones + """ + # Valid UUID formats + valid_uuids = [ + '12345678-1234-5678-1234-567812345678', + 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', + 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA', + '00000000-0000-0000-0000-000000000000', + 'f47ac10b-58cc-4372-a567-0e02b2c3d479', + ] + + # Test valid UUIDs against the pattern + for valid_uuid in valid_uuids: + self.assertTrue( + re.match(f'^{UUID_PATTERN}$', valid_uuid), + f"Valid UUID {valid_uuid} did not match pattern", + ) + + # Invalid UUID formats + invalid_uuids = [ + 'invalid-uuid', + '12345678123456781234567812345678', # No hyphens + '12345678-1234-5678-1234-56781234567', # Too short + '12345678-1234-5678-1234-5678123456789', # Too long + '12345678-1234-5678-1234-56781234567g', # Invalid character + 'g2345678-1234-5678-1234-567812345678', # Invalid character + '12345678-1234-5678-1234', # Incomplete + '', # Empty string + '12345678-1234.5678-1234-567812345678', # Invalid format + ] + + # Test invalid UUIDs against the pattern + for invalid_uuid in invalid_uuids: + self.assertFalse( + re.match(f'^{UUID_PATTERN}$', invalid_uuid), + f"Invalid UUID {invalid_uuid} should not match pattern", + ) + + def test_admin_url_with_uuid(self): + """ + Test that admin URLs correctly handle valid and invalid UUIDs + This test verifies the fix for the 500 error caused by invalid UUIDs + """ + # Generate a valid UUID + valid_uuid = str(uuid.uuid4()) + + # Patch reverse to avoid actual URL resolution (just test the pattern) + with patch('django.urls.reverse') as mock_reverse: + mock_reverse.return_value = '/admin/fake/url/' + + # Valid UUID should not raise exception + try: + reverse('admin:config_device_change', args=[valid_uuid]) + except NoReverseMatch: + self.fail("Valid UUID should not raise NoReverseMatch") + + # Invalid UUID should raise NoReverseMatch + invalid_uuid = 'invalid-uuid-format' + mock_reverse.side_effect = NoReverseMatch( + "Simulated error for invalid UUID" + ) + + with self.assertRaises(NoReverseMatch): + reverse('admin:config_device_change', args=[invalid_uuid]) + + def test_uuid_pattern_in_controller_urls(self): + """ + Test that the controller URLs correctly use the UUID_PATTERN + """ + from openwisp_controller.config.utils import get_controller_urls + + # Create a mock views module with the required view functions + class MockViewsModule: + def device_checksum(request, pk): + pass + + def device_download_config(request, pk): + pass + + def device_update_info(request, pk): + pass + + def device_report_status(request, pk): + pass + + def device_register(request): + pass + + def vpn_checksum(request, pk): + pass + + def vpn_download_config(request, pk): + pass + + mock_views = MockViewsModule() + + # Get the controller URLs + controller_urls = get_controller_urls(mock_views) + + # Verify that UUID_PATTERN is used in all URL patterns that need it + for url in controller_urls: + # Skip paths that don't contain pk + if not hasattr(url, 'pattern') or 'pk' not in str(url.pattern): + continue + + # Check that the URL pattern contains the UUID_PATTERN + pattern_string = str(url.pattern) + self.assertIn(UUID_PATTERN.replace('\\', '\\\\'), pattern_string) + + class TestImportExportMixin: """ Reused in OpenWISP Monitoring diff --git a/openwisp_controller/config/utils.py b/openwisp_controller/config/utils.py index 78989fb1b..2217a1a1e 100644 --- a/openwisp_controller/config/utils.py +++ b/openwisp_controller/config/utils.py @@ -9,6 +9,11 @@ logger = logging.getLogger(__name__) +# Define UUID_PATTERN for URL regular expressions +UUID_PATTERN = ( + r'[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}' +) + def get_object_or_404(model, **kwargs): """ @@ -116,22 +121,22 @@ def get_controller_urls(views_module): """ urls = [ re_path( - 'controller/device/checksum/(?P[^/]+)/$', + f'controller/device/checksum/(?P{UUID_PATTERN})/$', views_module.device_checksum, name='device_checksum', ), re_path( - 'controller/device/download-config/(?P[^/]+)/$', + f'controller/device/download-config/(?P{UUID_PATTERN})/$', views_module.device_download_config, name='device_download_config', ), re_path( - 'controller/device/update-info/(?P[^/]+)/$', + f'controller/device/update-info/(?P{UUID_PATTERN})/$', views_module.device_update_info, name='device_update_info', ), re_path( - 'controller/device/report-status/(?P[^/]+)/$', + f'controller/device/report-status/(?P{UUID_PATTERN})/$', views_module.device_report_status, name='device_report_status', ), @@ -141,33 +146,33 @@ def get_controller_urls(views_module): name='device_register', ), re_path( - 'controller/vpn/checksum/(?P[^/]+)/$', + f'controller/vpn/checksum/(?P{UUID_PATTERN})/$', views_module.vpn_checksum, name='vpn_checksum', ), re_path( - 'controller/vpn/download-config/(?P[^/]+)/$', + f'controller/vpn/download-config/(?P{UUID_PATTERN})/$', views_module.vpn_download_config, name='vpn_download_config', ), # legacy URLs re_path( - 'controller/checksum/(?P[^/]+)/$', + f'controller/checksum/(?P{UUID_PATTERN})/$', views_module.device_checksum, name='checksum_legacy', ), re_path( - 'controller/download-config/(?P[^/]+)/$', + f'controller/download-config/(?P{UUID_PATTERN})/$', views_module.device_download_config, name='download_config_legacy', ), re_path( - 'controller/update-info/(?P[^/]+)/$', + f'controller/update-info/(?P{UUID_PATTERN})/$', views_module.device_update_info, name='update_info_legacy', ), re_path( - 'controller/report-status/(?P[^/]+)/$', + f'controller/report-status/(?P{UUID_PATTERN})/$', views_module.device_report_status, name='report_status_legacy', ), diff --git a/openwisp_controller/connection/channels/routing.py b/openwisp_controller/connection/channels/routing.py index 70c426332..4236d1597 100644 --- a/openwisp_controller/connection/channels/routing.py +++ b/openwisp_controller/connection/channels/routing.py @@ -4,9 +4,10 @@ def get_routes(consumer=ow_consumer): + UUID_PATTERN = '[a-fA-F0-9]{8}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{4}-?[a-fA-F0-9]{12}' return [ re_path( - r'^ws/controller/device/(?P[^/]+)/command$', + f'^ws/controller/device/(?P{UUID_PATTERN})/command$', consumer.CommandConsumer.as_asgi(), ) ] diff --git a/openwisp_controller/connection/static/connection/css/command-inline.css b/openwisp_controller/connection/static/connection/css/command-inline.css index 4515870ef..0e8f16309 100644 --- a/openwisp_controller/connection/static/connection/css/command-inline.css +++ b/openwisp_controller/connection/static/connection/css/command-inline.css @@ -105,8 +105,9 @@ li.commands { } #ow-command-confirm-text { color: #666; - font-family: "Roboto", "Lucida Grande", "DejaVu Sans", "Bitstream Vera Sans", - Verdana, Arial, sans-serif; + font-family: + "Roboto", "Lucida Grande", "DejaVu Sans", "Bitstream Vera Sans", Verdana, + Arial, sans-serif; font-size: 14px; font-weight: normal; margin: 20px; diff --git a/openwisp_controller/connection/tests/test_admin.py b/openwisp_controller/connection/tests/test_admin.py index b3e25ce0c..6f8684c6e 100644 --- a/openwisp_controller/connection/tests/test_admin.py +++ b/openwisp_controller/connection/tests/test_admin.py @@ -1,11 +1,14 @@ import json +import re +import uuid from unittest.mock import patch from django.contrib.auth.models import Permission from django.test import TestCase, override_settings -from django.urls import reverse +from django.urls import NoReverseMatch, reverse from swapper import load_model +from openwisp_controller.config.utils import UUID_PATTERN from openwisp_controller.connection.commands import ORGANIZATION_ENABLED_COMMANDS from ... import settings as module_settings @@ -25,6 +28,125 @@ Group = load_model('openwisp_users', 'Group') +class TestUuidPattern(TestCase): + """ + Test that the UUID_PATTERN correctly validates UUIDs + and prevents 500 errors in admin URLs + """ + + def test_uuid_pattern_validation(self): + """ + Test that the UUID_PATTERN correctly matches valid UUIDs + and rejects invalid ones + """ + # Valid UUID formats + valid_uuids = [ + '12345678-1234-5678-1234-567812345678', + 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa', + 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA', + '00000000-0000-0000-0000-000000000000', + 'f47ac10b-58cc-4372-a567-0e02b2c3d479', + ] + + # Test valid UUIDs against the pattern + for valid_uuid in valid_uuids: + self.assertTrue( + re.match(f'^{UUID_PATTERN}$', valid_uuid), + f"Valid UUID {valid_uuid} did not match pattern", + ) + + # Invalid UUID formats + invalid_uuids = [ + 'invalid-uuid', + '12345678123456781234567812345678', # No hyphens + '12345678-1234-5678-1234-56781234567', # Too short + '12345678-1234-5678-1234-5678123456789', # Too long + '12345678-1234-5678-1234-56781234567g', # Invalid character + 'g2345678-1234-5678-1234-567812345678', # Invalid character + '12345678-1234-5678-1234', # Incomplete + '', # Empty string + '12345678-1234.5678-1234-567812345678', # Invalid format + ] + + # Test invalid UUIDs against the pattern + for invalid_uuid in invalid_uuids: + self.assertFalse( + re.match(f'^{UUID_PATTERN}$', invalid_uuid), + f"Invalid UUID {invalid_uuid} should not match pattern", + ) + + def test_admin_url_with_uuid(self): + """ + Test that admin URLs correctly handle valid and invalid UUIDs + This test verifies the fix for the 500 error caused by invalid UUIDs + """ + # Generate a valid UUID + valid_uuid = str(uuid.uuid4()) + + # Patch reverse to avoid actual URL resolution (just test the pattern) + with patch('django.urls.reverse') as mock_reverse: + mock_reverse.return_value = '/admin/fake/url/' + + # Valid UUID should not raise exception + try: + reverse('admin:config_device_change', args=[valid_uuid]) + except NoReverseMatch: + self.fail("Valid UUID should not raise NoReverseMatch") + + # Invalid UUID should raise NoReverseMatch + invalid_uuid = 'invalid-uuid-format' + mock_reverse.side_effect = NoReverseMatch( + "Simulated error for invalid UUID" + ) + + with self.assertRaises(NoReverseMatch): + reverse('admin:config_device_change', args=[invalid_uuid]) + + def test_uuid_pattern_in_controller_urls(self): + """ + Test that the controller URLs correctly use the UUID_PATTERN + """ + from openwisp_controller.config.utils import get_controller_urls + + # Create a mock views module with the required view functions + class MockViewsModule: + def device_checksum(request, pk): + pass + + def device_download_config(request, pk): + pass + + def device_update_info(request, pk): + pass + + def device_report_status(request, pk): + pass + + def device_register(request): + pass + + def vpn_checksum(request, pk): + pass + + def vpn_download_config(request, pk): + pass + + mock_views = MockViewsModule() + + # Get the controller URLs + controller_urls = get_controller_urls(mock_views) + + # Verify that UUID_PATTERN is used in all URL patterns that need it + for url in controller_urls: + # Skip paths that don't contain pk + if not hasattr(url, 'pattern') or 'pk' not in str(url.pattern): + continue + + # Check that the URL pattern contains the UUID_PATTERN + pattern_string = str(url.pattern) + self.assertIn(UUID_PATTERN.replace('\\', '\\\\'), pattern_string) + + class TestConnectionAdmin(TestAdminMixin, CreateConnectionsMixin, TestCase): config_app_label = 'config' app_label = 'connection'