|
1 | 1 | import json |
2 | 2 | from copy import deepcopy |
3 | 3 | from datetime import datetime, timezone |
| 4 | +from types import SimpleNamespace |
4 | 5 | from unittest import TestCase |
5 | 6 |
|
6 | 7 | import pytest |
|
12 | 13 | from ....wallet.base import BaseWallet |
13 | 14 | from ....wallet.did_method import SOV, DIDMethods |
14 | 15 | from ....wallet.key_type import ED25519 |
15 | | -from ....wallet.util import b64_to_bytes, bytes_to_b64 |
| 16 | +from ....wallet.util import b58_to_bytes, b64_to_bytes, bytes_to_b64, str_to_b64 |
16 | 17 | from ..attach_decorator import ( |
17 | 18 | AttachDecorator, |
18 | 19 | AttachDecoratorData, |
@@ -522,3 +523,82 @@ async def test_indy_sign(self, wallet, seed): |
522 | 523 | deco_dict["data"]["links"] = "https://en.wikipedia.org/wiki/Potato" |
523 | 524 | with pytest.raises(BaseModelError): |
524 | 525 | AttachDecorator.deserialize(deco_dict) # now has base64 and links |
| 526 | + |
| 527 | + @pytest.mark.asyncio |
| 528 | + async def test_verify_uses_kid_from_header_when_jwk_has_no_kid(self, wallet, seed): |
| 529 | + """Verify uses kid from JWS header when jwk has no kid (Credo/interop case).""" |
| 530 | + # Build a JWS where kid is only in the unprotected header, not in jwk. |
| 531 | + # This is the format some agents (e.g. Credo) send; verification must use header.kid. |
| 532 | + did_info = await wallet.create_local_did(SOV, ED25519, seed[0]) |
| 533 | + verkey_b58 = did_info.verkey |
| 534 | + kid_full = did_key(verkey_b58) |
| 535 | + |
| 536 | + payload = b"payload for header-kid test" |
| 537 | + b64_payload = bytes_to_b64(payload) |
| 538 | + # Protected header with jwk but NO kid in jwk |
| 539 | + protected = { |
| 540 | + "alg": "EdDSA", |
| 541 | + "jwk": { |
| 542 | + "kty": "OKP", |
| 543 | + "crv": "Ed25519", |
| 544 | + "x": bytes_to_b64(b58_to_bytes(verkey_b58), urlsafe=True, pad=False), |
| 545 | + }, |
| 546 | + } |
| 547 | + b64_protected = str_to_b64( |
| 548 | + json.dumps(protected, separators=(",", ":")), |
| 549 | + urlsafe=True, |
| 550 | + pad=False, |
| 551 | + ) |
| 552 | + sign_input = (b64_protected + "." + b64_payload).encode("ascii") |
| 553 | + sig_bytes = await wallet.sign_message( |
| 554 | + message=sign_input, |
| 555 | + from_verkey=verkey_b58, |
| 556 | + ) |
| 557 | + b64_sig = bytes_to_b64(sig_bytes, urlsafe=True, pad=False) |
| 558 | + |
| 559 | + data = AttachDecoratorData.deserialize( |
| 560 | + { |
| 561 | + "base64": b64_payload, |
| 562 | + "jws": { |
| 563 | + "header": {"kid": kid_full}, |
| 564 | + "protected": b64_protected, |
| 565 | + "signature": b64_sig, |
| 566 | + }, |
| 567 | + } |
| 568 | + ) |
| 569 | + assert await data.verify(wallet) |
| 570 | + assert await data.verify(wallet, signer_verkey=verkey_b58) |
| 571 | + |
| 572 | + @pytest.mark.asyncio |
| 573 | + async def test_verify_uses_kid_from_jwk_when_header_has_no_kid(self, wallet, seed): |
| 574 | + """Verify uses jwk.kid when header has no kid (fallback path).""" |
| 575 | + deco = AttachDecorator.data_base64( |
| 576 | + mapping=INDY_CRED, |
| 577 | + ident=IDENT, |
| 578 | + description=DESCRIPTION, |
| 579 | + ) |
| 580 | + did_info = await wallet.create_local_did(SOV, ED25519, seed[0]) |
| 581 | + await deco.data.sign(did_info.verkey, wallet) |
| 582 | + # Force fallback: header has no kid, so verify must use jwk.kid from protected |
| 583 | + deco.data.jws_.header = SimpleNamespace(kid=None) |
| 584 | + assert await deco.data.verify(wallet) |
| 585 | + assert await deco.data.verify(wallet, signer_verkey=did_info.verkey) |
| 586 | + |
| 587 | + @pytest.mark.asyncio |
| 588 | + async def test_verify_returns_false_when_signer_verkey_does_not_match( |
| 589 | + self, wallet, seed |
| 590 | + ): |
| 591 | + """Verify returns False when signer_verkey is not the signing key.""" |
| 592 | + deco = AttachDecorator.data_base64( |
| 593 | + mapping=INDY_CRED, |
| 594 | + ident=IDENT, |
| 595 | + description=DESCRIPTION, |
| 596 | + ) |
| 597 | + did_infos = [await wallet.create_local_did(SOV, ED25519, seed[i]) for i in [0, 1]] |
| 598 | + await deco.data.sign(did_infos[0].verkey, wallet) |
| 599 | + # Sign with key 0; require key 1 -> should fail |
| 600 | + assert not await deco.data.verify(wallet, signer_verkey=did_infos[1].verkey) |
| 601 | + # No signer_verkey constraint -> should pass |
| 602 | + assert await deco.data.verify(wallet) |
| 603 | + # Correct signer_verkey -> should pass |
| 604 | + assert await deco.data.verify(wallet, signer_verkey=did_infos[0].verkey) |
0 commit comments