-
Notifications
You must be signed in to change notification settings - Fork 41
CMEK: Add encryption support for event and schema store data at rest #3955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 21 commits
4f33fca
9e9a080
8fa1d15
4bfb949
ab78579
4e0a7ce
b9a339b
0cb69fb
2957732
e6e1178
92393fb
03c525e
ea30040
3de6500
feaa54c
ddc57f3
a662a9d
2a00996
a52b11c
dc680e1
e1c5956
a17d119
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ import ( | |
| "github.com/pingcap/ticdc/pkg/common" | ||
| appcontext "github.com/pingcap/ticdc/pkg/common/context" | ||
| "github.com/pingcap/ticdc/pkg/config" | ||
| "github.com/pingcap/ticdc/pkg/encryption" | ||
| "github.com/pingcap/ticdc/pkg/messaging" | ||
| "github.com/pingcap/ticdc/pkg/metrics" | ||
| "github.com/pingcap/ticdc/pkg/node" | ||
|
|
@@ -116,6 +117,8 @@ type dispatcherStat struct { | |
| resolvedTs atomic.Uint64 | ||
| // the max ts of events which is not needed by this dispatcher | ||
| checkpointTs uint64 | ||
| // keyspaceID for encryption (0 means default/classic) | ||
| keyspaceID uint32 | ||
| // the difference between `subStat`, `pendingSubStat` and `removingSubStat`: | ||
| // 1) if there is no existing subscriptions which can be reused, | ||
| // or there is a existing subscription with exact span match, | ||
|
|
@@ -182,9 +185,10 @@ type subscriptionStat struct { | |
| type subscriptionStats map[logpuller.SubscriptionID]*subscriptionStat | ||
|
|
||
| type eventWithCallback struct { | ||
| subID logpuller.SubscriptionID | ||
| tableID int64 | ||
| kvs []common.RawKVEntry | ||
| subID logpuller.SubscriptionID | ||
| tableID int64 | ||
| keyspaceID uint32 | ||
| kvs []common.RawKVEntry | ||
| // kv with commitTs <= currentResolvedTs will be filtered out | ||
| currentResolvedTs uint64 | ||
| enqueueTimeNano int64 | ||
|
|
@@ -238,6 +242,8 @@ type eventStore struct { | |
| compressionThreshold int | ||
| // enableZstdCompression controls whether to enable zstd compression for large values. | ||
| enableZstdCompression bool | ||
| // encryptionManager for encrypting/decrypting data (optional). | ||
| encryptionManager encryption.EncryptionManager | ||
| } | ||
|
|
||
| const ( | ||
|
|
@@ -258,6 +264,9 @@ func New( | |
| log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err)) | ||
| } | ||
|
|
||
| // Try to get encryption manager from appcontext (optional) | ||
| encMgr, _ := appcontext.TryGetService[encryption.EncryptionManager]("EncryptionManager") | ||
|
|
||
| store := &eventStore{ | ||
| pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock), | ||
| subClient: subClient, | ||
|
|
@@ -280,6 +289,7 @@ func New( | |
| }, | ||
| compressionThreshold: config.GetGlobalServerConfig().Debug.EventStore.CompressionThreshold, | ||
| enableZstdCompression: config.GetGlobalServerConfig().Debug.EventStore.EnableZstdCompression, | ||
| encryptionManager: encMgr, | ||
| } | ||
| store.gcManager = newGCManager(store.dbs, deleteDataRange, compactDataRange) | ||
|
|
||
|
|
@@ -483,6 +493,7 @@ func (e *eventStore) RegisterDispatcher( | |
| dispatcherID: dispatcherID, | ||
| tableSpan: dispatcherSpan, | ||
| checkpointTs: startTs, | ||
| keyspaceID: dispatcherSpan.KeyspaceID, | ||
| } | ||
| stat.resolvedTs.Store(startTs) | ||
|
|
||
|
|
@@ -609,6 +620,7 @@ func (e *eventStore) RegisterDispatcher( | |
| subStat.eventCh.Push(eventWithCallback{ | ||
| subID: subStat.subID, | ||
| tableID: subStat.tableSpan.TableID, | ||
| keyspaceID: subStat.tableSpan.KeyspaceID, | ||
| kvs: kvs, | ||
| currentResolvedTs: subStat.resolvedTs.Load(), | ||
| enqueueTimeNano: now.UnixNano(), | ||
|
|
@@ -900,16 +912,18 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com | |
| } | ||
|
|
||
| return &eventStoreIter{ | ||
| tableSpan: stat.tableSpan, | ||
| needCheckSpan: needCheckSpan, | ||
| innerIter: iter, | ||
| prevStartTs: 0, | ||
| prevCommitTs: 0, | ||
| startTs: dataRange.CommitTsStart, | ||
| endTs: dataRange.CommitTsEnd, | ||
| rowCount: 0, | ||
| decoder: decoder, | ||
| decoderPool: e.decoderPool, | ||
| tableSpan: stat.tableSpan, | ||
| needCheckSpan: needCheckSpan, | ||
| innerIter: iter, | ||
| prevStartTs: 0, | ||
| prevCommitTs: 0, | ||
| startTs: dataRange.CommitTsStart, | ||
| endTs: dataRange.CommitTsEnd, | ||
| rowCount: 0, | ||
| decoder: decoder, | ||
| decoderPool: e.decoderPool, | ||
| encryptionManager: e.encryptionManager, | ||
| keyspaceID: stat.keyspaceID, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1324,6 +1338,20 @@ func (e *eventStore) writeEvents( | |
| metrics.EventStoreCompressedRowsCount.Inc() | ||
| } | ||
|
|
||
| // Encrypt if encryption is enabled (after compression) | ||
| if e.encryptionManager != nil { | ||
| encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), event.keyspaceID, value) | ||
| if err != nil { | ||
| log.Error("encrypt event value failed", | ||
| zap.Uint32("keyspaceID", event.keyspaceID), | ||
| zap.Uint64("subID", uint64(event.subID)), | ||
| zap.Int64("tableID", event.tableID), | ||
| zap.Error(err)) | ||
| return err | ||
| } | ||
| value = encryptedValue | ||
| } | ||
|
Comment on lines
+1342
to
+1353
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I suggest modifying |
||
|
|
||
| key := EncodeKey(uint64(event.subID), event.tableID, &kv, compressionType) | ||
| if err := batch.Set(key, value, pebble.NoSync); err != nil { | ||
| log.Panic("failed to update pebble batch", zap.Error(err)) | ||
|
|
@@ -1369,6 +1397,9 @@ type eventStoreIter struct { | |
| decoder *zstd.Decoder | ||
| decoderPool *sync.Pool | ||
| decodeBuf []byte | ||
| // encryptionManager for decrypting data (optional, can be nil). | ||
| encryptionManager encryption.EncryptionManager | ||
| keyspaceID uint32 | ||
| } | ||
|
|
||
| func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { | ||
|
|
@@ -1380,6 +1411,15 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) { | |
| key := iter.innerIter.Key() | ||
| value := iter.innerIter.Value() | ||
|
|
||
| // Decrypt if encrypted data is detected and encryption manager is available | ||
| if encryption.IsEncrypted(value) && iter.encryptionManager != nil { | ||
| decryptedValue, err := iter.encryptionManager.DecryptData(context.Background(), iter.keyspaceID, value) | ||
| if err != nil { | ||
| log.Panic("failed to decrypt value", zap.Error(err)) | ||
| } | ||
| value = decryptedValue | ||
| } | ||
|
Comment on lines
+1415
to
+1421
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fail fast when encrypted data is found but encryption manager is missing. Current logic skips decryption when manager is nil and proceeds to decode encrypted bytes, which can panic later with misleading errors. Add an explicit guard and panic/error immediately with context (keyspace, table, subID if available). 🤖 Prompt for AI Agents |
||
|
|
||
| _, compressionType := DecodeKeyMetas(key) | ||
| var decodedValue []byte | ||
| if compressionType == CompressionZSTD { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 995
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 279
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1970
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1229
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 91
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1682
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 214
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2554
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 178
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 458
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2603
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 640
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 274
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 313
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 384
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 315
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 201
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 120
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 631
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 150
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 470
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 792
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 470
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2528
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1276
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 269
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 379
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1151
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 1807
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 100
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2811
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 353
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2890
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 3122
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 432
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 39
🏁 Script executed:
Repository: pingcap/ticdc
Length of output: 2686
Thread cancellable context through encryption calls on hot paths.
Line 1343 uses
context.Background()inwriteEvents(), which has access to thectxparameter from the callerwriteTaskPool.run(ctx)but doesn't thread it through. Replacecontext.Background()with a context parameter passed from the caller.Line 1416 has similar issue in
eventStoreIter.Next(), but this method implements theEventIteratorinterface with a fixed signature. Extending this to accept context would require refactoring the interface, though the underlying problem (bypassing cancellation during shutdown or dependency stalls) remains valid.🤖 Prompt for AI Agents