|
2 | 2 |
|
3 | 3 | import json |
4 | 4 | import logging |
5 | | -from typing import Mapping, Optional, Tuple |
| 5 | +from typing import List, Mapping, Optional, Tuple |
6 | 6 |
|
7 | 7 | from anoncreds import CredentialDefinition |
8 | 8 | from marshmallow import RAISE |
9 | 9 |
|
10 | | -from ......anoncreds.base import AnonCredsResolutionError |
| 10 | +from ......anoncreds.base import AnonCredsObjectNotFound, AnonCredsResolutionError |
11 | 11 | from ......anoncreds.holder import AnonCredsHolder, AnonCredsHolderError |
12 | 12 | from ......anoncreds.issuer import CATEGORY_CRED_DEF, AnonCredsIssuer |
13 | 13 | from ......anoncreds.models.credential import AnoncredsCredentialSchema |
@@ -204,34 +204,51 @@ async def _create(): |
204 | 204 | offer_json = await issuer.create_credential_offer(cred_def_id) |
205 | 205 | return json.loads(offer_json) |
206 | 206 |
|
207 | | - async def _get_attr_names(schema_id): |
| 207 | + async def _get_attr_names(schema_id) -> List[str] | None: |
| 208 | + """Fetch attribute names for a given schema ID from the registry.""" |
| 209 | + if not schema_id: |
| 210 | + return None |
208 | 211 | try: |
209 | | - return ( |
210 | | - await registry.get_schema(self.profile, schema_id) |
211 | | - ).schema.attr_names |
| 212 | + schema_result = await registry.get_schema(self.profile, schema_id) |
| 213 | + return schema_result.schema.attr_names |
| 214 | + except AnonCredsObjectNotFound: |
| 215 | + LOGGER.info(f"Schema not found for schema_id={schema_id}") |
| 216 | + return None |
212 | 217 | except AnonCredsResolutionError as e: |
213 | | - LOGGER.error(f"Error getting schema: {e} from schema_id") |
| 218 | + LOGGER.warning(f"Schema resolution failed for schema_id={schema_id}: {e}") |
214 | 219 | return None |
215 | 220 |
|
| 221 | + async def _fetch_schema_attr_names( |
| 222 | + self, anoncreds_attachment, cred_def_id |
| 223 | + ) -> List[str] | None: |
| 224 | + """Determine schema attribute names from schema_id or cred_def_id.""" |
| 225 | + schema_id = anoncreds_attachment.get("schema_id") |
| 226 | + attr_names = await _get_attr_names(schema_id) |
| 227 | + |
| 228 | + if attr_names: |
| 229 | + return attr_names |
| 230 | + |
| 231 | + if cred_def_id: |
| 232 | + async with self.profile.session() as session: |
| 233 | + cred_def_entry = await session.handle.fetch( |
| 234 | + CATEGORY_CRED_DEF, cred_def_id |
| 235 | + ) |
| 236 | + cred_def_dict = CredentialDefinition.load( |
| 237 | + cred_def_entry.value |
| 238 | + ).to_dict() |
| 239 | + return await _get_attr_names(cred_def_dict.get("schemaId")) |
| 240 | + |
| 241 | + return None |
| 242 | + |
216 | 243 | attr_names = None |
217 | 244 | registry = self.profile.inject(AnonCredsRegistry) |
218 | | - # Attempt to get schema attributes from schema_id |
219 | | - if anoncreds_attachment.get("schema_id"): |
220 | | - attr_names = await _get_attr_names(anoncreds_attachment.get("schema_id")) |
221 | 245 |
|
222 | | - # Attempt to get schema attributes from cred def id |
223 | | - if not attr_names and cred_def_id: |
224 | | - async with self.profile.session() as session: |
225 | | - cred_def_entry = await session.handle.fetch( |
226 | | - CATEGORY_CRED_DEF, cred_def_id |
227 | | - ) |
228 | | - cred_def_dict = CredentialDefinition.load(cred_def_entry.value).to_dict() |
229 | | - attr_names = await _get_attr_names(cred_def_dict["schemaId"]) |
| 246 | + attr_names = await _fetch_schema_attr_names(anoncreds_attachment, cred_def_id) |
230 | 247 |
|
231 | 248 | if not attr_names: |
232 | 249 | raise V20CredFormatError( |
233 | | - "Could not determine schema attributes. Are you the issuer that created" |
234 | | - "the schema? Or do you need to provide the schema_id" |
| 250 | + "Could not determine schema attributes. If you did not create the " |
| 251 | + "schema, then you need to provide the schema_id." |
235 | 252 | ) |
236 | 253 |
|
237 | 254 | schema_attrs = set(attr_names) |
|
0 commit comments