|
5 | 5 | from weaviate.connect.v4 import _ExpectedStatusCodes |
6 | 6 | from weaviate.rbac.models import ( |
7 | 7 | _Permission, |
| 8 | + PermissionsOutputType, |
8 | 9 | PermissionsInputType, |
9 | 10 | Role, |
10 | 11 | User, |
@@ -129,6 +130,17 @@ async def _remove_permissions(self, permissions: List[WeaviatePermission], role: |
129 | 130 | status_codes=_ExpectedStatusCodes(ok_in=[200], error="Remove permissions"), |
130 | 131 | ) |
131 | 132 |
|
| 133 | + async def _has_permission(self, permission: WeaviatePermission, role: str) -> bool: |
| 134 | + path = f"/authz/roles/{role}/has-permission" |
| 135 | + |
| 136 | + res = await self._connection.post( |
| 137 | + path, |
| 138 | + weaviate_object=permission, |
| 139 | + error_msg="Could not check permission", |
| 140 | + status_codes=_ExpectedStatusCodes(ok_in=[200], error="Check permission"), |
| 141 | + ) |
| 142 | + return cast(bool, res.json()) |
| 143 | + |
132 | 144 |
|
133 | 145 | class _RolesAsync(_RolesBase): |
134 | 146 | def __user_from_weaviate_user(self, user: str) -> User: |
@@ -279,6 +291,20 @@ async def remove_permissions(self, *, permissions: PermissionsInputType, role: s |
279 | 291 | [permission._to_weaviate() for permission in _flatten_permissions(permissions)], role |
280 | 292 | ) |
281 | 293 |
|
| 294 | + async def has_permission( |
| 295 | + self, *, permission: Union[_Permission, PermissionsOutputType], role: str |
| 296 | + ) -> bool: |
| 297 | + """Check if a role has a specific permission. |
| 298 | +
|
| 299 | + Args: |
| 300 | + permission: The permission to check. |
| 301 | + role: The role to check the permission for. |
| 302 | +
|
| 303 | + Returns: |
| 304 | + True if the role has the permission, False otherwise. |
| 305 | + """ |
| 306 | + return await self._has_permission(permission._to_weaviate(), role) |
| 307 | + |
282 | 308 |
|
283 | 309 | def _flatten_permissions(permissions: PermissionsInputType) -> List[_Permission]: |
284 | 310 | if isinstance(permissions, _Permission): |
|
0 commit comments