Skip to content

Conversation

@aaronsteers
Copy link
Contributor

@aaronsteers aaronsteers commented Oct 21, 2025

feat: add session-scoped public/private key encryption for remote secrets (MVP)

Summary

Implements #147 - session-scoped public/private key encryption for securely handling remote secrets in the Connector Builder MCP server.

Key Features:

  • Per-session keypair generation using libsodium sealed-box (X25519 + XSalsa20-Poly1305)
  • Private keys stored in memory only, destroyed when session ends
  • Public key exposed as MCP resource (session://encryption/public-key)
  • New MCP tool: execute_stream_test_read_with_encrypted_config for testing with encrypted configs
  • Decrypt-on-use flow with best-effort buffer zeroization
  • 64 KB size limit for ciphertext
  • Feature flag: CONNECTOR_BUILDER_MCP_ENABLE_ENCRYPTION (disabled by default)
  • Comprehensive documentation in docs/ENCRYPTION.md

Implementation Details:

  • Added pynacl>=1.5.0 dependency for libsodium bindings
  • When enabled, server initializes SessionEncryptionManager on startup
  • Manager generates fresh X25519 keypair per session with unique kid
  • Users encrypt secrets client-side, then pass ciphertext to MCP tool
  • Server decrypts only when needed, never logs/persists plaintext

Review & Testing Checklist for Human

Risk Level: 🟡 YELLOW - Security-sensitive feature with limited integration testing

  • Test end-to-end with encryption enabled: Set CONNECTOR_BUILDER_MCP_ENABLE_ENCRYPTION=true and verify the full workflow:

    1. Start MCP server and confirm session keypair generation in logs
    2. Retrieve public key from session://encryption/public-key resource
    3. Encrypt a test config using the Python example from docs/ENCRYPTION.md
    4. Call execute_stream_test_read_with_encrypted_config with encrypted config
    5. Verify decryption works and stream test executes successfully
  • Review cryptographic implementation: Examine connector_builder_mcp/_encryption.py:

    • Verify correct usage of PyNaCl's SealedBox (lines 84-124)
    • Check that kid validation prevents cross-session attacks (line 96-99)
    • Confirm size limits are enforced (line 112-117)
    • Review error handling for all failure modes
  • Verify tool registration with partial function: The new tool uses functools.partial to bind encryption_manager (lines 535-542 in _connector_builder.py). Test that:

    • FastMCP correctly registers the tool and exposes it to clients
    • Type hints don't cause mypy issues (note the type: ignore comment)
    • Tool appears correctly in poe inspect output
  • Confirm no regression when encryption disabled: With feature flag OFF (default), verify:

    • Server starts normally without encryption manager
    • Existing tools work unchanged
    • No new resources appear in MCP server capabilities
    • Tests still pass (70 tests should all pass)

Notes

  • All 70 tests pass including 21 new encryption tests
  • Lint and type checks pass (4 pre-existing mypy errors remain in _connector_builder.py unrelated to this change)
  • Documentation includes Python examples for client-side encryption
  • Buffer zeroization is "best effort" due to Python memory management limitations (acknowledged in code comments)
  • Chose libsodium sealed-box over JWE for simplicity and client-side tooling availability

Requested by: AJ Steers (@aaronsteers)
Session: https://app.devin.ai/sessions/659f16ecc54d422d8adedb059dcfe47f

Summary by CodeRabbit

  • New Features

    • Added optional session-scoped encryption for secure remote secrets handling using public key cryptography.
    • Encryption resources now available to retrieve public keys and obtain usage instructions.
  • Documentation

    • Updated README with new Secure Secrets tool listing.
    • Added comprehensive encryption documentation including setup, usage flows, and security considerations.

…rets

Implements issue #147 with the following features:

- Per-session keypair generation using libsodium sealed-box (X25519 + XSalsa20-Poly1305)
- Private keys stored in memory only, destroyed when session ends
- Public key exposed as MCP resource (session://encryption/public-key)
- New tool: execute_stream_test_read_with_encrypted_config
- Decrypt-on-use flow with buffer zeroization (best effort)
- Size limit of 64 KB for ciphertext
- Feature flag: CONNECTOR_BUILDER_MCP_ENABLE_ENCRYPTION (off by default)
- Comprehensive unit tests (21 tests covering all scenarios)
- Full documentation in docs/ENCRYPTION.md

Security considerations:
- No plaintext logging or persistence
- Client-side encryption (user controls encryption process)
- Clear validation errors for malformed ciphertexts
- Session-scoped keys prevent cross-session reuse

Dependencies:
- Added pynacl>=1.5.0 for libsodium bindings

Co-Authored-By: AJ Steers <[email protected]>
@devin-ai-integration
Copy link
Contributor

Original prompt from AJ Steers
@Devin - Try to implement this: <https://github.com/airbytehq/connector-builder-mcp/issues/147> (private repo so you'll need to pull via api)
Thread URL: https://airbytehq-team.slack.com/archives/D089P0UPVT4/p1761083581932449?thread_ts=1761083581.932449

@devin-ai-integration
Copy link
Contributor

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Oct 21, 2025

📝 Walkthrough

Walkthrough

The PR introduces session-scoped encryption for remote secrets using libsodium sealed-box cryptography. It adds a new encryption module with key management, modifies the connector builder to support encrypted config decryption, integrates encryption into server initialization, adds PyNaCl dependency, includes comprehensive documentation, and provides test coverage.

Changes

Cohort / File(s) Summary
Encryption Core Implementation
connector_builder_mcp/_encryption.py
New module with SessionKeyPair and EncryptedSecret data models, SessionEncryptionManager for per-session Ed25519 keypair management with decryption support, and helper functions for feature detection and usage instructions. Includes buffer zeroization and size validation.
Tool Registration
connector_builder_mcp/_connector_builder.py
Added execute_stream_test_read_with_encrypted_config function to decrypt encrypted configs and run stream tests. Modified register_connector_builder_tools signature to accept optional encryption_manager parameter; registers encrypted tool variant and exposes encryption resources (public-key, instructions) when encryption is enabled.
Server Integration
connector_builder_mcp/server.py
Added imports for encryption utilities. Instantiates SessionEncryptionManager conditionally based on feature flag during server initialization. Passes encryption_manager to tool registration function.
Documentation & Configuration
README.md, docs/ENCRYPTION.md, pyproject.toml
README adds bullet describing Secure Secrets tool. New ENCRYPTION.md provides algorithm details, usage flows, API reference, security considerations, and implementation notes. pyproject.toml adds pynacl>=1.5.0 dependency.
Test Suite
tests/test_encryption.py
Comprehensive tests covering SessionKeyPair immutability, EncryptedSecret creation, SessionEncryptionManager lifecycle, encryption/decryption round-trips, error handling (key ID mismatch, unsupported algorithms, invalid base64, ciphertext size limits), multi-instance behavior, JSON config decryption, feature flag detection, and instructions generation.

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant MCP Server
    participant Encryption Manager
    
    Client->>MCP Server: Request encryption/public-key resource
    MCP Server->>Encryption Manager: Get public_key_info()
    Encryption Manager-->>MCP Server: Return SessionKeyPair (public key, kid)
    MCP Server-->>Client: Return public key (base64)
    
    Note over Client: Client encrypts secret locally using public key
    
    Client->>MCP Server: Call stream_test_read with encrypted config<br/>(EncryptedSecret: ciphertext, kid, algorithm)
    MCP Server->>Encryption Manager: decrypt_secret(encrypted_secret)
    Encryption Manager->>Encryption Manager: Validate kid & algorithm
    Encryption Manager->>Encryption Manager: Base64 decode ciphertext
    Encryption Manager->>Encryption Manager: SealedBox.decrypt() with private key
    Encryption Manager->>Encryption Manager: UTF-8 decode, zeroize buffers
    Encryption Manager-->>MCP Server: Return plaintext secret
    MCP Server->>MCP Server: Execute stream test with decrypted config
    MCP Server-->>Client: Return test results
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

The changes introduce new cryptographic functionality spanning multiple files with integration points requiring careful validation. The encryption module introduces new primitives (keypair management, decryption logic with zeroization), the connector builder adds conditional tool registration paths, and server initialization logic changes. While individual changes are relatively contained, the feature requires understanding the complete flow across modules and validating cryptographic correctness and error handling.

Suggested labels

enhancement

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The pull request title "feat: add session-scoped public/private key encryption for remote secrets (MVP)" is directly and clearly related to the main changes in the changeset. The title accurately captures the core objective: implementing session-scoped encryption using public/private key cryptography for handling remote secrets. The changes across multiple files—including the new encryption module (_encryption.py), updated connector builder tools, supporting documentation (docs/ENCRYPTION.md), new tests, and dependency additions—all align with and support this stated goal. The title is specific and descriptive, avoiding vague terminology, and uses the conventional commit format appropriately. The "(MVP)" qualifier adds helpful context about the scope and maturity level of the implementation.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch devin/1761083721-session-scoped-encryption

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (16)
pyproject.toml (1)

14-15: Bound PyNaCl to a major range for stability.

To match other deps (e.g., pydantic, airbyte-cdk) and reduce surprise from future majors, bound PyNaCl to <2.0.

-    "pynacl>=1.5.0",
+    "pynacl>=1.5.0,<2.0",
connector_builder_mcp/_encryption.py (5)

53-61: Lower keypair log to DEBUG.

KID isn’t secret, but INFO is noisy; make it DEBUG to keep default logs cleaner.

-        logger.info(f"Generated new session keypair with kid: {self._kid}")
+        logger.debug("Generated new session keypair with kid=%s", self._kid)

22-43: Constrain algorithm via typing Literal and basic format validation.

Prevent accidental algorithm drift and catch obvious bad inputs early.

-from pydantic import BaseModel, Field
+from typing import Literal
+from pydantic import BaseModel, Field, field_validator
@@
 class SessionKeyPair(BaseModel):
@@
-    algorithm: str = Field(default="libsodium-sealed-box", description="Encryption algorithm")
+    algorithm: Literal["libsodium-sealed-box"] = "libsodium-sealed-box"
     encoding: str = Field(default="base64", description="Encoding format for ciphertext")
@@
 class EncryptedSecret(BaseModel):
@@
-    algorithm: str = Field(default="libsodium-sealed-box", description="Encryption algorithm used")
+    algorithm: Literal["libsodium-sealed-box"] = "libsodium-sealed-box"
+
+    @field_validator("ciphertext")
+    @classmethod
+    def _non_empty(cls, v: str) -> str:
+        if not v:
+            raise ValueError("ciphertext must be non-empty")
+        return v

126-146: Use libsodium’s sodium_memzero when possible.

Zeroizing a bytearray via Python loops is slower; sodium_memzero is purpose-built.

-from nacl.public import PrivateKey, SealedBox
+from nacl.public import PrivateKey, SealedBox
+from nacl.bindings import sodium_memzero
@@
     def _zeroize_buffer(buffer: bytearray | bytes) -> None:
@@
-        if isinstance(buffer, bytearray):
-            for i in range(len(buffer)):
-                buffer[i] = 0
-        elif isinstance(buffer, bytes):
-            try:
-                buffer_array = bytearray(buffer)
-                for i in range(len(buffer_array)):
-                    buffer_array[i] = 0
-            except Exception:
-                pass
+        if isinstance(buffer, bytearray):
+            try:
+                sodium_memzero(buffer)
+            finally:
+                return
+        if isinstance(buffer, bytes):
+            # bytes are immutable; best effort: copy to bytearray, then zeroize
+            try:
+                ba = bytearray(buffer)
+                sodium_memzero(ba)
+            except Exception:
+                pass

147-155: Avoid relying on del; add an explicit close() and call it on shutdown.

del isn’t guaranteed at interpreter exit or with cycles. Provide a close() and have the server call it on session end.

     def __del__(self) -> None:
-        """Clean up private key on deletion."""
-        if hasattr(self, "_private_key"):
-            try:
-                key_bytes = bytes(self._private_key)
-                self._zeroize_buffer(bytearray(key_bytes))
-            except Exception:
-                pass
+        """Best-effort cleanup if GC runs."""
+        try:
+            self.close()
+        except Exception:
+            pass
+
+    def close(self) -> None:
+        """Explicitly wipe and drop references."""
+        if getattr(self, "_private_key", None) is not None:
+            try:
+                key_bytes = bytes(self._private_key)
+                self._zeroize_buffer(bytearray(key_bytes))
+            finally:
+                self._private_key = None  # drop reference
+        self._sealed_box = None
+        self._public_key = None

I can open a follow-up PR wiring SessionEncryptionManager.close() into the server’s session shutdown hook. Do you want that?


176-215: Align instructions with actual resource URI and reduce vague tool guidance.

Reference the concrete session:// resource and steer users toward libsodium.js/TweetNaCl rather than “any online tool.”

-1. **Get the public key**: Use the `get_session_public_key` resource to retrieve the session's public key.
+1. **Get the public key**: Read `session://encryption/public-key` to retrieve the session's public key.
@@
-   **Option A: Online Tool (Client-Side Encryption)**
-   - Visit a trusted client-side encryption tool that supports libsodium sealed-box
+   **Option A: In‑browser (client‑side) using libsodium.js/TweetNaCl**
+   - Use a client‑side library (e.g., libsodium.js or TweetNaCl) that implements sealed boxes
tests/test_encryption.py (3)

149-161: Add a test for strict base64 rejection.

Once decode uses validate=True, include a case with whitespace/invalid chars to ensure rejection (beyond the existing '!!!' case).

@@
     def test_decrypt_with_invalid_base64(self) -> None:
@@
         with pytest.raises(ValueError, match="Invalid base64 ciphertext"):
             manager.decrypt_secret(encrypted_secret)
+
+    def test_decrypt_with_non_base64_chars_rejected(self) -> None:
+        manager = SessionEncryptionManager()
+        bad = "YWJjMTIz$$$"  # invalid chars
+        encrypted_secret = EncryptedSecret(ciphertext=bad, kid=manager.kid, algorithm="libsodium-sealed-box")
+        with pytest.raises(ValueError, match="Invalid base64 ciphertext"):
+            manager.decrypt_secret(encrypted_secret)

176-191: Add a pre-decode length guard test.

Cover the base64 length pre-check to avoid decoding huge payloads.

@@
     def test_decrypt_with_oversized_ciphertext(self) -> None:
@@
         with pytest.raises(ValueError, match="Ciphertext too large"):
             manager.decrypt_secret(encrypted_secret)
+
+    def test_oversized_base64_rejected_before_decode(self) -> None:
+        manager = SessionEncryptionManager()
+        # Make a base64 string longer than MAX_CIPHERTEXT_B64_LEN
+        big_b64 = "A" * (70 * 1024)  # adjust if constant changes
+        encrypted_secret = EncryptedSecret(ciphertext=big_b64, kid=manager.kid, algorithm="libsodium-sealed-box")
+        with pytest.raises(ValueError):
+            manager.decrypt_secret(encrypted_secret)

274-282: Assert instructions reference the concrete session resource.

Stabilize docs by checking for session://encryption/public-key in the instructions.

         instructions = get_encryption_instructions()
@@
-        assert "sealed-box" in instructions.lower()
+        assert "sealed-box" in instructions.lower()
+        assert "session://encryption/public-key" in instructions
README.md (1)

31-32: Mention the feature flag toggle for quick discovery.

Add a parenthetical about the env var to enable it.

-- **Secure Secrets**: Session-scoped encryption for remote secrets (optional, see [ENCRYPTION.md](docs/ENCRYPTION.md))
+- **Secure Secrets**: Session-scoped encryption for remote secrets (optional; disabled by default — enable via `CONNECTOR_BUILDER_MCP_ENABLE_ENCRYPTION`, see [ENCRYPTION.md](docs/ENCRYPTION.md))
docs/ENCRYPTION.md (3)

19-24: Clarify client-side support wording.

WebCrypto doesn’t natively expose sealed boxes; recommend libsodium.js/TweetNaCl instead.

-The feature uses **libsodium sealed-box** (X25519 + XSalsa20-Poly1305) for encryption. This algorithm was chosen for:
+The feature uses **libsodium sealed-box** (X25519 + XSalsa20-Poly1305). This approach was chosen for:
 - Simplicity and ease of use
-- Wide client-side support (WebCrypto, command-line tools)
+- Wide client-side support via libsodium.js/TweetNaCl (browser) and PyNaCl/CLI tools
 - Strong security guarantees
 - Copy-paste friendliness for users

114-123: Strengthen third‑party tool caution.

Explicitly require fully client-side tools implementing libsodium sealed boxes.

-You can use any client-side encryption tool that supports libsodium sealed-box:
+Use only client-side tools/libraries that implement libsodium sealed-box (no server round-trips):

236-237: Typo: “webapp” → “web app”.

Minor grammar fix.

-- Hosted encryption webapp for easier client-side encryption
+- Hosted encryption web app for easier client-side encryption
connector_builder_mcp/server.py (2)

23-27: Confirm session scoping; manager lifetime is currently process-wide.

SessionEncryptionManager is created at import time and persists for the process. If FastMCP can host multiple client sessions per process, keys won’t rotate per session. Recommend instantiating per connection/session (or at least within main() so the key rotates per run) and attaching cleanup to app shutdown hooks if available.

Example minimal move into main() to reduce lifetime to a single run:

@@
-app: FastMCP = FastMCP("connector-builder-mcp")
-
-encryption_manager: SessionEncryptionManager | None = None
-if is_encryption_enabled():
-    encryption_manager = SessionEncryptionManager()
-
-register_connector_builder_tools(app, encryption_manager)
+app: FastMCP = FastMCP("connector-builder-mcp")
@@
 def main() -> None:
     """Main entry point for the Builder MCP server."""
     print("Starting Builder MCP server.", file=sys.stderr)
+    encryption_manager: SessionEncryptionManager | None = None
+    if is_encryption_enabled():
+        encryption_manager = SessionEncryptionManager()
+    register_connector_builder_tools(app, encryption_manager)
     try:
         asyncio.run(app.run_stdio_async())

If FastMCP guarantees “one process = one MCP session,” then current approach is fine; otherwise, consider per-connection initialization via framework lifecycle hooks.


13-16: Optional: avoid importing crypto stack when encryption is disabled.

To minimize startup cost and make the feature truly flag‑gated, import _encryption only when the flag is on (or move the is_encryption_enabled() helper to a lightweight module).

For example:

-from connector_builder_mcp._encryption import (
-    SessionEncryptionManager,
-    is_encryption_enabled,
-)
+import os
+def is_encryption_enabled() -> bool:
+    return os.environ.get("CONNECTOR_BUILDER_MCP_ENABLE_ENCRYPTION", "").lower() in ("true","1","yes")

Then import SessionEncryptionManager inside the enabled branch.

connector_builder_mcp/_connector_builder.py (1)

450-518: Harden decryption path: validate JSON shape and drop plaintext sooner.

  • Ensure decrypted payload is a JSON object.
  • Optionally delete the plaintext string after parsing to reduce lifetime.
  • Consider returning a generic message to callers and log details only.
 def execute_stream_test_read_with_encrypted_config(
@@
-    try:
+    try:
         from connector_builder_mcp._encryption import EncryptedSecret
@@
-        encrypted_secret = EncryptedSecret(**encrypted_config)
-        config_json = encryption_manager.decrypt_secret(encrypted_secret)
+        encrypted_secret = EncryptedSecret(**encrypted_config)
+        config_json = encryption_manager.decrypt_secret(encrypted_secret)
@@
-        import json
+        import json
@@
-        config = json.loads(config_json)
+        config = json.loads(config_json)
+        # Best-effort: drop plaintext string reference ASAP
+        del config_json
+        if not isinstance(config, dict):
+            raise ValueError("Decrypted config must be a JSON object.")
@@
-    except ValueError as e:
-        logger.error(f"Error decrypting config: {e}")
-        return StreamTestResult(
-            success=False, message=f"Decryption error: {str(e)}", errors=[str(e)]
-        )
+    except ValueError as e:
+        # Log details, return generic message to the caller
+        logger.error(f"Error decrypting config: {e}")
+        return StreamTestResult(
+            success=False,
+            message="Decryption error",
+            errors=["Invalid encrypted config or key mismatch"],
+        )

Optional: change the parameter type to EncryptedSecret for automatic validation if that fits FastMCP’s schema generation.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cbcdc20 and ffe24a6.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (7)
  • README.md (1 hunks)
  • connector_builder_mcp/_connector_builder.py (3 hunks)
  • connector_builder_mcp/_encryption.py (1 hunks)
  • connector_builder_mcp/server.py (1 hunks)
  • docs/ENCRYPTION.md (1 hunks)
  • pyproject.toml (1 hunks)
  • tests/test_encryption.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
tests/test_encryption.py (1)
connector_builder_mcp/_encryption.py (8)
  • EncryptedSecret (37-42)
  • SessionEncryptionManager (45-154)
  • SessionKeyPair (23-34)
  • get_encryption_instructions (170-221)
  • is_encryption_enabled (157-167)
  • kid (63-65)
  • public_key_info (68-82)
  • decrypt_secret (84-124)
connector_builder_mcp/_connector_builder.py (1)
connector_builder_mcp/_encryption.py (5)
  • SessionEncryptionManager (45-154)
  • EncryptedSecret (37-42)
  • decrypt_secret (84-124)
  • public_key_info (68-82)
  • get_encryption_instructions (170-221)
connector_builder_mcp/server.py (3)
connector_builder_mcp/_encryption.py (2)
  • SessionEncryptionManager (45-154)
  • is_encryption_enabled (157-167)
connector_builder_mcp/_util.py (1)
  • initialize_logging (8-14)
connector_builder_mcp/_connector_builder.py (1)
  • register_connector_builder_tools (520-565)
🪛 Gitleaks (8.28.0)
tests/test_encryption.py

[high] 96-96: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

🪛 LanguageTool
docs/ENCRYPTION.md

[grammar] ~236-~236: Ensure spelling is correct
Context: ...ents (not in MVP): - Hosted encryption webapp for easier client-side encryption - Sup...

(QB_NEW_EN_ORTHOGRAPHY_ERROR_IDS_1)

🔇 Additional comments (2)
connector_builder_mcp/_connector_builder.py (2)

8-8: LGTM on TYPE_CHECKING import.

Runtime import avoided; keeps boundaries clean.

Also applies to: 30-32


520-565: Tool/resource wiring looks solid and feature-flagged.

Partial with injected manager, stable tool name/doc, and session resources are correctly gated.

Please confirm these resources are absent when the flag is off (tests likely already cover this).

Comment on lines +20 to +21
MAX_CIPHERTEXT_SIZE = 64 * 1024

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Predefine a safe base64 length cap to avoid large allocations.

Add a derived MAX_CIPHERTEXT_B64_LEN to reject oversized base64 input before decoding.

 MAX_CIPHERTEXT_SIZE = 64 * 1024
+
+# Reject oversized base64 before decoding (rough 4/3 expansion + padding safety).
+# 16 extra bytes accounts for padding/newlines variance.
+MAX_CIPHERTEXT_B64_LEN = int((MAX_CIPHERTEXT_SIZE * 4) / 3) + 16
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
MAX_CIPHERTEXT_SIZE = 64 * 1024
MAX_CIPHERTEXT_SIZE = 64 * 1024
# Reject oversized base64 before decoding (rough 4/3 expansion + padding safety).
# 16 extra bytes accounts for padding/newlines variance.
MAX_CIPHERTEXT_B64_LEN = int((MAX_CIPHERTEXT_SIZE * 4) / 3) + 16
🤖 Prompt for AI Agents
In connector_builder_mcp/_encryption.py around lines 20-21, define a derived
constant MAX_CIPHERTEXT_B64_LEN to cap base64-encoded ciphertext sizes (e.g.
compute as (MAX_CIPHERTEXT_SIZE + 2) // 3 * 4 to account for base64 expansion
and padding) and then validate incoming base64 inputs against
MAX_CIPHERTEXT_B64_LEN before attempting to decode; if the input exceeds the
cap, raise a clear ValueError to reject oversized inputs and avoid large
allocations.

Comment on lines +84 to +125
def decrypt_secret(self, encrypted_secret: EncryptedSecret) -> str:
"""Decrypt an encrypted secret.

Args:
encrypted_secret: The encrypted secret payload

Returns:
Decrypted plaintext secret

Raises:
ValueError: If kid mismatch, invalid ciphertext, or decryption fails
"""
if encrypted_secret.kid != self._kid:
raise ValueError(f"Key ID mismatch: expected {self._kid}, got {encrypted_secret.kid}")

if encrypted_secret.algorithm != "libsodium-sealed-box":
raise ValueError(
f"Unsupported algorithm: {encrypted_secret.algorithm}. "
"Only 'libsodium-sealed-box' is supported."
)

try:
ciphertext_bytes = base64.b64decode(encrypted_secret.ciphertext)
except Exception as e:
raise ValueError(f"Invalid base64 ciphertext: {e}") from e

if len(ciphertext_bytes) > MAX_CIPHERTEXT_SIZE:
raise ValueError(
f"Ciphertext too large: {len(ciphertext_bytes)} bytes "
f"(max {MAX_CIPHERTEXT_SIZE} bytes)"
)

try:
plaintext_bytes = self._sealed_box.decrypt(ciphertext_bytes)
plaintext = plaintext_bytes.decode("utf-8")

self._zeroize_buffer(plaintext_bytes)

return plaintext
except Exception as e:
raise ValueError(f"Decryption failed: {e}") from e

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Harden base64 handling and pre-limit input to prevent memory DoS.

  • Pre-check base64 string length.
  • Use base64.b64decode(..., validate=True) to reject non-base64 bytes.
  • Keep existing decoded-size check.
     def decrypt_secret(self, encrypted_secret: EncryptedSecret) -> str:
@@
-        try:
-            ciphertext_bytes = base64.b64decode(encrypted_secret.ciphertext)
+        # Reject absurdly large base64 inputs before allocating decoded buffer
+        if len(encrypted_secret.ciphertext) > MAX_CIPHERTEXT_B64_LEN:
+            raise ValueError(
+                f"Ciphertext too large (base64): {len(encrypted_secret.ciphertext)} bytes "
+                f"(max {MAX_CIPHERTEXT_B64_LEN} bytes)"
+            )
+
+        try:
+            ciphertext_bytes = base64.b64decode(encrypted_secret.ciphertext, validate=True)
         except Exception as e:
             raise ValueError(f"Invalid base64 ciphertext: {e}") from e

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In connector_builder_mcp/_encryption.py around lines 84-125, add a pre-check on
the base64 ciphertext string length before decoding (derive a safe max base64
length from MAX_CIPHERTEXT_SIZE to avoid huge memory allocation), then call
base64.b64decode(encrypted_secret.ciphertext, validate=True) so non-base64 input
is rejected, handle the resulting binascii.Error/ValueError to raise an explicit
ValueError for invalid base64, and retain the existing decoded-size check (and
subsequent decryption/zeroization) to prevent memory DoS and malformed input.

Comment on lines +96 to +113
secret_plaintext = "my-secret-api-key-12345"

public_key_bytes = base64.b64decode(public_key_info.public_key_b64)
public_key = PublicKey(public_key_bytes)
sealed_box = SealedBox(public_key)

ciphertext_bytes = sealed_box.encrypt(secret_plaintext.encode("utf-8"))
ciphertext_b64 = base64.b64encode(ciphertext_bytes).decode("ascii")

encrypted_secret = EncryptedSecret(
ciphertext=ciphertext_b64,
kid=public_key_info.kid,
algorithm="libsodium-sealed-box",
)

decrypted = manager.decrypt_secret(encrypted_secret)
assert decrypted == secret_plaintext

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Avoid secrets scanner false positives in fixtures.

Change the example to a clearly fake token and optionally allowlist the line to keep scanners quiet.

-        secret_plaintext = "my-secret-api-key-12345"
+        # Example token; not a real secret. gitleaks:allow
+        secret_plaintext = "example-not-a-real-token"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
secret_plaintext = "my-secret-api-key-12345"
public_key_bytes = base64.b64decode(public_key_info.public_key_b64)
public_key = PublicKey(public_key_bytes)
sealed_box = SealedBox(public_key)
ciphertext_bytes = sealed_box.encrypt(secret_plaintext.encode("utf-8"))
ciphertext_b64 = base64.b64encode(ciphertext_bytes).decode("ascii")
encrypted_secret = EncryptedSecret(
ciphertext=ciphertext_b64,
kid=public_key_info.kid,
algorithm="libsodium-sealed-box",
)
decrypted = manager.decrypt_secret(encrypted_secret)
assert decrypted == secret_plaintext
# Example token; not a real secret. gitleaks:allow
secret_plaintext = "example-not-a-real-token"
public_key_bytes = base64.b64decode(public_key_info.public_key_b64)
public_key = PublicKey(public_key_bytes)
sealed_box = SealedBox(public_key)
ciphertext_bytes = sealed_box.encrypt(secret_plaintext.encode("utf-8"))
ciphertext_b64 = base64.b64encode(ciphertext_bytes).decode("ascii")
encrypted_secret = EncryptedSecret(
ciphertext=ciphertext_b64,
kid=public_key_info.kid,
algorithm="libsodium-sealed-box",
)
decrypted = manager.decrypt_secret(encrypted_secret)
assert decrypted == secret_plaintext
🧰 Tools
🪛 Gitleaks (8.28.0)

[high] 96-96: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

🤖 Prompt for AI Agents
In tests/test_encryption.py around lines 96 to 113, the test uses a
realistic-looking secret "my-secret-api-key-12345" which can trigger secrets
scanners; replace it with an obviously fake token (for example
"sk_test_FAKE_API_KEY_12345" or "fake-secret-0000") and optionally add a
one-line allowlist/comment recognized by our scanners (e.g. "# test-fixture:
allowlist-secrets") on the same line to suppress false positives; ensure the
rest of the test uses this fake value unchanged so assertions still pass.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants