
CMEK: Add encryption support for event and schema store data at rest#3955

Draft
tenfyzhong wants to merge 22 commits into pingcap:master from tenfyzhong:feat-cmek

Conversation


@tenfyzhong tenfyzhong commented Jan 8, 2026

What problem does this PR solve?

Issue Number: close #3943

What is changed and how it works?

This PR introduces CMEK (Customer-Managed Encryption Keys) encryption at rest for TiCDC's EventStore and SchemaStore components. The primary goal is to enhance data security by encrypting persisted data on disk using encryption keys managed by customers through their preferred KMS (Key Management Service) providers.

Key changes include:

  • Added encryption support for EventStore: Data written to pebble DB is now encrypted before storage, and decrypted during reads. The encryption uses a layered key model where data keys encrypt the actual data, and master keys (managed by KMS) encrypt the data keys.
  • Added encryption support for SchemaStore: DDL events and schema information stored in the schema store are now encrypted using the same encryption framework.
  • Implemented encryption framework: Created a comprehensive encryption package with support for multiple algorithms (AES-256-CTR, AES-256-GCM), KMS integration, and key management.
  • Graceful degradation: When configured to allow it, the system falls back to unencrypted mode if encryption is disabled or an encryption operation fails.
  • Backward compatibility: The implementation maintains backward compatibility with existing unencrypted data through automatic detection of encryption headers.
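The layered key model above can be sketched in a few lines of Go. This is a simplified illustration, not the PR's implementation: the real code asks AWS or GCP KMS to decrypt the wrapped data key, while this mock uses AES-CTR with a zero IV (the same scheme the PR's mock TiKV client uses for its test data keys). The function name unwrapDataKey is hypothetical.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// unwrapDataKey recovers a plaintext data key from its KMS-wrapped form.
// The master key never touches disk; only the wrapped data key is persisted
// alongside the encrypted data. Mock only: real code calls the KMS Decrypt API.
func unwrapDataKey(masterKey, encryptedDataKey []byte) ([]byte, error) {
	block, err := aes.NewCipher(masterKey) // 32-byte master key => AES-256
	if err != nil {
		return nil, err
	}
	dataKey := make([]byte, len(encryptedDataKey))
	// Zero IV, matching the mock decryption scheme used in this PR's tests.
	cipher.NewCTR(block, make([]byte, aes.BlockSize)).XORKeyStream(dataKey, encryptedDataKey)
	return dataKey, nil
}

func main() {
	masterKey := make([]byte, 32)
	plainKey := []byte("0123456789abcdef0123456789abcdef") // 32-byte data key
	// CTR is symmetric, so the same function both wraps and unwraps.
	wrapped, _ := unwrapDataKey(masterKey, plainKey)
	unwrapped, _ := unwrapDataKey(masterKey, wrapped)
	fmt.Println(string(unwrapped) == string(plainKey)) // prints: true
}
```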

The encryption works by:

  1. Checking if encryption is enabled for a keyspace via encryption metadata stored in TiKV
  2. Using a current data key to encrypt data, with the data key itself encrypted by a master key
  3. Storing encrypted data with a header containing version and data key ID information
  4. Decrypting data on read by extracting the data key ID from the header and retrieving the corresponding data key
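The four steps above can be sketched as a header-framed AES-CTR roundtrip. This is an illustrative sketch, not the PR's code: the 4-byte header (1 version byte + 3-byte data key ID) and AES-256-CTR match the PR's description, but the exact field order, constant names, and header-before-IV layout are assumptions; the real format lives in pkg/encryption/format.go.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

const (
	headerSize    = 4 // 1 version byte + 3-byte data key ID (assumed layout)
	formatVersion = 1
)

// encrypt returns header(4) || IV(16) || AES-CTR ciphertext.
func encrypt(dataKey []byte, keyID [3]byte, plaintext []byte) ([]byte, error) {
	block, err := aes.NewCipher(dataKey) // 32-byte key => AES-256
	if err != nil {
		return nil, err
	}
	out := make([]byte, headerSize+aes.BlockSize+len(plaintext))
	out[0] = formatVersion
	copy(out[1:headerSize], keyID[:])
	iv := out[headerSize : headerSize+aes.BlockSize]
	if _, err := rand.Read(iv); err != nil {
		return nil, err
	}
	cipher.NewCTR(block, iv).XORKeyStream(out[headerSize+aes.BlockSize:], plaintext)
	return out, nil
}

// decrypt extracts the data key ID from the header (step 4), then reverses encrypt.
func decrypt(dataKey []byte, enc []byte) (keyID [3]byte, plain []byte, err error) {
	if len(enc) < headerSize+aes.BlockSize {
		return keyID, nil, errors.New("encrypted payload too short")
	}
	copy(keyID[:], enc[1:headerSize])
	block, err := aes.NewCipher(dataKey)
	if err != nil {
		return keyID, nil, err
	}
	iv := enc[headerSize : headerSize+aes.BlockSize]
	plain = make([]byte, len(enc)-headerSize-aes.BlockSize)
	cipher.NewCTR(block, iv).XORKeyStream(plain, enc[headerSize+aes.BlockSize:])
	return keyID, plain, nil
}

func main() {
	dataKey := make([]byte, 32) // all-zero key, for illustration only
	id := [3]byte{0, 0, 1}
	enc, _ := encrypt(dataKey, id, []byte("hello"))
	gotID, plain, _ := decrypt(dataKey, enc)
	fmt.Println(gotID == id, string(plain)) // prints: true hello
}
```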

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
  • Performance: There will be some performance overhead due to encryption/decryption operations, but this is necessary for security. The impact should be minimal with efficient cipher implementations.
  • Compatibility: The changes maintain backward compatibility. Existing unencrypted data will continue to work, and new data can be written encrypted when encryption is enabled. The system automatically detects whether data is encrypted based on headers.
Do you need to update user documentation, design documentation or monitoring documentation?
  • User documentation: Yes, documentation should be updated to explain how to configure CMEK encryption for TiCDC.
  • Design documentation: The encryption architecture and key management should be documented.
  • Monitoring documentation: New metrics for encryption operations (success/failure rates, latency) should be documented.

Release note

Added support for CMEK (Customer-Managed Encryption Keys) encryption at rest for TiCDC EventStore and SchemaStore. This feature allows users to encrypt persisted data on disk using keys managed through AWS KMS, Azure Key Vault, or GCP KMS. The encryption can be enabled via configuration and provides graceful degradation on encryption errors.

Summary by CodeRabbit

  • New Features

    • Added encryption at rest for databases using customer-managed encryption keys (CMEK)
    • Integrated AWS KMS and Google Cloud KMS for key management and decryption
    • Added configuration options to enable encryption and manage KMS settings
    • Support for keyspace-level encryption with automatic encryption/decryption at rest
    • Added graceful degradation on encryption failures when configured
  • Tests

    • Added integration tests for encrypted keyspace scenarios

- Add encryption manager for transparent data encryption/decryption
- Implement AES-256-CTR and AES-256-GCM cipher algorithms
- Add encryption format with header for version and key ID
- Integrate encryption with event store and schema store
- Add mock KMS and TiKV clients for testing
- Support graceful degradation on encryption errors
- Add configuration options for encryption settings

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
@ti-chi-bot ti-chi-bot bot added release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/needs-triage-completed labels Jan 8, 2026

ti-chi-bot bot commented Jan 8, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Jan 8, 2026
@tenfyzhong tenfyzhong marked this pull request as draft January 8, 2026 06:11
@ti-chi-bot ti-chi-bot bot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 8, 2026
@gemini-code-assist

Summary of Changes

Hello @tenfyzhong, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant security enhancement by integrating Customer-Managed Encryption Keys (CMEK) encryption at rest for TiCDC's EventStore and SchemaStore components. The primary objective is to safeguard persisted data on disk by encrypting it with keys managed by customers through their chosen Key Management Service (KMS) providers. This foundational work establishes a robust and flexible encryption architecture, ensuring data confidentiality while maintaining seamless operation and compatibility with existing data structures.

Highlights

  • EventStore Encryption: Data written to PebbleDB in the EventStore is now encrypted before storage and decrypted during reads, utilizing a layered key model where data keys encrypt the actual data, and master keys (managed by KMS) encrypt the data keys.
  • SchemaStore Encryption: DDL events and schema information stored in the SchemaStore are now encrypted using the same comprehensive encryption framework.
  • Comprehensive Encryption Framework: A new encryption package has been implemented, supporting multiple algorithms (AES-256-CTR, AES-256-GCM), KMS integration, and robust key management.
  • Graceful Degradation: The system is designed to gracefully degrade to an unencrypted mode when encryption is disabled or when encryption operations encounter failures, which is configurable.
  • Backward Compatibility: The implementation ensures backward compatibility, allowing existing unencrypted data to function correctly while new data can be written in an encrypted state, with automatic detection of encryption headers.



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a comprehensive framework for Customer-Managed Encryption Keys (CMEK) to encrypt data at rest for TiCDC's EventStore and SchemaStore. The changes are extensive, including a new encryption package with support for multiple ciphers, KMS integration, and a layered key model. The implementation also considers backward compatibility and graceful degradation. My review focuses on improving code clarity, robustness, and performance by addressing issues such as non-idiomatic Go patterns for optional dependencies, inconsistent error handling, potential race conditions in caching, and inefficient data access patterns. I've also pointed out some minor issues in comments and mock data to enhance maintainability.

Comment on lines +1309 to +1329
if encryption.IsEncrypted(value) {
	// Try to get encryptionManager from appcontext
	var encMgr encryption.EncryptionManager
	defer func() {
		if r := recover(); r != nil {
			// EncryptionManager not registered, skip decryption
			encMgr = nil
		}
	}()
	encMgr = appcontext.GetService[encryption.EncryptionManager]("EncryptionManager")
	if encMgr != nil {
		// TODO: Get keyspaceID from dispatcher/subscription metadata
		// For now, use default keyspaceID (0)
		keyspaceID := uint32(0)
		decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
		if err != nil {
			log.Panic("failed to decrypt value", zap.Error(err))
		}
		value = decryptedValue
	}
}


high

As the comment here suggests, fetching the encryptionManager from the global appcontext on every Next() call is inefficient and not idiomatic. It also uses a defer/recover pattern, which is best avoided for control flow.

The encryptionManager should be passed to eventStoreIter upon its creation and stored as a field.

  1. Add encryptionManager encryption.EncryptionManager to the eventStoreIter struct.
  2. In GetIterator, initialize this new field from e.encryptionManager.
  3. Then, simplify this Next method to use iter.encryptionManager directly.

This would improve performance and make the code cleaner and more maintainable.

                if iter.encryptionManager != nil && encryption.IsEncrypted(value) {
			// TODO: Get keyspaceID from dispatcher/subscription metadata
			// For now, use default keyspaceID (0)
			keyspaceID := uint32(0)
			decryptedValue, err := iter.encryptionManager.DecryptData(context.Background(), keyspaceID, value)
			if err != nil {
				log.Panic("failed to decrypt value", zap.Error(err))
			}
			value = decryptedValue
		}

Comment on lines +198 to +200
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}


high

Using log.Fatal can lead to abrupt program termination without proper cleanup or stack unwinding. It's generally better to return an error from this function and let the caller decide on the appropriate action. If the error is considered truly unrecoverable at this level, panic(err) would be a better choice as it allows for a top-level recovery mechanism to perform a more graceful shutdown. This applies to other log.Fatal calls in this file as well.

Suggested change
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}
if err != nil {
return nil, errors.Trace(err)
}

Comment on lines +55 to +56
// DecryptMasterKey decrypts the master key using mock KMS
// In a real implementation, this would call the actual KMS service


high

The dataKeyID is defined to be 3 bytes long throughout the encryption framework (e.g., in format.go and types.go). However, these mock data key IDs are 8 bytes long. This will cause EncodeEncryptedData to fail. The mock data should be updated to use 3-byte strings for data key IDs to be consistent with the implementation.

	mockDataKeyID1 := "001"
	mockDataKeyID2 := "002"

if cached, ok := m.metaCache[keyspaceID]; ok {
	// Check if cache is still valid
	if time.Since(cached.timestamp) < m.ttl {
		meta := cached.meta


high

The cached meta object is returned directly. Since KeyspaceEncryptionMeta contains pointers and maps, a caller could inadvertently modify the cached data, leading to race conditions or inconsistent state. To prevent this, a deep copy of the meta object should be returned from the cache.
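The deep-copy fix suggested above can be sketched as follows. The struct shape shown here is hypothetical (the real KeyspaceEncryptionMeta has more fields); the point is that maps and byte slices must be copied element by element before leaving the cache.

```go
package main

import "fmt"

// KeyspaceEncryptionMeta is a stand-in for the real cached metadata;
// the field names here are illustrative, not from the PR.
type KeyspaceEncryptionMeta struct {
	CurrentKeyID string
	DataKeys     map[string][]byte
}

// clone returns a deep copy so callers cannot mutate the cached entry.
func (m *KeyspaceEncryptionMeta) clone() *KeyspaceEncryptionMeta {
	if m == nil {
		return nil
	}
	cp := &KeyspaceEncryptionMeta{CurrentKeyID: m.CurrentKeyID}
	if m.DataKeys != nil {
		cp.DataKeys = make(map[string][]byte, len(m.DataKeys))
		for id, k := range m.DataKeys {
			cp.DataKeys[id] = append([]byte(nil), k...) // copy each key's bytes
		}
	}
	return cp
}

func main() {
	cached := &KeyspaceEncryptionMeta{CurrentKeyID: "001", DataKeys: map[string][]byte{"001": {1, 2}}}
	got := cached.clone()
	got.DataKeys["001"][0] = 9              // mutating the returned copy...
	fmt.Println(cached.DataKeys["001"][0])  // ...leaves the cache intact: prints 1
}
```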

Comment on lines +264 to +270
defer func() {
	if r := recover(); r != nil {
		// EncryptionManager not registered, use nil
		encMgr = nil
	}
}()
encMgr = appcontext.GetService[encryption.EncryptionManager]("EncryptionManager")


medium

Using defer and recover for control flow to handle an optional dependency is not idiomatic Go. It can obscure the program's control flow and is generally reserved for handling unexpected panics. A better approach would be to have a TryGetService function in appcontext that returns a boolean indicating whether the service was found, for example: encMgr, ok := appcontext.TryGetService[...](...). This would make the code clearer and more robust.
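A TryGetService helper of the kind suggested above could look like this. The registry is a plain map here for illustration; the real appcontext presumably guards its registry with a mutex, and the exact signature in the repo may differ.

```go
package main

import "fmt"

// services is a hypothetical stand-in for appcontext's service registry.
var services = map[string]any{}

// TryGetService reports whether the named service exists and has type T,
// instead of panicking like GetService when it is missing.
func TryGetService[T any](name string) (T, bool) {
	var zero T
	v, ok := services[name]
	if !ok {
		return zero, false
	}
	t, ok := v.(T)
	return t, ok
}

// EncryptionManager is a placeholder interface for the example.
type EncryptionManager interface{ Name() string }

func main() {
	// Service not registered: no panic, callers just skip decryption.
	if _, ok := TryGetService[EncryptionManager]("EncryptionManager"); !ok {
		fmt.Println("encryption disabled, skip decryption")
	}
}
```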

// TODO: Get keyspaceID from dispatcher/subscription metadata
// For now, use default keyspaceID (0) for classic mode
keyspaceID := uint32(0)
encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), keyspaceID, value)


medium

Using context.Background() here means this encryption operation will not be cancelled if the parent context (from the write task pool) is cancelled. It's better to pass the context from writeTaskPool.run down to writeEvents and use it here. This ensures that long-running operations can be properly cancelled.

return &AES256GCMCipher{}
}

// IVSize returns the IV size for AES-256-GCM (12 bytes recommended, but we use 16 for compatibility)


medium

The comment states that 16 bytes are used for compatibility, but the function returns 12. This is confusing. The standard and recommended nonce size for GCM is 12 bytes, which the code correctly returns. The comment should be updated to reflect this and remove the mention of 16 bytes to avoid confusion.

Suggested change
// IVSize returns the IV size for AES-256-GCM (12 bytes recommended, but we use 16 for compatibility)
// IVSize returns the IV size for AES-256-GCM (12 bytes).

// DataKeyID represents a 3-byte data key identifier
type DataKeyID [3]byte

// ToString converts DataKeyID to hex string


medium

The comment says ToString converts the DataKeyID to a hex string, but the implementation performs a direct byte-to-string conversion. The comment should be corrected to reflect the actual behavior.

Suggested change
// ToString converts DataKeyID to hex string
// ToString converts DataKeyID to a string.
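A later commit in this PR mentions "helper functions for 24-bit encoding/decoding" of DataKeyID. A minimal sketch, with hypothetical helper names (FromUint32/Uint32) and a String method doing the direct byte-to-string conversion the review comment describes:

```go
package main

import "fmt"

// DataKeyID is the 3-byte identifier stored in the encryption header.
type DataKeyID [3]byte

// FromUint32 packs the low 24 bits of v big-endian into a DataKeyID.
func FromUint32(v uint32) DataKeyID {
	return DataKeyID{byte(v >> 16), byte(v >> 8), byte(v)}
}

// Uint32 unpacks the 24-bit big-endian value.
func (d DataKeyID) Uint32() uint32 {
	return uint32(d[0])<<16 | uint32(d[1])<<8 | uint32(d[2])
}

// String does a direct byte-to-string conversion (not hex).
func (d DataKeyID) String() string {
	return string(d[:])
}

func main() {
	id := FromUint32(0x010203)
	fmt.Println(id.Uint32() == 0x010203, len(id.String())) // prints: true 3
}
```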

- Add unit tests for cipher functionality including unsupported algorithm detection and AES256CTR encryption/decryption
- Add unit tests for data format encoding/decoding with encrypted and unencrypted data
- Add unit tests for mock TiKV client behavior including keyspace encryption meta retrieval

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Replace panic-prone appcontext.GetService calls with TryGetService for optional encryption manager
- Pass encryption manager to eventStoreIter to avoid repeated appcontext lookups
- Update encryption format to support version-based detection from TiKV metadata
- Add GetEncryptionVersion method to encryption meta manager interface
- Improve backward compatibility for legacy unencrypted data formats
- Add comprehensive tests for encryption format handling

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Add keyspaceID field to eventWithCallback struct to carry keyspace information
- Store keyspaceID in dispatcher statistics during registration
- Pass keyspaceID to encryption manager for both encryption and decryption operations
- Update encryption manager to return errors instead of graceful degradation when configured
- Add test to verify keyspaceID is correctly used in encryption/decryption flow

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Add test for encryption degrade on error with allow flag enabled
- Add test for encryption degrade on error with allow flag disabled
- Add test for encryption disabled scenario
- Include mock meta manager for testing encryption manager behavior

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Remove redundant comments and build tags from test files
- Use sync.Once for safe stop channel closure in encryption manager
- Clear meta cache entries before refresh to ensure fresh data
- Standardize formatting and remove outdated compatibility comments

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Add `ByteArray` type for flexible JSON decoding of byte arrays
- Implement `tikvEncryptionHTTPClient` to fetch encryption metadata from TiKV status API
- Update `encryptionMetaManager.decryptDataKey` to use zero IV and enforce 32-byte lengths
- Add `Close` method to encryption meta manager
- Include encryption manager setup in server initialization when enabled in debug config
- Improve mock TiKV client to use realistic encrypted data keys with zero IV
- Add comprehensive unit tests for HTTP client and decryption logic

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Upgrade AWS SDK dependencies to support KMS operations
- Add KMS configuration structure to support AWS and GCP client overrides
- Implement AWS KMS decryptor with configurable region, endpoint, and credentials
- Implement GCP KMS decryptor with configurable endpoint and credentials
- Create KMS client factory with caching and configuration resolution
- Replace mock KMS client with real implementation in server initialization
- Add comprehensive unit tests for KMS client functionality

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
…election

- Remove AES256GCMCipher implementation and GetCipher function
- Add DataKeyID type and helper functions for 24-bit encoding/decoding
- Update encryption manager to use only AES256CTRCipher
- Refactor encryption meta manager to align with kvproto types
- Simplify encryption meta retrieval and data key management
- Update tests to reflect new type structure and simplified cipher selection

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>

coderabbitai bot commented Jan 28, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e3da3599-c311-44d2-8ca1-9d96fd38a0fc


📝 Walkthrough


This PR implements CMEK (Customer-Managed Encryption Keys) encryption support for TiCDC's Next-Gen architecture. It introduces encryption infrastructure including ciphers, KMS client integrations, encryption metadata management, and integrates encryption/decryption into event store and schema store read/write paths. Configuration, initialization, and comprehensive test coverage are included.

Changes

Cohort / File(s) Summary
Encryption Core Infrastructure
pkg/encryption/cipher.go, pkg/encryption/cipher_test.go, pkg/encryption/data_key_id.go, pkg/encryption/data_key_id_24be.go, pkg/encryption/format.go, pkg/encryption/format_test.go, pkg/encryption/json_types.go, pkg/encryption/types.go
Implements AES-256-CTR cipher, 3-byte data key ID encoding/decoding, 4-byte encryption headers, and encryption metadata data structures with encoding/decoding and format detection utilities.
Encryption Manager & Metadata
pkg/encryption/encryption_manager.go, pkg/encryption/encryption_manager_test.go, pkg/encryption/manager.go, pkg/encryption/manager_test.go
Introduces EncryptionManager for encrypt/decrypt operations and EncryptionMetaManager for fetching, caching, and refreshing encryption metadata from TiKV with background refresh loops and per-keyspace data key caching.
KMS Client Integration
pkg/encryption/kms/client.go, pkg/encryption/kms/client_test.go, pkg/encryption/kms/aws_kms.go, pkg/encryption/kms/gcp_kms.go, pkg/encryption/kms/mock_client.go
Implements KMS client abstraction with AWS and GCP decryptor support, vendor-specific configuration resolution, per-vendor decryptor caching, and mock implementations for testing.
TiKV Encryption Client
pkg/encryption/mock_tikv_client.go, pkg/encryption/mock_tikv_client_test.go, pkg/encryption/tikv_http_client.go, pkg/encryption/tikv_http_client_test.go
Adds HTTP-based TiKV encryption metadata client for fetching keyspace encryption metadata from TiKV stores with JSON/protobuf decoding support, and mock client for testing scenarios.
Event Store Integration
logservice/eventstore/event_store.go, logservice/eventstore/event_store_test.go
Wires EncryptionManager into event store write/read paths; propagates keyspaceID through subscriptions; encrypts event values after compression on write, decrypts on read when encryption is detected.
Schema Store Integration
logservice/schemastore/disk_format.go, logservice/schemastore/persist_storage.go, logservice/schemastore/schema_store.go, logservice/schemastore/schema_store_test.go
Introduces encryption-aware variants for loading/writing DDL events and schema info; replaces plaintext read/write with encrypted variants; adds error handling for DDL processing failures; updates resolved timestamp logic with retry semantics.
Configuration & Context
pkg/config/debug.go, pkg/config/server.go, pkg/common/context/app_context.go, pkg/errors/error.go
Adds encryption configuration structure (EncryptionConfig, KMSConfig, AWSKMSConfig, GCPKMSConfig); initializes default encryption settings; adds generic TryGetService helper; defines six encryption-related error variables.
Server Initialization
server/server.go
Wires up encryption initialization during pre-services; creates TiKV HTTP client, KMS client, and EncryptionMetaManager; registers EncryptionManager in app context when encryption is enabled.
Dependency Updates
go.mod
Updates AWS SDK v2 (v1.40.0 → v1.41.1), adds cloud.google.com/go/kms and google.golang.org/api dependencies, updates internal AWS references and AWS Smithy dependency.
Test Infrastructure
tests/integration_tests/_utils/start_tidb_cluster_nextgen, tests/integration_tests/cmek_keyspace/run.sh, tests/integration_tests/cmek_keyspace/conf/*
Adds keyspace creation by ID via PD API; comprehensive integration test script for CMEK scenarios (valid and invalid KMS); multiple test configuration files (cdc_valid.toml, cdc_invalid.toml, pd_config.toml, diff_config.toml).

Sequence Diagram(s)

sequenceDiagram
    participant Server as Server Startup
    participant TiKVClient as TiKV HTTP Client
    participant KMSClient as KMS Client
    participant MetaMgr as Encryption Meta Manager
    participant AppCtx as App Context
    
    Server->>TiKVClient: NewTiKVEncryptionHTTPClient()
    Server->>KMSClient: NewClient(config)
    Server->>MetaMgr: NewEncryptionMetaManager(tikvClient, kmsClient)
    MetaMgr->>MetaMgr: Start() - begin refresh loop
    Server->>AppCtx: Register EncryptionManager
    AppCtx->>AppCtx: EncryptionManager available for services
sequenceDiagram
    participant App as Application
    participant EventStore as Event Store
    participant EncMgr as Encryption Manager
    participant MetaMgr as Meta Manager
    participant KMSClient as KMS Client
    
    App->>EventStore: WriteEvent(keyspaceID, data)
    EventStore->>EncMgr: EncryptData(ctx, keyspaceID, compressedData)
    EncMgr->>MetaMgr: GetCurrentDataKey(keyspaceID)
    MetaMgr->>KMSClient: DecryptMasterKey(ciphertext)
    KMSClient-->>MetaMgr: plaintext masterKey
    MetaMgr-->>EncMgr: dataKey, keyID, version
    EncMgr->>EncMgr: AES-CTR encrypt, prepend IV and header
    EncMgr-->>EventStore: encryptedData
    EventStore->>EventStore: persist encryptedData
    
    App->>EventStore: ReadEvent(keyspaceID)
    EventStore->>EncMgr: DecryptData(ctx, keyspaceID, encryptedData)
    EncMgr->>EncMgr: Detect encryption via header
    EncMgr->>MetaMgr: GetDataKey(keyID)
    MetaMgr->>KMSClient: DecryptMasterKey(ciphertext)
    KMSClient-->>MetaMgr: plaintext masterKey
    MetaMgr-->>EncMgr: dataKey
    EncMgr->>EncMgr: Extract IV, AES-CTR decrypt
    EncMgr-->>EventStore: plaintext data

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

lgtm, approved

Suggested reviewers

  • asddongmen
  • lidezhu
  • wk989898

Poem

🔐 A rabbit hops through keys with care,
Encrypting secrets here and there,
AES-256 makes data shine,
KMS brings the keys online,
TiCDC flows secure and bright! 🐰✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 21.43% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarizes the main change: adding CMEK encryption support for event and schema store data at rest, which is the primary objective of the PR.
Description check ✅ Passed The description covers the problem statement (issue #3943), detailed explanation of changes, backward compatibility considerations, performance implications, and includes a comprehensive release note. All required template sections are addressed.
Linked Issues check ✅ Passed The PR implementation addresses all key objectives from issue #3943: keyspace metadata retrieval via TiKV HTTP client, master key management through AWS/GCP KMS integration, data encryption/decryption in EventStore and SchemaStore, EncryptionMetaManager with caching, and backward compatibility via header detection.
Out of Scope Changes check ✅ Passed The PR includes integration test scaffolding and configuration files for testing, which support the stated objective. All changes directly support CMEK encryption implementation for EventStore and SchemaStore; no unrelated modifications detected.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.





- Use grouped type declaration syntax for AWS and GCP decryptor factories
- Improve code organization and readability by grouping related types

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Move cloud.google.com/go/kms from indirect to direct dependency
- Add google.golang.org/api as direct dependency
- Add github.com/aws/aws-sdk-go-v2/service/kms as direct dependency
- Update AWS SDK v2 dependencies to newer versions
- Remove duplicate indirect dependencies that are now direct

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Replace hardcoded aes.BlockSize with c.IVSize() in Encrypt method
- Replace hardcoded aes.BlockSize with c.IVSize() in Decrypt method
- Ensures consistency with cipher's defined IV size

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Modify GetCurrentDataKey to return data key, key ID, and version in a single call
- Remove separate GetCurrentDataKeyID and GetEncryptionVersion methods
- Update mock implementations in tests to match new interface
- Improve cache handling by retrieving all metadata together to prevent inconsistencies during key rotation

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Add detailed error logging in encryption_manager.go for encryption and decryption failures
- Improve KMS client error logging with vendor-specific details and configuration context
- Add comprehensive logging to encryption meta manager for key retrieval and decryption operations
- Enhance TiKV HTTP client logging with store-specific information and response details
- Implement safe logging utilities to avoid nil pointer dereferences when logging KMS metadata

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
…izes

- Add encryption parameters to DDL history loading functions
- Decrypt DDL events when reading from persistent storage
- Update resolved timestamp logic to retry failed DDL handling
- Support multiple AES key sizes (128, 192, 256 bits) in cipher
- Add protobuf support for encryption meta responses from TiKV
- Add comprehensive tests for encryption integration

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
…n support

- Add new integration test `cmek_keyspace` for testing CMEK-encrypted keyspaces
- Support manual keyspace creation in cluster startup script
- Add configuration files for CMEK testing with valid and invalid KMS configurations
- Add local KMS server for testing encryption functionality
- Enable keyspace-specific TiDB instances for different encryption scenarios
- Add test cases for both valid and invalid KMS configurations
- Update test runner to include new CMEK keyspace test group

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Add encryption manager field to eventStore struct for encrypting/decrypting data
- Pass encryption manager parameter to New() constructor
- Add encryption manager and keyspace ID fields to eventStoreIter struct
- Update test to use new writeEvents signature with compression buffer parameter

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
- Fix shell script formatting for better readability
- Remove unnecessary backslash in curl command pipeline
- Properly indent heredoc content in configuration function

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
@tenfyzhong

/gemini review

@tenfyzhong

@coderabbitai review

@coderabbitai

coderabbitai bot commented Mar 3, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces CMEK support for encrypting data at rest in the event store and schema store. The changes are extensive, adding a new encryption package that handles key management, interaction with KMS providers, and the encryption/decryption logic. The event store and schema store are modified to use this new package to protect data before writing to disk and decrypt it upon reading. The implementation includes caching of encryption metadata and data keys for performance, and graceful degradation for robustness. The changes are well-tested with new unit and integration tests. My feedback focuses on improving error handling, context propagation, and ensuring configuration is correctly applied.

Note: Security Review did not run due to the size of the PR.

Comment on lines +197 to +200
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}


critical

log.Fatal is called on a decryption error. This will terminate the entire TiCDC process abruptly, which is too drastic and prevents any higher-level error handling or recovery. Since loadDatabasesInKVSnapWithEncryption can return an error, it's much better to propagate this error up the call stack for more graceful handling.

This recommendation applies to all new usages of log.Fatal in this file where an error can be returned instead (e.g., lines 373, 528, 612, 661, 879).

Suggested change
 decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
 if err != nil {
-	log.Fatal("decrypt db info failed", zap.Error(err))
+	return nil, errors.Trace(err)
 }

Comment on lines +78 to +88
func NewEncryptionMetaManager(tikvClient TiKVEncryptionClient, kmsClient kms.KMSClient) EncryptionMetaManager {
return &encryptionMetaManager{
tikvClient: tikvClient,
kmsClient: kmsClient,
metaCache: make(map[uint32]*cachedMeta),
dataKeyCache: make(map[uint32]map[string]*cachedDataKey),
ttl: 1 * time.Hour, // Default TTL: 1 hour
refreshInterval: 1 * time.Hour, // Default refresh interval: 1 hour
stopCh: make(chan struct{}),
}
}


high

The ttl and refreshInterval for the metadata cache are hardcoded, which makes the corresponding settings in EncryptionConfig (MetaCacheTTL and MetaRefreshInterval) ineffective.

The NewEncryptionMetaManager function should be updated to accept these values from the configuration. This will allow users to properly configure the caching behavior.

func NewEncryptionMetaManager(
	tikvClient TiKVEncryptionClient,
	kmsClient kms.KMSClient,
	cfg *config.EncryptionConfig,
) EncryptionMetaManager {
	return &encryptionMetaManager{
		tikvClient:      tikvClient,
		kmsClient:       kmsClient,
		metaCache:       make(map[uint32]*cachedMeta),
		dataKeyCache:    make(map[uint32]map[string]*cachedDataKey),
		ttl:             time.Duration(cfg.MetaCacheTTL),
		refreshInterval: time.Duration(cfg.MetaRefreshInterval),
		stopCh:          make(chan struct{}),
	}
}

Comment on lines +1342 to +1353
if e.encryptionManager != nil {
encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), event.keyspaceID, value)
if err != nil {
log.Error("encrypt event value failed",
zap.Uint32("keyspaceID", event.keyspaceID),
zap.Uint64("subID", uint64(event.subID)),
zap.Int64("tableID", event.tableID),
zap.Error(err))
return err
}
value = encryptedValue
}


medium

context.Background() is used for the EncryptData call. This operation may involve network I/O to a KMS, and using a background context prevents proper cancellation and timeout handling, which could lead to the worker goroutine being blocked indefinitely. It would be more robust to propagate the worker's context.

I suggest modifying writeEvents to accept a context.Context and pass it down from writeTaskPool.run. The same applies to decryption calls in eventStoreIter.Next.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 17

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
logservice/schemastore/disk_format.go (2)

839-849: ⚠️ Potential issue | 🔴 Critical

Encrypted snapshot reads are still using plaintext loaders.

loadAllPhysicalTablesAtTs now takes encMgr/keyspaceID, but snapshot DB/table loading still uses non-encryption loaders. If snapshot records are encrypted, unmarshalling can fail before DDL replay.

🔧 Proposed fix
-	databaseMap, err := loadDatabasesInKVSnap(storageSnap, gcTs)
+	databaseMap, err := loadDatabasesInKVSnapWithEncryption(storageSnap, gcTs, encMgr, keyspaceID)
 	if err != nil {
 		return nil, err
 	}
 
-	tableInfoMap, tableMap, partitionMap, err := loadFullTablesInKVSnap(storageSnap, gcTs, databaseMap)
+	tableInfoMap, tableMap, partitionMap, err := loadFullTablesInKVSnapWithEncryption(storageSnap, gcTs, databaseMap, encMgr, keyspaceID)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/schemastore/disk_format.go` around lines 839 - 849,
loadAllPhysicalTablesAtTs now receives encMgr and keyspaceID but still calls
plaintext loaders (loadDatabasesInKVSnap, loadFullTablesInKVSnap), so encrypted
snapshot records will fail to unmarshal; update these calls to use their
encrypted-aware variants (or augment them to accept encMgr and keyspaceID) so
bytes are decrypted before unmarshalling: modify loadDatabasesInKVSnap and
loadFullTablesInKVSnap (or create
loadDatabasesInKVSnapEncrypted/loadFullTablesInKVSnapEncrypted) to accept
encryption.EncryptionManager and keyspaceID and perform decryption when reading
records, then replace the current calls in loadAllPhysicalTablesAtTs to pass
encMgr and keyspaceID. Ensure all downstream consumers
(tableInfoMap/tableMap/partitionMap) continue to receive decrypted structures.

544-585: ⚠️ Potential issue | 🟠 Major

Wrap all returned errors from third-party/library calls in writePersistedDDLEventWithEncryption.

This function has inconsistent error handling: encMgr.EncryptData() is wrapped with errors.Trace(), but errors from json.Marshal(), ddlJobKey(), ExtraTableInfo.Marshal(), MarshalMsg(), and batch.Commit() are returned bare, losing stack context. Wrap all of these to preserve trace information.

🔧 Proposed fix
 	ddlKey, err := ddlJobKey(ddlEvent.FinishedTs)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	ddlEvent.TableInfoValue, err = json.Marshal(ddlEvent.TableInfo)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	if ddlEvent.ExtraTableInfo != nil {
 		ddlEvent.ExtraTableInfoValue, err = ddlEvent.ExtraTableInfo.Marshal()
 		if err != nil {
-			return err
+			return errors.Trace(err)
 		}
 	}
 	if len(ddlEvent.MultipleTableInfos) > 0 {
 		ddlEvent.MultipleTableInfosValue = make([][]byte, len(ddlEvent.MultipleTableInfos))
 		for i := range ddlEvent.MultipleTableInfos {
 			ddlEvent.MultipleTableInfosValue[i], err = json.Marshal(ddlEvent.MultipleTableInfos[i])
 			if err != nil {
-				return err
+				return errors.Trace(err)
 			}
 		}
 	}
 	ddlValue, err := ddlEvent.MarshalMsg(nil)
 	if err != nil {
-		return err
+		return errors.Trace(err)
 	}
 	batch.Set(ddlKey, ddlValue, pebble.NoSync)
-	return batch.Commit(pebble.NoSync)
+	return errors.Trace(batch.Commit(pebble.NoSync))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/schemastore/disk_format.go` around lines 544 - 585, In
writePersistedDDLEventWithEncryption, consistently wrap all errors from
third-party/library calls with errors.Trace to preserve stack context; replace
bare returns from ddlJobKey, json.Marshal (for TableInfo and
MultipleTableInfos), ExtraTableInfo.Marshal, ddlEvent.MarshalMsg, and
batch.Commit with returned errors wrapped via errors.Trace(err) (and likewise
wrap the initial json.Marshal/loop errors), and keep the existing errors.Trace
wrapping for encMgr.EncryptData to ensure uniform error tracing across
ddlJobKey, MarshalMsg, json.Marshal, ExtraTableInfo.Marshal, and batch.Commit.
logservice/eventstore/event_store.go (1)

508-513: ⚠️ Potential issue | 🔴 Critical

Cross-keyspace subscription reuse is not guarded.

The reuse/index path is keyed by TableID only and does not enforce KeyspaceID equality. A dispatcher from another keyspace can reuse the same subscription, leading to wrong-key encryption/decryption and a potential tenant-isolation break.

Also applies to: 603-607

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/eventstore/event_store.go` around lines 508 - 513, The
subscription reuse is currently keyed only by dispatcherSpan.TableID via
dispatcherMeta.tableStats and doesn't verify KeyspaceID, allowing cross-keyspace
reuse; update the lookup/validation in the dispatcher reuse paths (e.g., where
dispatcherMeta.tableStats[dispatcherSpan.TableID] is iterated and where similar
logic appears around subStat/tableSpan reuse at the other location) to also
require equality of KeyspaceID (e.g., ensure subStat.tableSpan.KeyspaceID ==
dispatcherSpan.KeyspaceID) before treating a subStat as a match, and add the
same KeyspaceID guard to any other reuse/index checks that reference
dispatcherSpan.TableID and subStat.tableSpan to prevent cross-tenant reuse.
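The suggested guard can be sketched as follows; the struct and field names below are illustrative stand-ins, since the real `subStat`/`tableSpan` types live in event_store.go. The point is that a cached subscription only matches when both TableID and KeyspaceID agree.

```go
package main

import "fmt"

// Illustrative types; the real subStat/tableSpan fields differ.
type tableSpan struct {
	KeyspaceID uint32
	TableID    int64
}

type subStat struct{ span tableSpan }

// reusable mirrors the proposed guard: require BOTH TableID and
// KeyspaceID equality before treating a cached subscription as a match,
// which prevents cross-tenant reuse.
func reusable(s *subStat, want tableSpan) bool {
	return s.span.TableID == want.TableID && s.span.KeyspaceID == want.KeyspaceID
}

func main() {
	cached := &subStat{span: tableSpan{KeyspaceID: 1, TableID: 42}}
	fmt.Println(reusable(cached, tableSpan{KeyspaceID: 1, TableID: 42})) // true
	fmt.Println(reusable(cached, tableSpan{KeyspaceID: 2, TableID: 42})) // false
}
```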
🧹 Nitpick comments (7)
tests/integration_tests/cmek_keyspace/run.sh (2)

30-33: DOWNSTREAM_KEYSPACE_ID is assigned but never used.

Either use it in an assertion/log line or remove it to reduce dead state in this test script.

🧹 Optional cleanup
-DOWNSTREAM_KEYSPACE_ID=
@@
-	DOWNSTREAM_KEYSPACE_ID=$(create_encrypted_keyspace "$DOWN_PD_HOST" "$DOWN_PD_PORT" "$DOWNSTREAM_KEYSPACE" "${downstream_key_id}" "${local_kms_endpoint}")
+	create_encrypted_keyspace "$DOWN_PD_HOST" "$DOWN_PD_PORT" "$DOWNSTREAM_KEYSPACE" "${downstream_key_id}" "${local_kms_endpoint}" >/dev/null

Also applies to: 228-228

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration_tests/cmek_keyspace/run.sh` around lines 30 - 33, The
variable DOWNSTREAM_KEYSPACE_ID is defined but never used; either remove the
unused assignment or use it in the test flow — for example reference
DOWNSTREAM_KEYSPACE_ID in an assertion or a diagnostic log (e.g., echo or test
-n checks) so it’s validated at runtime; update the run.sh block where
UPSTREAM_VALID_KEYSPACE_ID and UPSTREAM_INVALID_KEYSPACE_ID are defined to
either delete the DOWNSTREAM_KEYSPACE_ID line or add a simple usage
(assertion/log) that references DOWNSTREAM_KEYSPACE_ID to eliminate the dead
state.

3-3: Enable pipefail for pipeline safety.

Several checks rely on curl | jq; without pipefail, upstream failures can be masked.

🔧 Proposed fix
-set -eu
+set -euo pipefail
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration_tests/cmek_keyspace/run.sh` at line 3, The script currently
sets strict errors with "set -eu" but misses enabling pipefail, which can hide
failures in pipelines like "curl | jq"; update the shell options to enable
pipefail (e.g., change the "set -eu" invocation in the run.sh script to include
pipefail, such as "set -euo pipefail" or "set -eu -o pipefail") so any failed
command in a pipeline causes the script to fail.
pkg/encryption/cipher_test.go (1)

22-35: Add negative-path coverage for key/IV validation.

This test covers the happy path only. Please add cases for invalid key length and invalid IV length so Encrypt/Decrypt validation branches are protected from regressions.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/cipher_test.go` around lines 22 - 35, Extend
TestAES256CTREncryptDecrypt to include negative-path subtests that call Encrypt
and Decrypt with invalid key and IV lengths: create subtests (e.g., "invalid key
length" and "invalid iv length") that use NewAES256CTRCipher() and pass a
too-short/too-long key and a too-short/too-long iv to Encrypt and Decrypt,
asserting that an error is returned and that no successful encryption/decryption
occurs; ensure you exercise both Encrypt(encrypted, key, iv) and
Decrypt(encrypted, key, iv) paths so the key/IV validation branches in Encrypt
and Decrypt are covered.
logservice/schemastore/disk_format.go (1)

197-197: Avoid hardcoded context.Background() in encryption/decryption calls.

These paths can hit metadata/KMS on cache misses. Using background context removes cancellation/timeouts and can stall schema load/write flows under dependency latency. Please thread caller context through these helpers.

Also applies to: 371-371, 526-526, 576-576, 610-610, 659-659, 877-877

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/schemastore/disk_format.go` at line 197, The DecryptData call is
using context.Background() (encMgr.DecryptData(context.Background(), ...)),
which removes cancellation/timeouts; modify the callers in disk_format.go (and
other similar call sites listed) to accept a context.Context parameter and pass
that ctx into encryption/decryption calls instead of context.Background();
update the helper function signatures that invoke encMgr.DecryptData/EncryptData
to add ctx, propagate the ctx from higher-level callers, and ensure all call
chains are adjusted accordingly so no call site uses context.Background() for
encMgr.DecryptData or encMgr.EncryptData.
server/server.go (1)

303-303: Avoid stringly-typed service key for EncryptionManager.

Using a raw string on Line 303 increases drift risk across producers/consumers (SetService vs TryGetService). Please centralize this as a shared constant in app context.
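A sketch of the centralized-constant pattern, with an illustrative registry standing in for the real appcontext API (the generic `TryGetService` here also demonstrates why a type mismatch yields `(zero, false)`, per the related comment on app_context.go):

```go
package main

import "fmt"

// One shared constant keeps SetService and TryGetService call sites in
// sync; the name below is illustrative, not the actual appcontext symbol.
const ServiceEncryptionManager = "EncryptionManager"

var serviceMap = map[string]any{}

func SetService(name string, svc any) { serviceMap[name] = svc }

// TryGetService returns (zero, false) when the service is missing OR when
// the stored value is not of the requested type T.
func TryGetService[T any](name string) (T, bool) {
	v, ok := serviceMap[name]
	if !ok {
		var zero T
		return zero, false
	}
	t, ok := v.(T)
	return t, ok
}

type encryptionManager struct{}

func main() {
	SetService(ServiceEncryptionManager, &encryptionManager{})
	_, ok := TryGetService[*encryptionManager](ServiceEncryptionManager)
	fmt.Println(ok) // true
}
```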

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/server.go` at line 303, Replace the raw string "EncryptionManager"
passed to appctx.SetService with a shared constant to avoid stringly-typed keys;
add a constant like ServiceEncryptionManager in the appctx package (or wherever
service keys live) and use appctx.ServiceEncryptionManager in the call to
appctx.SetService and in all consumers that call appctx.TryGetService so both
producers and consumers reference the same symbol (e.g., update the SetService
invocation that currently calls encryption.NewEncryptionManager(metaManager) to
use the new appctx.ServiceEncryptionManager constant).
pkg/encryption/mock_tikv_client.go (1)

35-40: Protect mock maps with a mutex for race-safe tests.

metaMap and notFoundKeyspaces are accessed from multiple methods without synchronization. If tests run concurrent reads/writes, this can race.

💡 Suggested fix
import (
	"context"
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
+	"sync"
	...
)

type MockTiKVClient struct {
+	mu sync.RWMutex
	metaMap map[uint32]*EncryptionMeta
	notFoundKeyspaces map[uint32]bool
}

func (c *MockTiKVClient) GetKeyspaceEncryptionMeta(ctx context.Context, keyspaceID uint32) (*EncryptionMeta, error) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
	...
}

func (c *MockTiKVClient) SetKeyspaceMeta(keyspaceID uint32, meta *EncryptionMeta) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
	c.metaMap[keyspaceID] = meta
}

Also applies to: 129-170
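A compact, runnable version of the locking pattern being suggested, using an illustrative `mockStore` in place of `MockTiKVClient`: every map access goes through an RWMutex, so concurrent test goroutines stay clean under `go test -race`.

```go
package main

import (
	"fmt"
	"sync"
)

// mockStore mirrors the race-safety fix: an RWMutex guards all map access.
// Names are illustrative, not the actual MockTiKVClient fields.
type mockStore struct {
	mu      sync.RWMutex
	metaMap map[uint32]string
}

func newMockStore() *mockStore {
	return &mockStore{metaMap: make(map[uint32]string)}
}

func (s *mockStore) Set(id uint32, meta string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.metaMap[id] = meta
}

func (s *mockStore) Get(id uint32) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	m, ok := s.metaMap[id]
	return m, ok
}

func main() {
	s := newMockStore()
	var wg sync.WaitGroup
	// Concurrent writers and readers; without the mutex this would race.
	for i := 0; i < 8; i++ {
		wg.Add(2)
		go func(i int) { defer wg.Done(); s.Set(uint32(i), "meta") }(i)
		go func(i int) { defer wg.Done(); s.Get(uint32(i)) }(i)
	}
	wg.Wait()
	m, ok := s.Get(3)
	fmt.Println(m, ok) // meta true
}
```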

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/mock_tikv_client.go` around lines 35 - 40, The
MockTiKVClient's maps (metaMap and notFoundKeyspaces) are unprotected and can
race; add a sync.Mutex or sync.RWMutex field (e.g., mu) to MockTiKVClient and
guard all accesses and mutations to metaMap and notFoundKeyspaces in methods
like NewMockTiKVClient, GetEncryptionMeta, SetEncryptionMeta, MarkNotFound, and
any other helpers by locking/unlocking (RLock/RUnlock for read-only methods,
Lock/Unlock for writes) to ensure tests are race-safe.
pkg/encryption/manager.go (1)

174-185: Use defensive copies when caching and returning plaintext keys.

On cache miss, the same plaintextKey slice is both cached and returned. A downstream mutation can poison the cache.

💡 Suggested fix
- m.dataKeyCache[keyspaceID][currentKeyID] = &cachedDataKey{
- 	key:       plaintextKey,
+ keyCopy := append([]byte(nil), plaintextKey...)
+ m.dataKeyCache[keyspaceID][currentKeyID] = &cachedDataKey{
+ 	key:       keyCopy,
  	timestamp: time.Now(),
  }
 ...
- return plaintextKey, currentKeyID, version, nil
+ return append([]byte(nil), keyCopy...), currentKeyID, version, nil

Also applies to: 252-264
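The aliasing hazard behind this comment can be demonstrated in a few lines; the `append([]byte(nil), ...)` idiom from the suggested fix produces an independent slice, so a caller's mutation no longer reaches the cached bytes.

```go
package main

import "fmt"

func main() {
	cacheEntry := []byte{1, 2, 3}

	// Without a copy: the returned slice aliases the cache.
	leaked := cacheEntry
	leaked[0] = 99
	fmt.Println(cacheEntry[0]) // 99 — the cache is poisoned by the caller

	// With a defensive copy (the append([]byte(nil), ...) idiom):
	cacheEntry = []byte{1, 2, 3}
	safe := append([]byte(nil), cacheEntry...)
	safe[0] = 99
	fmt.Println(cacheEntry[0]) // 1 — the cache is unaffected
}
```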

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/manager.go` around lines 174 - 185, The code caches and
returns the same plaintextKey slice allowing callers to mutate the cached value;
make defensive copies of the plaintextKey when storing into m.dataKeyCache and
when returning it to callers: allocate a new slice and copy plaintextKey bytes
into it for the cached entry (cachedDataKey.key) and return a separate copy (not
the cached slice) from the function that sets currentKeyID; update the same
pattern where cachedDataKey entries are created/returned (also at the other
occurrence around the 252-264 range) and keep using m.dataKeyMu to protect cache
operations.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@logservice/eventstore/event_store.go`:
- Around line 1415-1421: When encryption.IsEncrypted(value) returns true but
iter.encryptionManager is nil, fail fast instead of continuing; add an explicit
guard in the same block that immediately logs/panics with contextual details
(use iter.keyspaceID and any table or subID fields on iter if available) and a
clear message that encrypted data was encountered with no encryption manager, so
the code does not attempt to decode encrypted bytes later and produce misleading
errors.
- Line 1343: writeEvents is calling e.encryptionManager.EncryptData with
context.Background(); change writeEvents to accept a context (e.g., func
writeEvents(ctx context.Context, ...)) and propagate the ctx from
writeTaskPool.run(ctx) into the EncryptData call so encryption uses the caller's
cancellable context (replace context.Background() with the passed ctx). For
eventStoreIter.Next (which currently cannot change signature due to the
EventIterator interface), avoid using context.Background() by either adding a
context-carrying constructor or field on eventStoreIter (store the caller ctx
when the iterator is created) and use that stored ctx in Next, or add a new
context-aware NextContext(ctx context.Context) method and have callers use it
while you plan interface refactor; update all creation sites of eventStoreIter
to supply the caller context.

In `@pkg/common/context/app_context.go`:
- Around line 67-76: Update the TryGetService comment to explicitly state that
it returns the zero value and false both when the named service is not present
and when the stored value cannot be asserted to the requested type T; reference
the TryGetService function and the fact it loads from
GetGlobalContext().serviceMap and performs a type assertion, so callers
understand type-mismatch will also yield (zero, false).

In `@pkg/config/debug.go`:
- Around line 197-204: The default AllowDegradeOnError on EncryptionConfig in
NewDefaultEncryptionConfig currently returns true (which enables fail-open
behavior); change the default to false so encryption fails closed by default,
i.e., set AllowDegradeOnError: false in NewDefaultEncryptionConfig, and update
any related comments/tests that assume the old default to reflect the new safer
behavior for EncryptionConfig and KMSConfig.

In `@pkg/encryption/cipher.go`:
- Around line 101-106: The GenerateIV function currently accepts size==0 and
returns an empty IV; add an early validation at the start of GenerateIV to fail
fast when size <= 0 (and optionally when size is unreasonably large) and return
a wrapped error (e.g., using your existing cerrors package) describing the
invalid IV size, then only allocate and call rand.Read when size is valid;
reference the GenerateIV function and reuse the existing cerrors pattern for
consistency.

In `@pkg/encryption/data_key_id.go`:
- Around line 27-30: DataKeyIDFromString currently returns a generic error for
wrong-length IDs; change it to return the project RFC-coded error
cerrors.ErrInvalidDataKeyID so callers can classify failures consistently. In
the DataKeyIDFromString function replace the errors.New("data key ID must be
exactly 3 bytes") return with cerrors.ErrInvalidDataKeyID (propagate any wrapped
context if needed) and ensure imports include the cerrors package so callers
observing cerrors.ErrInvalidDataKeyID continue to work.

In `@pkg/encryption/encryption_manager.go`:
- Around line 79-85: The IV generation, Encrypt, and EncodeEncryptedData error
paths in encryption_manager.go do not honor the allowDegradeOnError flag like
the key-fetch branch does; update the error handling in the GenerateIV, Encrypt,
and EncodeEncryptedData branches to check allowDegradeOnError and return
cerrors.ErrEncryptionDegraded.Wrap(err) (and log with degrade context) when
degradation is allowed, otherwise keep returning
cerrors.ErrEncryptionFailed.Wrap(err); specifically modify the blocks around
GenerateIV(...), cipherImpl.Encrypt(...), and EncodeEncryptedData(...) that
currently call log.Error and return ErrEncryptionFailed so they mirror the
key-fetch pattern using keyspaceID, allowDegradeOnError, and the same zap.Error
logging context.

In `@pkg/encryption/format.go`:
- Around line 74-79: IsEncrypted currently treats any non-zero first byte as
encrypted which misclassifies legacy plaintext; update IsEncrypted to validate
the full encryption header (using EncryptionHeaderSize) rather than just
data[0], e.g., verify the expected header signature/magic and that the version
byte equals a known encrypted version (compare against VersionUnencrypted and
allowed version constants) before returning true; change IsEncrypted to return
false if the header bytes don't match the expected encryption header format so
legacy plaintext isn't treated as encrypted.
- Around line 83-88: IsEncryptedWithVersion currently returns true when
expectedVersion is VersionUnencrypted (0); add an explicit guard to reject that
case and ensure we never treat an unencrypted-version byte as valid. In
IsEncryptedWithVersion, first return false if expectedVersion ==
VersionUnencrypted, then keep the existing length check and compare data[0] to
expectedVersion (optionally also ensure data[0] != VersionUnencrypted).
Reference: function IsEncryptedWithVersion and constant VersionUnencrypted (and
EncryptionHeaderSize) to locate the change.

In `@pkg/encryption/json_types.go`:
- Around line 36-42: The errors returned by json.Unmarshal and
base64.StdEncoding.DecodeString in pkg/encryption/json_types.go should be
wrapped with errors.Trace to preserve stack context: locate the UnmarshalJSON
(or similar) methods where json.Unmarshal(...) and
base64.StdEncoding.DecodeString(...) are called (both occurrences around the s
variable and the later repeated block) and replace direct returns like "return
err" with "return errors.Trace(err)" so both json.Unmarshal and DecodeString
failures are wrapped before being propagated.

In `@pkg/encryption/kms/aws_kms.go`:
- Around line 37-39: The AWS SDK errors are returned directly from AWS calls;
wrap them with errors.Trace(err) before returning to preserve stack traces.
Update the return paths where you call d.client.Decrypt (and the analogous AWS
call at the other site mentioned) so they do not return err directly—replace
returns like "return nil, err" or "return err" with "return nil,
errors.Trace(err)" or "return errors.Trace(err)" respectively, ensuring you
import/use the errors.Trace helper and keep the existing context in the
Decrypt-related methods (references: d.client.Decrypt and the other AWS client
call in the same file).

In `@pkg/encryption/kms/gcp_kms.go`:
- Around line 30-37: The GCP KMS client errors returned from
gcpKMSDecryptor.Decrypt (and the other return sites noted) should be wrapped
with errors.Trace() from github.com/pingcap/errors before returning to preserve
stack context; update the Decrypt method (and any other returns of third-party
err values in this file) to return errors.Trace(err) instead of err and add the
import for github.com/pingcap/errors at the top of the file.

In `@pkg/encryption/kms/mock_client.go`:
- Around line 27-31: Extract the KMSClient interface into a new dedicated file
(e.g., interface.go or types.go) and remove its definition from mock_client.go;
specifically create the file containing: "type KMSClient interface {
DecryptMasterKey(ctx context.Context, ciphertext []byte, keyID string, vendor
string, region string, endpoint string) ([]byte, error) }", then update any
implementations that referenced the in-file definition (e.g., the mock type in
mock_client.go and the production client type in client.go) to use the moved
KMSClient symbol from the new file (no package change required), and run a quick
build to ensure no missing imports or references remain.

In `@pkg/encryption/manager.go`:
- Around line 279-284: The logging calls access meta.Current.DataKeyId and use
meta.Current without checking for nil, causing a panic if meta.Current is nil;
update the code around the log.Debug calls (the lines referencing
meta.Current.DataKeyId, byte(meta.Current.DataKeyId&0xFF), and
len(meta.DataKeys)) to first guard that meta.Current != nil and only include
fields derived from meta.Current when non-nil (or substitute safe default
values), and apply the same nil-check fix to the other logging block that also
references meta.Current (the block around the 326-334 region).

In `@pkg/encryption/tikv_http_client.go`:
- Around line 119-120: The HTTP response body reads use io.ReadAll(resp.Body)
without bounds; replace those calls with a bounded read using
io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize)) and introduce a
package-level constant (e.g., maxResponseBodySize = 10<<20 /* 10MB */) so
oversized responses cannot blow memory; apply this change to both occurrences
where resp.Body is read (the io.ReadAll(resp.Body) calls) and add an error/log
path if the payload hits the limit or the read fails.
- Around line 191-198: The code currently logs when meta.KeyspaceId != 0 &&
meta.KeyspaceId != keyspaceID but still returns meta; change this to treat the
mismatch as an error and not accept the metadata: when you detect
meta.KeyspaceId != 0 && meta.KeyspaceId != keyspaceID (in the function that
currently returns meta), log the mismatch and return a non-nil error (or nil
meta) so the caller can continue probing other stores instead of using this
meta; ensure the returned error message includes the request keyspaceID and
meta.KeyspaceId and references storeID/statusAddr/storeURL to aid debugging, and
update callers to handle the nil/meta+error path accordingly.

In `@server/server.go`:
- Around line 294-301: When metaManager.Start fails, the previously registered
closeable (kmsClient cast to common.Closeable and appended to c.preServices) is
left running; modify the startup code around the kmsClient/common.Closeable
append and encryption.NewEncryptionMetaManager/metaManager.Start so that if
metaManager.Start returns an error you immediately call Close() on that
closeable and remove it from c.preServices (or avoid appending until Start
succeeds), ensuring no background resources leak; locate the kmsClient
type-assertion and the c.preServices append as well as metaManager.Start to
implement the cleanup (call closeable.Close() and splice c.preServices to remove
the entry) on error.

---

Outside diff comments:
In `@logservice/eventstore/event_store.go`:
- Around line 508-513: The subscription reuse is currently keyed only by
dispatcherSpan.TableID via dispatcherMeta.tableStats and doesn't verify
KeyspaceID, allowing cross-keyspace reuse; update the lookup/validation in the
dispatcher reuse paths (e.g., where
dispatcherMeta.tableStats[dispatcherSpan.TableID] is iterated and where similar
logic appears around subStat/tableSpan reuse at the other location) to also
require equality of KeyspaceID (e.g., ensure subStat.tableSpan.KeyspaceID ==
dispatcherSpan.KeyspaceID) before treating a subStat as a match, and add the
same KeyspaceID guard to any other reuse/index checks that reference
dispatcherSpan.TableID and subStat.tableSpan to prevent cross-tenant reuse.

In `@logservice/schemastore/disk_format.go`:
- Around line 839-849: loadAllPhysicalTablesAtTs now receives encMgr and
keyspaceID but still calls plaintext loaders (loadDatabasesInKVSnap,
loadFullTablesInKVSnap), so encrypted snapshot records will fail to unmarshal;
update these calls to use their encrypted-aware variants (or augment them to
accept encMgr and keyspaceID) so bytes are decrypted before unmarshalling:
modify loadDatabasesInKVSnap and loadFullTablesInKVSnap (or create
loadDatabasesInKVSnapEncrypted/loadFullTablesInKVSnapEncrypted) to accept
encryption.EncryptionManager and keyspaceID and perform decryption when reading
records, then replace the current calls in loadAllPhysicalTablesAtTs to pass
encMgr and keyspaceID. Ensure all downstream consumers
(tableInfoMap/tableMap/partitionMap) continue to receive decrypted structures.
- Around line 544-585: In writePersistedDDLEventWithEncryption, consistently
wrap all errors from third-party/library calls with errors.Trace to preserve
stack context; replace bare returns from ddlJobKey, json.Marshal (for TableInfo
and MultipleTableInfos), ExtraTableInfo.Marshal, ddlEvent.MarshalMsg, and
batch.Commit with returned errors wrapped via errors.Trace(err) (and likewise
wrap the initial json.Marshal/loop errors), and keep the existing errors.Trace
wrapping for encMgr.EncryptData to ensure uniform error tracing across
ddlJobKey, MarshalMsg, json.Marshal, ExtraTableInfo.Marshal, and batch.Commit.

---

Nitpick comments:
In `@logservice/schemastore/disk_format.go`:
- Line 197: The DecryptData call is using context.Background()
(encMgr.DecryptData(context.Background(), ...)), which removes
cancellation/timeouts; modify the callers in disk_format.go (and other similar
call sites listed) to accept a context.Context parameter and pass that ctx into
encryption/decryption calls instead of context.Background(); update the helper
function signatures that invoke encMgr.DecryptData/EncryptData to add ctx,
propagate the ctx from higher-level callers, and ensure all call chains are
adjusted accordingly so no call site uses context.Background() for
encMgr.DecryptData or encMgr.EncryptData.

In `@pkg/encryption/cipher_test.go`:
- Around line 22-35: Extend TestAES256CTREncryptDecrypt to include negative-path
subtests that call Encrypt and Decrypt with invalid key and IV lengths: create
subtests (e.g., "invalid key length" and "invalid iv length") that use
NewAES256CTRCipher() and pass a too-short/too-long key and a too-short/too-long
iv to Encrypt and Decrypt, asserting that an error is returned and that no
successful encryption/decryption occurs; ensure you exercise both
Encrypt(encrypted, key, iv) and Decrypt(encrypted, key, iv) paths so the key/IV
validation branches in Encrypt and Decrypt are covered.
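The negative-path coverage above can be sketched like this. `encryptCTR` is a hypothetical stand-in for the cipher under test (the real one lives behind `NewAES256CTRCipher()`); the key/IV length checks mirror the validation branches the subtests should exercise.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// encryptCTR rejects keys that are not 32 bytes and IVs that are not
// 16 bytes before touching crypto/aes, like the validation under test.
func encryptCTR(plaintext, key, iv []byte) ([]byte, error) {
	if len(key) != 32 {
		return nil, fmt.Errorf("invalid key length %d, want 32", len(key))
	}
	if len(iv) != aes.BlockSize {
		return nil, fmt.Errorf("invalid iv length %d, want %d", len(iv), aes.BlockSize)
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	out := make([]byte, len(plaintext))
	cipher.NewCTR(block, iv).XORKeyStream(out, plaintext)
	return out, nil
}

func main() {
	goodKey := make([]byte, 32)
	goodIV := make([]byte, 16)

	// Negative paths the review asks to cover: too-short key, too-long IV.
	if _, err := encryptCTR([]byte("data"), make([]byte, 16), goodIV); err == nil {
		panic("short key must be rejected")
	}
	if _, err := encryptCTR([]byte("data"), goodKey, make([]byte, 24)); err == nil {
		panic("long iv must be rejected")
	}

	// Positive path: CTR is symmetric, so encrypting twice round-trips.
	ct, _ := encryptCTR([]byte("data"), goodKey, goodIV)
	pt, _ := encryptCTR(ct, goodKey, goodIV)
	fmt.Println(string(pt)) // data
}
```

Because CTR is a symmetric keystream, the Decrypt path exercises the identical validation, so the same table of bad lengths covers both directions.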

In `@pkg/encryption/manager.go`:
- Around line 174-185: The code caches and returns the same plaintextKey slice
allowing callers to mutate the cached value; make defensive copies of the
plaintextKey when storing into m.dataKeyCache and when returning it to callers:
allocate a new slice and copy plaintextKey bytes into it for the cached entry
(cachedDataKey.key) and return a separate copy (not the cached slice) from the
function that sets currentKeyID; update the same pattern where cachedDataKey
entries are created/returned (also at the other occurrence around the 252-264
range) and keep using m.dataKeyMu to protect cache operations.

In `@pkg/encryption/mock_tikv_client.go`:
- Around line 35-40: The MockTiKVClient's maps (metaMap and notFoundKeyspaces)
are unprotected and can race; add a sync.Mutex or sync.RWMutex field (e.g., mu)
to MockTiKVClient and guard all accesses and mutations to metaMap and
notFoundKeyspaces in methods like NewMockTiKVClient, GetEncryptionMeta,
SetEncryptionMeta, MarkNotFound, and any other helpers by locking/unlocking
(RLock/RUnlock for read-only methods, Lock/Unlock for writes) to ensure tests
are race-safe.
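A race-safe version of the mock might look like the sketch below. Field and method names (`metaMap`, `SetEncryptionMeta`, `GetEncryptionMeta`) follow the review text, but the value types are simplified placeholders.

```go
package main

import (
	"fmt"
	"sync"
)

// mockTiKVClient sketches the race-safe MockTiKVClient: every map access
// goes through mu. Value types are simplified for illustration.
type mockTiKVClient struct {
	mu                sync.RWMutex
	metaMap           map[uint32]string
	notFoundKeyspaces map[uint32]bool
}

func newMockTiKVClient() *mockTiKVClient {
	return &mockTiKVClient{
		metaMap:           make(map[uint32]string),
		notFoundKeyspaces: make(map[uint32]bool),
	}
}

func (c *mockTiKVClient) SetEncryptionMeta(keyspaceID uint32, meta string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.metaMap[keyspaceID] = meta
}

func (c *mockTiKVClient) GetEncryptionMeta(keyspaceID uint32) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	meta, ok := c.metaMap[keyspaceID]
	return meta, ok
}

func main() {
	c := newMockTiKVClient()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(id uint32) { // concurrent writers and readers are now safe
			defer wg.Done()
			c.SetEncryptionMeta(id, "meta")
			c.GetEncryptionMeta(id)
		}(uint32(i))
	}
	wg.Wait()
	meta, ok := c.GetEncryptionMeta(42)
	fmt.Println(meta, ok) // meta true
}
```

Running tests with `go test -race` would flag the unguarded version; this one passes cleanly.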

In `@server/server.go`:
- Line 303: Replace the raw string "EncryptionManager" passed to
appctx.SetService with a shared constant to avoid stringly-typed keys; add a
constant like ServiceEncryptionManager in the appctx package (or wherever
service keys live) and use appctx.ServiceEncryptionManager in the call to
appctx.SetService and in all consumers that call appctx.TryGetService so both
producers and consumers reference the same symbol (e.g., update the SetService
invocation that currently calls encryption.NewEncryptionManager(metaManager) to
use the new appctx.ServiceEncryptionManager constant).
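The shared-constant pattern can be sketched as follows; `serviceMap`, `SetService`, and `TryGetService` here are simplified stand-ins for the appctx registry, and `encryptionManager` is a placeholder type:

```go
package main

import (
	"fmt"
	"sync"
)

// ServiceEncryptionManager is the shared key the review proposes; defining
// it once keeps producers (SetService) and consumers (TryGetService) in sync.
const ServiceEncryptionManager = "EncryptionManager"

var serviceMap sync.Map // stand-in for the appctx global service registry

func SetService(name string, svc any) { serviceMap.Store(name, svc) }

func TryGetService[T any](name string) (T, bool) {
	v, ok := serviceMap.Load(name)
	if !ok {
		var zero T
		return zero, false
	}
	t, ok := v.(T)
	return t, ok
}

type encryptionManager struct{ name string }

func main() {
	SetService(ServiceEncryptionManager, &encryptionManager{name: "enc"})
	// A typo'd raw string would silently miss; the constant cannot drift.
	mgr, ok := TryGetService[*encryptionManager](ServiceEncryptionManager)
	fmt.Println(ok, mgr.name) // true enc
}
```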

In `@tests/integration_tests/cmek_keyspace/run.sh`:
- Around line 30-33: The variable DOWNSTREAM_KEYSPACE_ID is defined but never
used; either remove the unused assignment or use it in the test flow — for
example reference DOWNSTREAM_KEYSPACE_ID in an assertion or a diagnostic log
(e.g., echo or test -n checks) so it’s validated at runtime; update the run.sh
block where UPSTREAM_VALID_KEYSPACE_ID and UPSTREAM_INVALID_KEYSPACE_ID are
defined to either delete the DOWNSTREAM_KEYSPACE_ID line or add a simple usage
(assertion/log) that references DOWNSTREAM_KEYSPACE_ID to eliminate the dead
assignment.
- Line 3: The script currently sets strict errors with "set -eu" but misses
enabling pipefail, which can hide failures in pipelines like "curl | jq"; update
the shell options to enable pipefail (e.g., change the "set -eu" invocation in
the run.sh script to include pipefail, such as "set -euo pipefail" or "set -eu
-o pipefail") so any failed command in a pipeline causes the script to fail.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e382ce8 and e1c5956.

⛔ Files ignored due to path filters (1)
  • go.sum is excluded by !**/*.sum
📒 Files selected for processing (39)
  • go.mod
  • logservice/eventstore/event_store.go
  • logservice/eventstore/event_store_test.go
  • logservice/schemastore/disk_format.go
  • logservice/schemastore/persist_storage.go
  • logservice/schemastore/schema_store.go
  • logservice/schemastore/schema_store_test.go
  • pkg/common/context/app_context.go
  • pkg/config/debug.go
  • pkg/config/server.go
  • pkg/encryption/cipher.go
  • pkg/encryption/cipher_test.go
  • pkg/encryption/data_key_id.go
  • pkg/encryption/data_key_id_24be.go
  • pkg/encryption/encryption_manager.go
  • pkg/encryption/encryption_manager_test.go
  • pkg/encryption/format.go
  • pkg/encryption/format_test.go
  • pkg/encryption/json_types.go
  • pkg/encryption/kms/aws_kms.go
  • pkg/encryption/kms/client.go
  • pkg/encryption/kms/client_test.go
  • pkg/encryption/kms/gcp_kms.go
  • pkg/encryption/kms/mock_client.go
  • pkg/encryption/manager.go
  • pkg/encryption/manager_test.go
  • pkg/encryption/mock_tikv_client.go
  • pkg/encryption/mock_tikv_client_test.go
  • pkg/encryption/tikv_http_client.go
  • pkg/encryption/tikv_http_client_test.go
  • pkg/encryption/types.go
  • pkg/errors/error.go
  • server/server.go
  • tests/integration_tests/_utils/start_tidb_cluster_nextgen
  • tests/integration_tests/cmek_keyspace/conf/cdc_invalid.toml
  • tests/integration_tests/cmek_keyspace/conf/cdc_valid.toml
  • tests/integration_tests/cmek_keyspace/conf/diff_config.toml
  • tests/integration_tests/cmek_keyspace/conf/pd_config.toml
  • tests/integration_tests/cmek_keyspace/run.sh


// Encrypt if encryption is enabled (after compression)
if e.encryptionManager != nil {
encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), event.keyspaceID, value)
Contributor

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# Locate the unbounded-context encryption calls in the event store
rg -nP 'EncryptData\(context\.Background\(\)|DecryptData\(context\.Background\(\)' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

(Follow-up scripts walked the writeEvents call path, the eventStoreIter
struct, the EventIterator interface, and writeTaskPool.run(ctx), confirming
that a caller context is available to writeEvents while eventStoreIter.Next
is pinned to the EventIterator interface signature.)

Thread cancellable context through encryption calls on hot paths.

Line 1343 uses context.Background() in writeEvents(), which has access to the ctx parameter from the caller writeTaskPool.run(ctx) but doesn't thread it through. Replace context.Background() with a context parameter passed from the caller.

Line 1416 has similar issue in eventStoreIter.Next(), but this method implements the EventIterator interface with a fixed signature. Extending this to accept context would require refactoring the interface, though the underlying problem (bypassing cancellation during shutdown or dependency stalls) remains valid.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/eventstore/event_store.go` at line 1343, writeEvents is calling
e.encryptionManager.EncryptData with context.Background(); change writeEvents to
accept a context (e.g., func writeEvents(ctx context.Context, ...)) and
propagate the ctx from writeTaskPool.run(ctx) into the EncryptData call so
encryption uses the caller's cancellable context (replace context.Background()
with the passed ctx). For eventStoreIter.Next (which currently cannot change
signature due to the EventIterator interface), avoid using context.Background()
by either adding a context-carrying constructor or field on eventStoreIter
(store the caller ctx when the iterator is created) and use that stored ctx in
Next, or add a new context-aware NextContext(ctx context.Context) method and
have callers use it while you plan interface refactor; update all creation sites
of eventStoreIter to supply the caller context.

Comment on lines +1415 to +1421
if encryption.IsEncrypted(value) && iter.encryptionManager != nil {
decryptedValue, err := iter.encryptionManager.DecryptData(context.Background(), iter.keyspaceID, value)
if err != nil {
log.Panic("failed to decrypt value", zap.Error(err))
}
value = decryptedValue
}

⚠️ Potential issue | 🟠 Major

Fail fast when encrypted data is found but encryption manager is missing.

Current logic skips decryption when manager is nil and proceeds to decode encrypted bytes, which can panic later with misleading errors. Add an explicit guard and panic/error immediately with context (keyspace, table, subID if available).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/eventstore/event_store.go` around lines 1415 - 1421, When
encryption.IsEncrypted(value) returns true but iter.encryptionManager is nil,
fail fast instead of continuing; add an explicit guard in the same block that
immediately logs/panics with contextual details (use iter.keyspaceID and any
table or subID fields on iter if available) and a clear message that encrypted
data was encountered with no encryption manager, so the code does not attempt to
decode encrypted bytes later and produce misleading errors.
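The fail-fast guard can be sketched as below. `isEncrypted`, `decrypter`, and `readValue` are hypothetical stand-ins; in particular, the one-byte header is illustrative, not the actual format of `encryption.IsEncrypted`.

```go
package main

import "fmt"

// isEncrypted stands in for encryption.IsEncrypted: here, a hypothetical
// one-byte header marks encrypted values.
func isEncrypted(value []byte) bool {
	return len(value) > 0 && value[0] == 0x01
}

type decrypter interface {
	Decrypt(value []byte) ([]byte, error)
}

// readValue fails fast when it meets encrypted bytes with no manager,
// instead of passing ciphertext to the decoder and failing later with a
// misleading error.
func readValue(mgr decrypter, keyspaceID uint32, value []byte) ([]byte, error) {
	if isEncrypted(value) {
		if mgr == nil {
			return nil, fmt.Errorf(
				"encrypted data found but encryption manager is nil, keyspaceID=%d", keyspaceID)
		}
		return mgr.Decrypt(value)
	}
	return value, nil
}

func main() {
	plain := []byte{0x00, 'o', 'k'}
	if v, err := readValue(nil, 1, plain); err == nil {
		fmt.Println(string(v[1:])) // ok
	}
	encrypted := []byte{0x01, 0xde, 0xad}
	_, err := readValue(nil, 1, encrypted)
	fmt.Println(err != nil) // true
}
```

In the iterator itself the equivalent guard would `log.Panic` with the keyspace/table/subID fields, since returning an error is not an option for the current `Next` signature.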

Comment on lines +67 to +76
// TryGetService attempts to get a service by name.
// Returns the service and true if found, or zero value and false if not found.
func TryGetService[T any](name string) (T, bool) {
v, ok := GetGlobalContext().serviceMap.Load(name)
if !ok {
var zero T
return zero, false
}
t, ok := v.(T)
return t, ok

⚠️ Potential issue | 🟡 Minor

Clarify TryGetService docs for type-mismatch behavior.

Current docs only mention “not found,” but this method also returns (zero, false) on type assertion failure. Please document that explicitly to avoid silent misuse.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/common/context/app_context.go` around lines 67 - 76, Update the
TryGetService comment to explicitly state that it returns the zero value and
false both when the named service is not present and when the stored value
cannot be asserted to the requested type T; reference the TryGetService function
and the fact it loads from GetGlobalContext().serviceMap and performs a type
assertion, so callers understand type-mismatch will also yield (zero, false).
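The documented behavior might read as in the sketch below; `serviceMap` here is a simplified stand-in for `GetGlobalContext().serviceMap`, and the doc comment wording is only a suggestion:

```go
package main

import (
	"fmt"
	"sync"
)

var serviceMap sync.Map // stand-in for GetGlobalContext().serviceMap

// TryGetService attempts to get a service by name.
// It returns the zero value of T and false in two cases:
//   - no service is registered under name, or
//   - a service is registered but cannot be type-asserted to T.
//
// Callers must not treat a false result as proof the service is absent;
// it may simply be stored under a different concrete type.
func TryGetService[T any](name string) (T, bool) {
	v, ok := serviceMap.Load(name)
	if !ok {
		var zero T
		return zero, false
	}
	t, ok := v.(T)
	return t, ok
}

func main() {
	serviceMap.Store("svc", 42)
	_, ok := TryGetService[string]("svc") // registered, but wrong type
	fmt.Println(ok)                       // false
	n, ok := TryGetService[int]("svc")
	fmt.Println(n, ok) // 42 true
}
```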

Comment on lines +197 to +204
// NewDefaultEncryptionConfig returns the default encryption configuration
func NewDefaultEncryptionConfig() *EncryptionConfig {
return &EncryptionConfig{
EnableEncryption: false,
MetaRefreshInterval: TomlDuration(1 * time.Hour),
MetaCacheTTL: TomlDuration(1 * time.Hour),
AllowDegradeOnError: true,
KMS: &KMSConfig{

⚠️ Potential issue | 🟠 Major

Defaulting AllowDegradeOnError to true makes encryption fail-open.

With encryption enabled, transient KMS/metadata failures can silently fall back to plaintext writes. A fail-closed default is safer for compliance-sensitive deployments.

🔧 Proposed fix
-		AllowDegradeOnError: true,
+		AllowDegradeOnError: false,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/config/debug.go` around lines 197 - 204, The default AllowDegradeOnError
on EncryptionConfig in NewDefaultEncryptionConfig currently returns true (which
enables fail-open behavior); change the default to false so encryption fails
closed by default, i.e., set AllowDegradeOnError: false in
NewDefaultEncryptionConfig, and update any related comments/tests that assume
the old default to reflect the new safer behavior for EncryptionConfig and
KMSConfig.

Comment on lines +101 to +106
func GenerateIV(size int) ([]byte, error) {
iv := make([]byte, size)
if _, err := rand.Read(iv); err != nil {
return nil, cerrors.ErrEncryptionFailed.Wrap(err)
}
return iv, nil

⚠️ Potential issue | 🟡 Minor

Guard against invalid IV sizes in GenerateIV.

GenerateIV(0) currently returns an empty IV without error. Failing fast here avoids propagating invalid IVs.

🔧 Proposed fix
 func GenerateIV(size int) ([]byte, error) {
+	if size <= 0 {
+		return nil, cerrors.ErrEncryptionFailed.GenWithStackByArgs("IV size must be positive")
+	}
 	iv := make([]byte, size)
 	if _, err := rand.Read(iv); err != nil {
 		return nil, cerrors.ErrEncryptionFailed.Wrap(err)
 	}
 	return iv, nil
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/cipher.go` around lines 101 - 106, The GenerateIV function
currently accepts size==0 and returns an empty IV; add an early validation at
the start of GenerateIV to fail fast when size <= 0 (and optionally when size is
unreasonably large) and return a wrapped error (e.g., using your existing
cerrors package) describing the invalid IV size, then only allocate and call
rand.Read when size is valid; reference the GenerateIV function and reuse the
existing cerrors pattern for consistency.

Comment on lines +27 to +31
// KMSClient is the interface for Key Management Service operations
type KMSClient interface {
// DecryptMasterKey decrypts the master key using the specified KMS parameters
DecryptMasterKey(ctx context.Context, ciphertext []byte, keyID string, vendor string, region string, endpoint string) ([]byte, error)
}

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# Check if KMSClient interface is defined elsewhere
rg -n "type KMSClient interface" --type=go

Repository: pingcap/ticdc

Length of output: 120


🏁 Script executed:

# Check all references to KMSClient to understand usage pattern
rg -n "KMSClient" --type=go -B2 -A2

Repository: pingcap/ticdc

Length of output: 7087


🏁 Script executed:

# Check the structure of the kms package
fd -t f . pkg/encryption/kms/ | sort

Repository: pingcap/ticdc

Length of output: 213


🏁 Script executed:

# Check if aws_kms.go or gcp_kms.go implement KMSClient (look for DecryptMasterKey method)
rg -n "func.*DecryptMasterKey" --type=go pkg/encryption/kms/

Repository: pingcap/ticdc

Length of output: 811


🏁 Script executed:

# Check the client.go structure to see if internal client implements KMSClient
head -100 pkg/encryption/kms/client.go

Repository: pingcap/ticdc

Length of output: 2441


Move the KMSClient interface to a separate file.

The KMSClient interface is defined in mock_client.go but serves as the contract for multiple implementations, including the production client type in client.go and test implementations. The interface definition should be extracted to a dedicated file (e.g., types.go or interface.go) to properly separate the interface contract from the mock implementation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/kms/mock_client.go` around lines 27 - 31, Extract the
KMSClient interface into a new dedicated file (e.g., interface.go or types.go)
and remove its definition from mock_client.go; specifically create the file
containing: "type KMSClient interface { DecryptMasterKey(ctx context.Context,
ciphertext []byte, keyID string, vendor string, region string, endpoint string)
([]byte, error) }", then update any implementations that referenced the in-file
definition (e.g., the mock type in mock_client.go and the production client type
in client.go) to use the moved KMSClient symbol from the new file (no package
change required), and run a quick build to ensure no missing imports or
references remain.

Comment on lines +279 to +284
log.Debug("using cached encryption meta",
zap.Uint32("keyspaceID", keyspaceID),
zap.Uint32("metaKeyspaceID", meta.KeyspaceId),
zap.Uint32("currentDataKeyID", meta.Current.DataKeyId),
zap.Uint8("version", byte(meta.Current.DataKeyId&0xFF)),
zap.Int("dataKeyCount", len(meta.DataKeys)))

⚠️ Potential issue | 🟠 Major

Guard meta.Current before logging derived fields.

Both logging blocks dereference meta.Current.DataKeyId directly. If meta.Current is nil, this path panics.

🔧 Suggested fix
- log.Debug("using cached encryption meta",
- 	zap.Uint32("keyspaceID", keyspaceID),
- 	zap.Uint32("metaKeyspaceID", meta.KeyspaceId),
- 	zap.Uint32("currentDataKeyID", meta.Current.DataKeyId),
- 	zap.Uint8("version", byte(meta.Current.DataKeyId&0xFF)),
- 	zap.Int("dataKeyCount", len(meta.DataKeys)))
+ currentDataKeyID := uint32(0)
+ version := byte(0)
+ if meta.Current != nil {
+ 	currentDataKeyID = meta.Current.DataKeyId
+ 	version = byte(meta.Current.DataKeyId & 0xFF)
+ }
+ log.Debug("using cached encryption meta",
+ 	zap.Uint32("keyspaceID", keyspaceID),
+ 	zap.Uint32("metaKeyspaceID", meta.KeyspaceId),
+ 	zap.Uint32("currentDataKeyID", currentDataKeyID),
+ 	zap.Uint8("version", version),
+ 	zap.Int("dataKeyCount", len(meta.DataKeys)))

Also applies to: 326-334

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/manager.go` around lines 279 - 284, The logging calls access
meta.Current.DataKeyId and use meta.Current without checking for nil, causing a
panic if meta.Current is nil; update the code around the log.Debug calls (the
lines referencing meta.Current.DataKeyId, byte(meta.Current.DataKeyId&0xFF), and
len(meta.DataKeys)) to first guard that meta.Current != nil and only include
fields derived from meta.Current when non-nil (or substitute safe default
values), and apply the same nil-check fix to the other logging block that also
references meta.Current (the block around the 326-334 region).

Comment on lines +119 to +120
body, err := io.ReadAll(resp.Body)
if err != nil {

⚠️ Potential issue | 🟠 Major

Bound the HTTP response body before reading it.

io.ReadAll(resp.Body) is unbounded here. A malformed or oversized response can spike memory and destabilize the process.

🔧 Suggested fix
+const maxEncryptionMetaResponseSize = 4 << 20 // 4 MiB
+
- body, err := io.ReadAll(resp.Body)
+ body, err := io.ReadAll(io.LimitReader(resp.Body, maxEncryptionMetaResponseSize+1))
  if err != nil {
      ...
  }
+ if len(body) > maxEncryptionMetaResponseSize {
+     return nil, cerrors.ErrDecodeFailed.GenWithStackByArgs("encryption meta response too large")
+ }

Also applies to: 148-149

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/tikv_http_client.go` around lines 119 - 120, The HTTP response
body reads use io.ReadAll(resp.Body) without bounds; replace those calls with a
bounded read using io.ReadAll(io.LimitReader(resp.Body, maxResponseBodySize))
and introduce a package-level constant (e.g., maxResponseBodySize = 10<<20 /*
10MB */) so oversized responses cannot blow memory; apply this change to both
occurrences where resp.Body is read (the io.ReadAll(resp.Body) calls) and add an
error/log path if the payload hits the limit or the read fails.

Comment on lines +191 to +198
if meta.KeyspaceId != 0 && meta.KeyspaceId != keyspaceID {
log.Warn("encryption meta keyspace ID mismatch",
zap.Uint64("storeID", storeID),
zap.String("statusAddr", statusAddr),
zap.Uint32("requestedKeyspaceID", keyspaceID),
zap.Uint32("metaKeyspaceID", meta.KeyspaceId),
zap.String("url", storeURL))
}

⚠️ Potential issue | 🔴 Critical

Do not accept metadata from the wrong keyspace.

On keyspace mismatch, the function only logs and still returns meta. This can select wrong keys and corrupt encrypt/decrypt behavior. Treat mismatch as an error and continue probing other stores.

🔧 Suggested fix
if meta.KeyspaceId != 0 && meta.KeyspaceId != keyspaceID {
	log.Warn("encryption meta keyspace ID mismatch",
		zap.Uint64("storeID", storeID),
		zap.String("statusAddr", statusAddr),
		zap.Uint32("requestedKeyspaceID", keyspaceID),
		zap.Uint32("metaKeyspaceID", meta.KeyspaceId),
		zap.String("url", storeURL))
+	return nil, cerrors.ErrDecodeFailed.GenWithStackByArgs("keyspace ID mismatch")
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/encryption/tikv_http_client.go` around lines 191 - 198, The code
currently logs when meta.KeyspaceId != 0 && meta.KeyspaceId != keyspaceID but
still returns meta; change this to treat the mismatch as an error and not accept
the metadata: when you detect meta.KeyspaceId != 0 && meta.KeyspaceId !=
keyspaceID (in the function that currently returns meta), log the mismatch and
return a non-nil error (or nil meta) so the caller can continue probing other
stores instead of using this meta; ensure the returned error message includes
the request keyspaceID and meta.KeyspaceId and references
storeID/statusAddr/storeURL to aid debugging, and update callers to handle the
nil/meta+error path accordingly.

Comment on lines +294 to +301
if closeable, ok := kmsClient.(common.Closeable); ok {
c.preServices = append(c.preServices, closeable)
}

metaManager := encryption.NewEncryptionMetaManager(tikvClient, kmsClient)
if err := metaManager.Start(ctx); err != nil {
return errors.Trace(err)
}

⚠️ Potential issue | 🟠 Major

Missing rollback for partially initialized encryption pre-services.

If metaManager.Start fails on Line 299, any previously started/registered closeable (e.g., KMS client from Line 294) is left running because setPreServices returns early. This can leak background resources on startup failure.

Proposed fix
```diff
 		kmsClient, err := kms.NewClient(conf.Debug.Encryption)
 		if err != nil {
 			return errors.Trace(err)
 		}
+		var kmsCloseable common.Closeable
 		if closeable, ok := kmsClient.(common.Closeable); ok {
-			c.preServices = append(c.preServices, closeable)
+			kmsCloseable = closeable
+			c.preServices = append(c.preServices, closeable)
 		}
 
 		metaManager := encryption.NewEncryptionMetaManager(tikvClient, kmsClient)
 		if err := metaManager.Start(ctx); err != nil {
+			if kmsCloseable != nil {
+				kmsCloseable.Close()
+			}
 			return errors.Trace(err)
 		}
```
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change

```diff
-		if closeable, ok := kmsClient.(common.Closeable); ok {
-			c.preServices = append(c.preServices, closeable)
-		}
-		metaManager := encryption.NewEncryptionMetaManager(tikvClient, kmsClient)
-		if err := metaManager.Start(ctx); err != nil {
-			return errors.Trace(err)
-		}
+		var kmsCloseable common.Closeable
+		if closeable, ok := kmsClient.(common.Closeable); ok {
+			kmsCloseable = closeable
+			c.preServices = append(c.preServices, closeable)
+		}
+		metaManager := encryption.NewEncryptionMetaManager(tikvClient, kmsClient)
+		if err := metaManager.Start(ctx); err != nil {
+			if kmsCloseable != nil {
+				kmsCloseable.Close()
+			}
+			return errors.Trace(err)
+		}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@server/server.go` around lines 294 - 301, when `metaManager.Start` fails, the previously registered closeable (kmsClient cast to `common.Closeable` and appended to `c.preServices`) is left running. Modify the startup code so that if `metaManager.Start` returns an error you immediately call `Close()` on that closeable and remove it from `c.preServices` (or avoid appending until Start succeeds), ensuring no background resources leak. Locate the kmsClient type assertion, the `c.preServices` append, and the `metaManager.Start` call to implement the cleanup (call `closeable.Close()` and splice `c.preServices` to remove the entry) on error.

- Change API endpoint from `/encryption_meta` to `/encryption/get-meta`
- Update both URL construction in client and test handlers
- Maintain same query parameter structure for keyspace ID

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
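For reference, a minimal sketch of building the renamed endpoint URL with `net/url`. The `keyspace_id` parameter name and the helper are illustrative assumptions, not the client's actual code:

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// buildMetaURL constructs the new /encryption/get-meta endpoint URL,
// carrying the keyspace ID as a query parameter (name assumed here).
func buildMetaURL(statusAddr string, keyspaceID uint32) string {
	u := url.URL{
		Scheme: "http",
		Host:   statusAddr,
		Path:   "/encryption/get-meta",
	}
	q := u.Query()
	q.Set("keyspace_id", strconv.FormatUint(uint64(keyspaceID), 10))
	u.RawQuery = q.Encode()
	return u.String()
}

func main() {
	// → http://127.0.0.1:20180/encryption/get-meta?keyspace_id=42
	fmt.Println(buildMetaURL("127.0.0.1:20180", 42))
}
```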

Labels

- `do-not-merge/work-in-progress`: Indicates that a PR should not merge because it is a work in progress.
- `release-note`: Denotes a PR that will be considered when it comes time to generate release notes.
- `size/XXL`: Denotes a PR that changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement CMEK Encryption Support for TiCDC Next-Gen Architecture

1 participant