|
22 | 22 |
|
23 | 23 | from parsec._parsec import ( |
24 | 24 | AccountAuthMethodID, |
| 25 | + CryptoError, |
25 | 26 | DateTime, |
26 | 27 | EmailAddress, |
27 | 28 | HumanHandle, |
28 | 29 | OrganizationID, |
29 | 30 | ParsecAddr, |
30 | 31 | SecretKey, |
| 32 | + SigningKey, |
31 | 33 | ValidationCode, |
32 | 34 | authenticated_cmds, |
33 | 35 | ) |
@@ -121,6 +123,7 @@ def __init__( |
121 | 123 | self.template_per_org: dict[OrganizationID, TestbedTemplateContent] = {} |
122 | 124 | # We expose some routes to simulate a OpenBao server, hence this field |
123 | 125 | self.openbao_secrets: defaultdict[tuple[str, str], list[Any]] = defaultdict(list) |
| 126 | + self.openbao_signing_keys: dict[str, SigningKey] = {} |
124 | 127 | # If the testbed server run with the memory backend, we can copy the datamodel |
125 | 128 | # object to create snapshots and overwrite the initial one to rollback. |
126 | 129 | self._data_snapshots: list[tuple[DateTime, MemoryDatamodel]] = [] |
@@ -418,7 +421,7 @@ async def test_openbao_auth_token_lookup_self(request: Request): |
418 | 421 | } |
419 | 422 |
|
420 | 423 |
|
421 | | -@testbed_router.get("/testbed/mock/openbao/v1/secret/parsec-keys/data/{secret_path:path}") |
| 424 | +@testbed_router.get("/testbed/mock/openbao/v1/secret/data/{secret_path:path}") |
422 | 425 | async def test_openbao_read_secret(secret_path: str, request: Request): |
423 | 426 | testbed: TestbedBackend = request.app.state.testbed |
424 | 427 |
|
@@ -446,7 +449,7 @@ async def test_openbao_read_secret(secret_path: str, request: Request): |
446 | 449 | } |
447 | 450 |
|
448 | 451 |
|
449 | | -@testbed_router.post("/testbed/mock/openbao/v1/secret/parsec-keys/data/{secret_path:path}") |
| 452 | +@testbed_router.post("/testbed/mock/openbao/v1/secret/data/{secret_path:path}") |
450 | 453 | async def test_openbao_create_secret(secret_path: str, request: Request): |
451 | 454 | testbed: TestbedBackend = request.app.state.testbed |
452 | 455 |
|
@@ -519,6 +522,225 @@ async def test_openbao_hexagone_oidc_auth_url(request: Request, auth_provider: s |
519 | 522 | } |
520 | 523 |
|
521 | 524 |
|
| 525 | +@testbed_router.post("/testbed/mock/openbao/v1/transit/keys/{key_name}") |
| 526 | +async def test_openbao_create_key(request: Request, key_name: str): |
| 527 | + testbed: TestbedBackend = request.app.state.testbed |
| 528 | + |
| 529 | + entity_id = _openbao_entity_id_from_vault_token_header(request) |
| 530 | + if not entity_id: |
| 531 | + return Response(status_code=403) |
| 532 | + |
| 533 | + if key_name != f"user-{entity_id}": |
| 534 | + return Response(status_code=403) |
| 535 | + |
| 536 | + # See https://openbao.org/api-docs/secret/transit/#create-key |
| 537 | + |
| 538 | + try: |
| 539 | + await request.json() |
| 540 | + except ValueError: |
| 541 | + return Response("Bad JSON body", status_code=400) |
| 542 | + |
| 543 | + try: |
| 544 | + key = testbed.openbao_signing_keys[entity_id] |
| 545 | + except KeyError: |
| 546 | + key = SigningKey.generate() |
| 547 | + testbed.openbao_signing_keys[entity_id] = key |
| 548 | + |
| 549 | + return { |
| 550 | + "request_id": str(uuid.uuid4()), |
| 551 | + "lease_id": "", |
| 552 | + "renewable": False, |
| 553 | + "lease_duration": 0, |
| 554 | + "data": { |
| 555 | + "allow_plaintext_backup": False, |
| 556 | + "auto_rotate_period": 0, |
| 557 | + "deletion_allowed": False, |
| 558 | + "derived": False, |
| 559 | + "exportable": False, |
| 560 | + "imported_key": False, |
| 561 | + "keys": { |
| 562 | + "1": { |
| 563 | + "certificate_chain": "", |
| 564 | + "creation_time": "2025-12-01T15:50:33.606080602Z", |
| 565 | + "name": "ed25519", |
| 566 | + "public_key": b64encode(key.verify_key.encode()).decode(), |
| 567 | + } |
| 568 | + }, |
| 569 | + "latest_version": 1, |
| 570 | + "min_available_version": 0, |
| 571 | + "min_decryption_version": 1, |
| 572 | + "min_encryption_version": 0, |
| 573 | + "name": key_name, |
| 574 | + "soft_deleted": False, |
| 575 | + "supports_decryption": False, |
| 576 | + "supports_derivation": True, |
| 577 | + "supports_encryption": False, |
| 578 | + "supports_signing": True, |
| 579 | + "type": "ed25519", |
| 580 | + }, |
| 581 | + "wrap_info": None, |
| 582 | + "warnings": None, |
| 583 | + "auth": None, |
| 584 | + } |
| 585 | + |
| 586 | + |
| 587 | +@testbed_router.post("/testbed/mock/openbao/v1/transit/sign/{key_name}") |
| 588 | +async def test_openbao_sign(request: Request, key_name: str): |
| 589 | + testbed: TestbedBackend = request.app.state.testbed |
| 590 | + |
| 591 | + entity_id = _openbao_entity_id_from_vault_token_header(request) |
| 592 | + if not entity_id: |
| 593 | + return Response(status_code=403) |
| 594 | + |
| 595 | + if key_name != f"user-{entity_id}": |
| 596 | + return Response(status_code=403) |
| 597 | + |
| 598 | + # See https://openbao.org/api-docs/secret/transit/#sign-data |
| 599 | + |
| 600 | + try: |
| 601 | + req = await request.json() |
| 602 | + except ValueError: |
| 603 | + return Response("Bad JSON body", status_code=400) |
| 604 | + |
| 605 | + input_b64 = req.get("input") |
| 606 | + if not isinstance(input_b64, str): |
| 607 | + return Response("Bad/missing field `input`", status_code=400) |
| 608 | + try: |
| 609 | + input_raw = b64decode(input_b64) |
| 610 | + except ValueError: |
| 611 | + return Response("Bad/missing field `input`", status_code=400) |
| 612 | + |
| 613 | + try: |
| 614 | + key = testbed.openbao_signing_keys[entity_id] |
| 615 | + except KeyError: |
| 616 | + return JSONResponse(status_code=400, content={"errors": ["signing key not found"]}) |
| 617 | + |
| 618 | + signature_raw = key.sign_only_signature(input_raw) |
| 619 | + signature = f"vault:v1:{b64encode(signature_raw).decode()}" |
| 620 | + return { |
| 621 | + "request_id": str(uuid.uuid4()), |
| 622 | + "lease_id": "", |
| 623 | + "renewable": False, |
| 624 | + "lease_duration": 0, |
| 625 | + "data": { |
| 626 | + "key_version": 1, |
| 627 | + "signature": signature, |
| 628 | + }, |
| 629 | + "wrap_info": None, |
| 630 | + "warnings": None, |
| 631 | + "auth": None, |
| 632 | + } |
| 633 | + |
| 634 | + |
| 635 | +@testbed_router.post("/testbed/mock/openbao/v1/transit/verify/{key_name}") |
| 636 | +async def test_openbao_verify(request: Request, key_name: str): |
| 637 | + testbed: TestbedBackend = request.app.state.testbed |
| 638 | + |
| 639 | + entity_id = _openbao_entity_id_from_vault_token_header(request) |
| 640 | + if not entity_id: |
| 641 | + return Response(status_code=403) |
| 642 | + |
| 643 | + author_entity_id = key_name.removeprefix("user-") |
| 644 | + |
| 645 | + # See https://openbao.org/api-docs/secret/transit/#verify-signed-data |
| 646 | + |
| 647 | + try: |
| 648 | + req = await request.json() |
| 649 | + except ValueError: |
| 650 | + return Response("Bad JSON body", status_code=400) |
| 651 | + |
| 652 | + input_b64 = req.get("input") |
| 653 | + if not isinstance(input_b64, str): |
| 654 | + return Response("Bad/missing field `input`", status_code=400) |
| 655 | + try: |
| 656 | + input_raw = b64decode(input_b64) |
| 657 | + except ValueError: |
| 658 | + return Response("Bad/missing field `input`", status_code=400) |
| 659 | + |
| 660 | + signature = req.get("signature") |
| 661 | + if not isinstance(signature, str) or not signature.startswith("vault:v1:"): |
| 662 | + return Response("Bad/missing field `signature`", status_code=400) |
| 663 | + try: |
| 664 | + signature_raw = b64decode(signature.removeprefix("vault:v1:")) |
| 665 | + except ValueError: |
| 666 | + return Response("Bad/missing field `signature`", status_code=400) |
| 667 | + |
| 668 | + try: |
| 669 | + key = testbed.openbao_signing_keys[author_entity_id] |
| 670 | + except KeyError: |
| 671 | + return JSONResponse( |
| 672 | + status_code=400, content={"errors": ["signature verification key not found"]} |
| 673 | + ) |
| 674 | + |
| 675 | + try: |
| 676 | + key.verify_key.verify_with_signature(signature_raw, input_raw) |
| 677 | + is_valid = True |
| 678 | + except CryptoError: |
| 679 | + is_valid = False |
| 680 | + |
| 681 | + return { |
| 682 | + "request_id": str(uuid.uuid4()), |
| 683 | + "lease_id": "", |
| 684 | + "renewable": False, |
| 685 | + "lease_duration": 0, |
| 686 | + "data": {"valid": is_valid}, |
| 687 | + "wrap_info": None, |
| 688 | + "warnings": None, |
| 689 | + "auth": None, |
| 690 | + } |
| 691 | + |
| 692 | + |
| 693 | +@testbed_router.get("/testbed/mock/openbao/v1/identity/entity/id/{other_entity_id}") |
| 694 | +async def test_openbao_get_entity_info(request: Request, other_entity_id: str): |
| 695 | + self_entity_id = _openbao_entity_id_from_vault_token_header(request) |
| 696 | + if not self_entity_id: |
| 697 | + return Response(status_code=403) |
| 698 | + |
| 699 | + # See https://openbao.org/api-docs/secret/transit/#verify-signed-data |
| 700 | + |
| 701 | + email = f"{other_entity_id}@example.invalid" |
| 702 | + |
| 703 | + return { |
| 704 | + "request_id": str(uuid.uuid4()), |
| 705 | + "lease_id": "", |
| 706 | + "renewable": False, |
| 707 | + "lease_duration": 0, |
| 708 | + "data": { |
| 709 | + "aliases": [ |
| 710 | + { |
| 711 | + "canonical_id": "217b7a03-b4d0-aff6-eaa8-4e1aa0573193", |
| 712 | + "creation_time": "2025-12-01T15:09:53Z", |
| 713 | + "custom_metadata": None, |
| 714 | + "id": "d42286c0-a41d-bf0f-7dab-c9c27a0f0a58", |
| 715 | + "last_update_time": "2025-12-01T15:09:53Z", |
| 716 | + "local": False, |
| 717 | + "merged_from_canonical_ids": None, |
| 718 | + "metadata": {"role": "default"}, |
| 719 | + "mount_accessor": "auth_oidc_e99f8297", |
| 720 | + "mount_path": "auth/oidc/", |
| 721 | + "mount_type": "oidc", |
| 722 | + "name": email, |
| 723 | + } |
| 724 | + ], |
| 725 | + "creation_time": "2025-12-01T15:09:53Z", |
| 726 | + "direct_group_ids": [], |
| 727 | + "disabled": False, |
| 728 | + "group_ids": [], |
| 729 | + "id": "217b7a03-b4d0-aff6-eaa8-4e1aa0573193", |
| 730 | + "inherited_group_ids": [], |
| 731 | + "last_update_time": "2025-12-01T15:09:53Z", |
| 732 | + "merged_entity_ids": None, |
| 733 | + "metadata": None, |
| 734 | + "name": "entity_46f449cb.root", |
| 735 | + "namespace_id": "root", |
| 736 | + "policies": [], |
| 737 | + }, |
| 738 | + "wrap_info": None, |
| 739 | + "warnings": None, |
| 740 | + "auth": None, |
| 741 | + } |
| 742 | + |
| 743 | + |
522 | 744 | @testbed_router.get("/testbed/mock/sso/authorize") |
523 | 745 | async def test_openbao_hexagone_auth_url(request: Request): |
524 | 746 | testbed: TestbedBackend = request.app.state.testbed |
@@ -763,7 +985,7 @@ async def testbed_backend_factory( |
763 | 985 | organization_spontaneous_bootstrap=True, |
764 | 986 | openbao_config=OpenBaoConfig( |
765 | 987 | server_url=server_addr.to_http_url("/testbed/mock/openbao"), |
766 | | - secret_mount_path="secret/parsec-keys", |
| 988 | + secret_mount_path="secret", |
767 | 989 | auths=[ |
768 | 990 | OpenBaoAuthConfig( |
769 | 991 | id=OpenBaoAuthType.HEXAGONE, |
|
0 commit comments