Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f33fca
feat(cmek): add CMEK encryption at rest support
tenfyzhong Jan 8, 2026
9e9a080
feat(cmek): add comprehensive test coverage for encryption package
tenfyzhong Jan 8, 2026
8fa1d15
refactor(cmek): improve encryption handling and add TryGetService method
tenfyzhong Jan 8, 2026
4bfb949
feat(cmek): propagate keyspace ID for encryption
tenfyzhong Jan 11, 2026
ab78579
test(cmek): add unit tests for encryption manager
tenfyzhong Jan 12, 2026
4e0a7ce
feat(cmek): improve cache management and cleanup code
tenfyzhong Jan 12, 2026
b9a339b
feat(cmek): Add TiKV HTTP client and enhance encryption support
tenfyzhong Jan 26, 2026
0cb69fb
feat(cmek): add KMS client support for AWS and GCP
tenfyzhong Jan 27, 2026
2957732
feat(cmek): align encryption types with kvproto and simplify cipher s…
tenfyzhong Jan 28, 2026
e6e1178
feat(cmek): refactor factory function type declarations
tenfyzhong Jan 28, 2026
92393fb
chore(deps): Update cloud and AWS SDK dependencies
tenfyzhong Jan 28, 2026
03c525e
Merge branch 'master' into feat-cmek
tenfyzhong Jan 28, 2026
ea30040
feat(cmek): Use IVSize method for IV validation in AES256CTRCipher
tenfyzhong Jan 29, 2026
3de6500
feat(cmek): combine key ID and version retrieval in GetCurrentDataKey
tenfyzhong Jan 29, 2026
feaa54c
feat(cmek): enhance error logging for encryption operations
tenfyzhong Jan 29, 2026
ddc57f3
Merge branch 'master' into feat-cmek
tenfyzhong Feb 9, 2026
a662a9d
feat(encryption): add support for encrypted DDL history and AES key s…
tenfyzhong Mar 2, 2026
2a00996
feat(integration): add CMEK keyspace test and manual keyspace creatio…
tenfyzhong Mar 2, 2026
a52b11c
Merge remote-tracking branch 'origin/master' into feat-cmek
tenfyzhong Mar 2, 2026
dc680e1
feat(eventstore): add encryption support to event store
tenfyzhong Mar 2, 2026
e1c5956
feat(tests): Improve CMEK keyspace integration test script
tenfyzhong Mar 2, 2026
a17d119
fix(encryption): Update TiKV HTTP client endpoint for encryption meta
tenfyzhong Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/encryption"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
Expand Down Expand Up @@ -114,6 +115,8 @@ type dispatcherStat struct {
resolvedTs atomic.Uint64
// the max ts of events which is not needed by this dispatcher
checkpointTs uint64
// keyspaceID for encryption (0 means default/classic)
keyspaceID uint32
// the difference between `subStat`, `pendingSubStat` and `removingSubStat`:
// 1) if there are no existing subscriptions which can be reused,
// or there is an existing subscription with an exact span match,
Expand Down Expand Up @@ -232,6 +235,9 @@ type eventStore struct {

// compressionThreshold is the size in bytes above which a value will be compressed.
compressionThreshold int

// encryptionManager for encrypting/decrypting data (optional)
encryptionManager encryption.EncryptionManager
}

const (
Expand All @@ -252,6 +258,17 @@ func New(
log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err))
}

// Try to get encryption manager from appcontext (optional)
var encMgr encryption.EncryptionManager
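// GetService is expected to panic when the service is not registered; the deferred
// recover below is meant to treat a missing EncryptionManager as "encryption disabled".
// NOTE(review): a recover inside a function-level defer still unwinds this function, so
// the code after the GetService call would not run on that path — a non-panicking
// lookup (e.g. a TryGetService-style helper) is needed for this to be truly optional.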
defer func() {
if r := recover(); r != nil {
// EncryptionManager not registered, use nil
encMgr = nil
}
}()
encMgr = appcontext.GetService[encryption.EncryptionManager]("EncryptionManager")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using defer and recover for control flow to handle an optional dependency is not idiomatic Go. It can obscure the program's control flow and is generally reserved for handling unexpected panics. A better approach would be to have a TryGetService function in appcontext that returns a boolean indicating whether the service was found, for example: encMgr, ok := appcontext.TryGetService[...](...). This would make the code clearer and more robust.


store := &eventStore{
pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock),
subClient: subClient,
Expand All @@ -273,6 +290,7 @@ func New(
},
},
compressionThreshold: config.GetGlobalServerConfig().Debug.EventStore.CompressionThreshold,
encryptionManager: encMgr,
}
store.gcManager = newGCManager(store.dbs, deleteDataRange, compactDataRange)

Expand Down Expand Up @@ -1224,6 +1242,23 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback, enco
metrics.EventStoreCompressedRowsCount.Inc()
}

// Encrypt if encryption is enabled (after compression)
if e.encryptionManager != nil {
// TODO: Get keyspaceID from dispatcher/subscription metadata
// For now, use default keyspaceID (0) for classic mode
keyspaceID := uint32(0)
encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), keyspaceID, value)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using context.Background() here means this encryption operation will not be cancelled if the parent context (from the write task pool) is cancelled. It's better to pass the context from writeTaskPool.run down to writeEvents and use it here. This ensures that long-running operations can be properly cancelled.

if err != nil {
log.Warn("encrypt event value failed, using unencrypted value",
zap.Uint64("subID", uint64(event.subID)),
zap.Int64("tableID", event.tableID),
zap.Error(err))
// Continue with unencrypted value (graceful degradation)
} else {
value = encryptedValue
}
}

key := EncodeKey(uint64(event.subID), event.tableID, &kv, compressionType)
if err := batch.Set(key, value, pebble.NoSync); err != nil {
log.Panic("failed to update pebble batch", zap.Error(err))
Expand Down Expand Up @@ -1267,6 +1302,32 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
key := iter.innerIter.Key()
value := iter.innerIter.Value()

// Decrypt if encryption is enabled (before decompression)
// Note: eventStoreIter doesn't have direct access to encryptionManager,
// so we need to get it from appcontext or pass it through
// This is a simplified implementation - in production, we should store encryptionManager in eventStoreIter
if encryption.IsEncrypted(value) {
// Try to get encryptionManager from appcontext
var encMgr encryption.EncryptionManager
defer func() {
if r := recover(); r != nil {
// EncryptionManager not registered, skip decryption
encMgr = nil
}
}()
encMgr = appcontext.GetService[encryption.EncryptionManager]("EncryptionManager")
if encMgr != nil {
// TODO: Get keyspaceID from dispatcher/subscription metadata
// For now, use default keyspaceID (0)
keyspaceID := uint32(0)
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
if err != nil {
log.Panic("failed to decrypt value", zap.Error(err))
}
value = decryptedValue
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

As the comment here suggests, fetching the encryptionManager from the global appcontext on every Next() call is inefficient and not idiomatic. It also uses a defer/recover pattern which is best avoided for control flow.

The encryptionManager should be passed to eventStoreIter upon its creation and stored as a field.

  1. Add encryptionManager encryption.EncryptionManager to the eventStoreIter struct.
  2. In GetIterator, initialize this new field from e.encryptionManager.
  3. Then, simplify this Next method to use iter.encryptionManager directly.

This would improve performance and make the code cleaner and more maintainable.

                if iter.encryptionManager != nil && encryption.IsEncrypted(value) {
			// TODO: Get keyspaceID from dispatcher/subscription metadata
			// For now, use default keyspaceID (0)
			keyspaceID := uint32(0)
			decryptedValue, err := iter.encryptionManager.DecryptData(context.Background(), keyspaceID, value)
			if err != nil {
				log.Panic("failed to decrypt value", zap.Error(err))
			}
			value = decryptedValue
		}


_, compressionType := DecodeKeyMetas(key)
var decodedValue []byte
if compressionType == CompressionZSTD {
Expand Down
91 changes: 90 additions & 1 deletion logservice/schemastore/disk_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package schemastore

import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"math"
Expand All @@ -25,6 +26,8 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/encryption"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
Expand Down Expand Up @@ -163,6 +166,11 @@ func writeUpperBoundMeta(db *pebble.DB, upperBound UpperBoundMeta) {
}

// loadDatabasesInKVSnap loads database info from the snapshot at gcTs without
// any decryption. It is a convenience wrapper around
// loadDatabasesInKVSnapWithEncryption, passing a nil encryption manager
// (which disables decryption) and keyspace ID 0.
func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*BasicDatabaseInfo, error) {
return loadDatabasesInKVSnapWithEncryption(snap, gcTs, nil, 0)
}

// loadDatabasesInKVSnapWithEncryption decrypts and loads databases from snapshot if encryption is enabled
func loadDatabasesInKVSnapWithEncryption(snap *pebble.Snapshot, gcTs uint64, encMgr encryption.EncryptionManager, keyspaceID uint32) (map[int64]*BasicDatabaseInfo, error) {
databaseMap := make(map[int64]*BasicDatabaseInfo)

startKey, err := schemaInfoKey(gcTs, 0)
Expand All @@ -182,8 +190,19 @@ func loadDatabasesInKVSnap(snap *pebble.Snapshot, gcTs uint64) (map[int64]*Basic
}
defer snapIter.Close()
for snapIter.First(); snapIter.Valid(); snapIter.Next() {
value := snapIter.Value()

// Decrypt if encryption is enabled
if encMgr != nil {
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}
Comment on lines +198 to +200

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using log.Fatal can lead to abrupt program termination without proper cleanup or stack unwinding. It's generally better to return an error from this function and let the caller decide on the appropriate action. If the error is considered truly unrecoverable at this level, panic(err) would be a better choice as it allows for a top-level recovery mechanism to perform a more graceful shutdown. This applies to other log.Fatal calls in this file as well.

Suggested change
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}
if err != nil {
return nil, errors.Trace(err)
}

Comment on lines +197 to +200

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

log.Fatal is called on a decryption error. This will terminate the entire TiCDC process abruptly, which is too drastic and prevents any higher-level error handling or recovery. Since loadDatabasesInKVSnapWithEncryption can return an error, it's much better to propagate this error up the call stack for more graceful handling.

This recommendation applies to all new usages of log.Fatal in this file where an error can be returned instead (e.g., lines 373, 528, 612, 661, 879).

Suggested change
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
if err != nil {
log.Fatal("decrypt db info failed", zap.Error(err))
}
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, value)
if err != nil {
return nil, errors.Trace(err)
}

value = decryptedValue
}

var dbInfo model.DBInfo
if err := json.Unmarshal(snapIter.Value(), &dbInfo); err != nil {
if err := json.Unmarshal(value, &dbInfo); err != nil {
log.Fatal("unmarshal db info failed", zap.Error(err))
}

Expand Down Expand Up @@ -472,6 +491,11 @@ func unmarshalPersistedDDLEvent(value []byte) PersistedDDLEvent {
}

// readPersistedDDLEvent reads the DDL event stored for the given version from
// the snapshot without decrypting it. It is a convenience wrapper around
// readPersistedDDLEventWithEncryption, passing a nil encryption manager
// (which disables decryption) and keyspace ID 0.
func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEvent {
return readPersistedDDLEventWithEncryption(snap, version, nil, 0)
}

// readPersistedDDLEventWithEncryption reads and decrypts DDL event if encryption is enabled
func readPersistedDDLEventWithEncryption(snap *pebble.Snapshot, version uint64, encMgr encryption.EncryptionManager, keyspaceID uint32) PersistedDDLEvent {
ddlKey, err := ddlJobKey(version)
if err != nil {
log.Fatal("generate ddl job key failed", zap.Error(err))
Expand All @@ -483,10 +507,28 @@ func readPersistedDDLEvent(snap *pebble.Snapshot, version uint64) PersistedDDLEv
zap.Error(err))
}
defer closer.Close()

// Decrypt if encryption is enabled
if encMgr != nil {
decryptedValue, err := encMgr.DecryptData(context.Background(), keyspaceID, ddlValue)
if err != nil {
log.Fatal("decrypt ddl event failed",
zap.Uint64("version", version),
zap.Uint32("keyspaceID", keyspaceID),
zap.Error(err))
}
ddlValue = decryptedValue
}

return unmarshalPersistedDDLEvent(ddlValue)
}

// writePersistedDDLEvent writes ddlEvent to db without encrypting it. It is a
// convenience wrapper around writePersistedDDLEventWithEncryption, passing a
// nil encryption manager (which disables encryption) and keyspace ID 0.
// The returned error comes from the wrapped call and should be checked by callers.
func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
return writePersistedDDLEventWithEncryption(db, ddlEvent, nil, 0)
}

// writePersistedDDLEventWithEncryption encrypts and writes DDL event if encryption is enabled
func writePersistedDDLEventWithEncryption(db *pebble.DB, ddlEvent *PersistedDDLEvent, encMgr encryption.EncryptionManager, keyspaceID uint32) error {
batch := db.NewBatch()
ddlKey, err := ddlJobKey(ddlEvent.FinishedTs)
if err != nil {
Expand Down Expand Up @@ -515,6 +557,16 @@ func writePersistedDDLEvent(db *pebble.DB, ddlEvent *PersistedDDLEvent) error {
if err != nil {
return err
}

// Encrypt if encryption is enabled
if encMgr != nil {
encryptedValue, err := encMgr.EncryptData(context.Background(), keyspaceID, ddlValue)
if err != nil {
return errors.Trace(err)
}
ddlValue = encryptedValue
}

batch.Set(ddlKey, ddlValue, pebble.NoSync)
return batch.Commit(pebble.NoSync)
}
Expand All @@ -526,6 +578,11 @@ func isTableRawKey(key []byte) bool {
}

// addSchemaInfoToBatch adds the schema info for ts to batch without encrypting
// it. It is a convenience wrapper around addSchemaInfoToBatchWithEncryption,
// passing a nil encryption manager (which disables encryption) and keyspace ID 0.
func addSchemaInfoToBatch(batch *pebble.Batch, ts uint64, info *model.DBInfo) {
addSchemaInfoToBatchWithEncryption(batch, ts, info, nil, 0)
}

// addSchemaInfoToBatchWithEncryption encrypts and adds schema info to batch if encryption is enabled
func addSchemaInfoToBatchWithEncryption(batch *pebble.Batch, ts uint64, info *model.DBInfo, encMgr encryption.EncryptionManager, keyspaceID uint32) {
schemaKey, err := schemaInfoKey(ts, info.ID)
if err != nil {
log.Fatal("generate schema key failed", zap.Error(err))
Expand All @@ -534,6 +591,16 @@ func addSchemaInfoToBatch(batch *pebble.Batch, ts uint64, info *model.DBInfo) {
if err != nil {
log.Fatal("marshal schema info failed", zap.Error(err))
}

// Encrypt if encryption is enabled
if encMgr != nil {
encryptedValue, err := encMgr.EncryptData(context.Background(), keyspaceID, schemaValue)
if err != nil {
log.Fatal("encrypt schema info failed", zap.Error(err))
}
schemaValue = encryptedValue
}

batch.Set(schemaKey, schemaValue, pebble.NoSync)
}

Expand All @@ -542,6 +609,18 @@ func addTableInfoToBatch(
ts uint64,
dbInfo *model.DBInfo,
tableInfoValue []byte,
) (int64, string, []int64) {
return addTableInfoToBatchWithEncryption(batch, ts, dbInfo, tableInfoValue, nil, 0)
}

// addTableInfoToBatchWithEncryption encrypts and adds table info to batch if encryption is enabled
func addTableInfoToBatchWithEncryption(
batch *pebble.Batch,
ts uint64,
dbInfo *model.DBInfo,
tableInfoValue []byte,
encMgr encryption.EncryptionManager,
keyspaceID uint32,
) (int64, string, []int64) {
tableInfo := model.TableInfo{}
if err := json.Unmarshal(tableInfoValue, &tableInfo); err != nil {
Expand All @@ -561,6 +640,16 @@ func addTableInfoToBatch(
if err != nil {
log.Fatal("marshal table info entry failed", zap.Error(err))
}

// Encrypt if encryption is enabled
if encMgr != nil {
encryptedValue, err := encMgr.EncryptData(context.Background(), keyspaceID, tableInfoEntryValue)
if err != nil {
log.Fatal("encrypt table info entry failed", zap.Error(err))
}
tableInfoEntryValue = encryptedValue
}

batch.Set(tableKey, tableInfoEntryValue, pebble.NoSync)

// write partition info to batch if the table is a partition table
Expand Down
26 changes: 23 additions & 3 deletions logservice/schemastore/persist_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/encryption"
"github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
Expand Down Expand Up @@ -99,6 +101,9 @@ type persistentStorage struct {

// tableID -> total registered count
tableRegisteredCount map[int64]int

// encryptionManager for encrypting/decrypting data (optional)
encryptionManager encryption.EncryptionManager
}

func exists(path string) bool {
Expand Down Expand Up @@ -143,6 +148,17 @@ func newPersistentStorage(
pdCli pd.Client,
storage kv.Storage,
) (*persistentStorage, error) {
// Try to get encryption manager from appcontext (optional)
var encMgr encryption.EncryptionManager
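// GetService is expected to panic when the service is not registered; the deferred
// recover below is meant to treat a missing EncryptionManager as "encryption disabled".
// NOTE(review): a recover inside a function-level defer still unwinds this function, so
// on that path the storage below is never built and the zero-value results are returned —
// a non-panicking lookup (e.g. a TryGetService-style helper) is needed here.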
defer func() {
if r := recover(); r != nil {
// EncryptionManager not registered, use nil
encMgr = nil
}
}()
encMgr = appcontext.GetService[encryption.EncryptionManager]("EncryptionManager")

dataStorage := &persistentStorage{
rootDir: root,
keyspaceID: keyspaceID,
Expand All @@ -155,6 +171,7 @@ func newPersistentStorage(
tableTriggerDDLHistory: make([]uint64, 0),
tableInfoStoreMap: make(map[int64]*versionedTableInfoStore),
tableRegisteredCount: make(map[int64]int),
encryptionManager: encMgr,
}
dataStorage.ctx, dataStorage.cancel = context.WithCancel(ctx)
err := dataStorage.initialize(ctx)
Expand Down Expand Up @@ -458,7 +475,7 @@ func (p *persistentStorage) fetchTableDDLEvents(dispatcherID common.DispatcherID
// TODO: if the first event is a create table ddl, return error?
events := make([]commonEvent.DDLEvent, 0, len(allTargetTs))
for _, ts := range allTargetTs {
rawEvent := readPersistedDDLEvent(storageSnap, ts)
rawEvent := readPersistedDDLEventWithEncryption(storageSnap, ts, p.encryptionManager, p.keyspaceID)
ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, tableID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -528,7 +545,7 @@ func (p *persistentStorage) fetchTableTriggerDDLEvents(tableFilter filter.Filter
}
p.mu.RUnlock()
for _, ts := range allTargetTs {
rawEvent := readPersistedDDLEvent(storageSnap, ts)
rawEvent := readPersistedDDLEventWithEncryption(storageSnap, ts, p.encryptionManager, p.keyspaceID)
// the tableID of buildDDLEvent is not used in this function, set it to 0
ddlEvent, ok, err := buildDDLEvent(&rawEvent, tableFilter, 0)
if err != nil {
Expand Down Expand Up @@ -761,7 +778,10 @@ func (p *persistentStorage) handleDDLJob(job *model.Job) error {

// Note: need write ddl event to disk before update ddl history,
// because other goroutines may read ddl events from disk according to ddl history
writePersistedDDLEvent(p.db, &ddlEvent)
err := writePersistedDDLEventWithEncryption(p.db, &ddlEvent, p.encryptionManager, p.keyspaceID)
if err != nil {
return errors.Trace(err)
}

p.mu.Lock()
defer p.mu.Unlock()
Expand Down
Loading
Loading