diff --git a/nautobot_chatops/models.py b/nautobot_chatops/models.py index 5bf667ab..7e8fb1dc 100644 --- a/nautobot_chatops/models.py +++ b/nautobot_chatops/models.py @@ -9,6 +9,26 @@ from .choices import AccessGrantTypeChoices, CommandStatusChoices, CommandTokenPlatformChoices +import re + + +class QuerySetRegexMatch(models.QuerySet): + """Customized QuerySet to match string agains regexes stored in database. + + Return a list of matched records. + """ + + def match_re(self, value): + records = self.all() + + results = [] + for row in records: + if row.subcommand == "*": # skip as it is not a valid regex expression and would raise exception + continue + if re.match(row.subcommand, value): + results.append(row.subcommand) + return results + class CommandLog(BaseModel): """Record of a single fully-executed Nautobot command. @@ -68,6 +88,7 @@ class AccessGrant(BaseModel, ChangeLoggedModel): max_length=64, help_text="Enter * to grant access to all subcommands of the given command", ) + regexMatch = QuerySetRegexMatch.as_manager() grant_type = models.CharField(max_length=32, choices=AccessGrantTypeChoices) diff --git a/nautobot_chatops/tests/test_utils.py b/nautobot_chatops/tests/test_utils.py index f9c10cd8..903e98ed 100644 --- a/nautobot_chatops/tests/test_utils.py +++ b/nautobot_chatops/tests/test_utils.py @@ -203,6 +203,29 @@ def setup_db(self): name="user3", value="3333", ) + + AccessGrant.objects.create( + command="x", + subcommand="get-.*", + grant_type=AccessGrantTypeChoices.TYPE_ORGANIZATION, + name="org3", + value="33", + ) + AccessGrant.objects.create( + command="x", + subcommand="get-.*", + grant_type=AccessGrantTypeChoices.TYPE_CHANNEL, + name="channel3", + value="333", + ) + AccessGrant.objects.create( + command="x", + subcommand="get-.*", + grant_type=AccessGrantTypeChoices.TYPE_USER, + name="user3", + value="3333", + ) + # And some wildcard access grants: AccessGrant.objects.create( command="y", @@ -313,6 +336,42 @@ def test_permitted_subcommand(self, mock_enqueue_task): ) mock_enqueue_task.assert_not_called() + def test_permitted_subcommand_re(self, mock_enqueue_task): + """A per-subcommand access grant applies that subcommands under that command, and no others.""" + self.setup_db() + for cmd, subcmd in [("x", "get-device"), ("x", "get-prefix-status"), ("x", "")]: + mock_enqueue_task.reset_mock() + check_and_enqueue_command( + self.mock_registry, + cmd, + subcmd, + [], + {"org_id": "33", "channel_id": "333", "user_id": "3333"}, + MockDispatcher.reset(), + ) + self.assertIsNone(MockDispatcher.error) + mock_enqueue_task.assert_called_once() + + def test_not_permitted_re(self, mock_enqueue_task): + """Per-user access grants are checked.""" + self.setup_db() + for cmd, subcmd in [("x", "set-device")]: + mock_enqueue_task.reset_mock() + check_and_enqueue_command( + self.mock_registry, + cmd, + subcmd, + [], + {"org_id": "33", "channel_id": "333", "user_id": "3333"}, + MockDispatcher.reset(), + ) + self.assertEqual( + MockDispatcher.error, + "Access to this bot and/or command is not permitted in organization 33, " + "ask your Nautobot administrator to define an appropriate Access Grant", + ) + mock_enqueue_task.assert_not_called() + def test_not_permitted_user(self, mock_enqueue_task): """Per-user access grants are checked.""" self.setup_db() diff --git a/nautobot_chatops/utils.py b/nautobot_chatops/utils.py index c1e99e39..f2723501 100644 --- a/nautobot_chatops/utils.py +++ b/nautobot_chatops/utils.py @@ -147,7 +147,10 @@ def check_and_enqueue_command( else: # Actual subcommand - permit only if this particular subcommand (or all commands/subcommands) are permitted access_grants = AccessGrant.objects.filter( - Q(command="*") | Q(command=command, subcommand="*") | Q(command=command, subcommand=subcommand), + Q(command="*") + | Q(command=command, subcommand="*") + | Q(command=command, subcommand=subcommand) + | Q(command=command, subcommand__in=AccessGrant.regexMatch.match_re(subcommand)) ) org_grants = access_grants.filter(grant_type=AccessGrantTypeChoices.TYPE_ORGANIZATION)