|
| 1 | +import asyncio |
1 | 2 | import sys |
2 | 3 | from collections.abc import Collection |
3 | 4 | from dataclasses import dataclass |
|
29 | 30 | "Accept": "application/scim+json", |
30 | 31 | "Content-Type": "application/scim+json", |
31 | 32 | } |
| 33 | +CONFIG_RESOURCES = (ResourceType, Schema, ServiceProviderConfig) |
32 | 34 |
|
33 | 35 |
|
34 | 36 | @dataclass |
@@ -153,18 +155,27 @@ def __init__( |
153 | 155 | check_response_payload: bool = True, |
154 | 156 | raise_scim_errors: bool = True, |
155 | 157 | ): |
156 | | - self.resource_models = tuple( |
157 | | - set(resource_models or []) | {ResourceType, Schema, ServiceProviderConfig} |
158 | | - ) |
| 158 | + self.resource_models = tuple(set(resource_models or []) | set(CONFIG_RESOURCES)) |
159 | 159 | self.resource_types = resource_types |
160 | 160 | self.check_request_payload = check_request_payload |
161 | 161 | self.check_response_payload = check_response_payload |
162 | 162 | self.raise_scim_errors = raise_scim_errors |
163 | 163 |
|
| 164 | + def get_resource_model(self, name: str) -> Optional[type[Resource]]: |
| 165 | + """Get a registered model by its name or its schema.""" |
| 166 | + for resource_model in self.resource_models: |
| 167 | + schema = resource_model.model_fields["schemas"].default[0] |
| 168 | + if schema == name or schema.split(":")[-1] == name: |
| 169 | + return resource_model |
| 170 | + return None |
| 171 | + |
164 | 172 | def check_resource_model( |
165 | 173 | self, resource_model: type[Resource], payload=None |
166 | 174 | ) -> None: |
167 | | - if resource_model not in self.resource_models: |
| 175 | + if ( |
| 176 | + resource_model not in self.resource_models |
| 177 | + and resource_model not in CONFIG_RESOURCES |
| 178 | + ): |
168 | 179 | raise SCIMRequestError( |
169 | 180 | f"Unknown resource type: '{resource_model}'", source=payload |
170 | 181 | ) |
@@ -202,7 +213,7 @@ def register_naive_resource_types(self): |
202 | 213 | self.resource_types = [ |
203 | 214 | ResourceType.from_resource(model) |
204 | 215 | for model in self.resource_models |
205 | | - if model not in (ResourceType, Schema, ServiceProviderConfig) |
| 216 | + if model not in CONFIG_RESOURCES |
206 | 217 | ] |
207 | 218 |
|
208 | 219 | def check_response( |
@@ -759,6 +770,31 @@ def replace( |
759 | 770 | """ |
760 | 771 | raise NotImplementedError() |
761 | 772 |
|
| 773 | + def discover(self): |
| 774 | + """Dynamically discover the server models :class:`~scim2_models.Schema` and :class:`~scim2_models.ResourceType`. |
| 775 | +
|
| 776 | + This is a shortcut for :meth:`BaseSyncSCIMClient.discover_models` |
| 777 | + and :meth:`BaseSyncSCIMClient.discover_resource_types`. |
| 778 | + """ |
| 779 | + self.discover_models() |
| 780 | + self.discover_resource_types() |
| 781 | + |
| 782 | + def discover_models(self): |
| 783 | + """Dynamically register resource models by reading the server :class:`~scim2_models.Schema` endpoint. |
| 784 | +
|
| 785 | + Internally it performs a request to the SCIM server to get all the schemas, |
| 786 | + generate classes from those schemas, and register them in the client. |
| 787 | + """ |
| 788 | + schemas = self.query(Schema) |
| 789 | + self.resource_models = tuple( |
| 790 | + Resource.from_schema(schema) for schema in schemas.resources |
| 791 | + ) |
| 792 | + |
| 793 | + def discover_resource_types(self): |
| 794 | + """Dynamically register resource types by reading the server :class:`~scim2_models.ResourceType` endpoint.""" |
| 795 | + schemas = self.query(ResourceType) |
| 796 | + self.resource_types = schemas.resources |
| 797 | + |
762 | 798 |
|
763 | 799 | class BaseAsyncSCIMClient(BaseSCIMClient): |
764 | 800 | """Base class for asynchronous request clients.""" |
@@ -1015,3 +1051,30 @@ async def replace( |
1015 | 1051 | the response payload. |
1016 | 1052 | """ |
1017 | 1053 | raise NotImplementedError() |
| 1054 | + |
| 1055 | + async def discover(self): |
| 1056 | + """Dynamically discover the server models :class:`~scim2_models.Schema` and :class:`~scim2_models.ResourceType`. |
| 1057 | +
|
| 1058 | + This is a shortcut for the parallel execution of :meth:`BaseAsyncSCIMClient.discover_models` |
| 1059 | + and :meth:`BaseAsyncSCIMClient.discover_resource_types`. |
| 1060 | + """ |
| 1061 | + models_task = asyncio.create_task(self.discover_models()) |
| 1062 | + resources_task = asyncio.create_task(self.discover_resource_types()) |
| 1063 | + await models_task |
| 1064 | + await resources_task |
| 1065 | + |
| 1066 | + async def discover_models(self): |
| 1067 | + """Dynamically register resource models by reading the server :class:`~scim2_models.Schema` endpoint. |
| 1068 | +
|
| 1069 | + Internally it performs a request to the SCIM server to get all the schemas, |
| 1070 | + generate classes from those schemas, and register them in the client. |
| 1071 | + """ |
| 1072 | + schemas = await self.query(Schema) |
| 1073 | + self.resource_models = tuple( |
| 1074 | + Resource.from_schema(schema) for schema in schemas.resources |
| 1075 | + ) |
| 1076 | + |
| 1077 | + async def discover_resource_types(self): |
| 1078 | + """Dynamically register resource types by reading the server :class:`~scim2_models.ResourceType` endpoint.""" |
| 1079 | + schemas = await self.query(ResourceType) |
| 1080 | + self.resource_types = schemas.resources |
0 commit comments