Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4f33fca
feat(cmek): add CMEK encryption at rest support
tenfyzhong Jan 8, 2026
9e9a080
feat(cmek): add comprehensive test coverage for encryption package
tenfyzhong Jan 8, 2026
8fa1d15
refactor(cmek): improve encryption handling and add TryGetService method
tenfyzhong Jan 8, 2026
4bfb949
feat(cmek): propagate keyspace ID for encryption
tenfyzhong Jan 11, 2026
ab78579
test(cmek): add unit tests for encryption manager
tenfyzhong Jan 12, 2026
4e0a7ce
feat(cmek): improve cache management and cleanup code
tenfyzhong Jan 12, 2026
b9a339b
feat(cmek): Add TiKV HTTP client and enhance encryption support
tenfyzhong Jan 26, 2026
0cb69fb
feat(cmek): add KMS client support for AWS and GCP
tenfyzhong Jan 27, 2026
2957732
feat(cmek): align encryption types with kvproto and simplify cipher s…
tenfyzhong Jan 28, 2026
e6e1178
feat(cmek): refactor factory function type declarations
tenfyzhong Jan 28, 2026
92393fb
chore(deps): Update cloud and AWS SDK dependencies
tenfyzhong Jan 28, 2026
03c525e
Merge branch 'master' into feat-cmek
tenfyzhong Jan 28, 2026
ea30040
feat(cmek): Use IVSize method for IV validation in AES256CTRCipher
tenfyzhong Jan 29, 2026
3de6500
feat(cmek): combine key ID and version retrieval in GetCurrentDataKey
tenfyzhong Jan 29, 2026
feaa54c
feat(cmek): enhance error logging for encryption operations
tenfyzhong Jan 29, 2026
ddc57f3
Merge branch 'master' into feat-cmek
tenfyzhong Feb 9, 2026
a662a9d
feat(encryption): add support for encrypted DDL history and AES key s…
tenfyzhong Mar 2, 2026
2a00996
feat(integration): add CMEK keyspace test and manual keyspace creatio…
tenfyzhong Mar 2, 2026
a52b11c
Merge remote-tracking branch 'origin/master' into feat-cmek
tenfyzhong Mar 2, 2026
dc680e1
feat(eventstore): add encryption support to event store
tenfyzhong Mar 2, 2026
e1c5956
feat(tests): Improve CMEK keyspace integration test script
tenfyzhong Mar 2, 2026
a17d119
fix(encryption): Update TiKV HTTP client endpoint for encryption meta
tenfyzhong Mar 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/pingcap/ticdc
go 1.25.5

require (
cloud.google.com/go/kms v1.15.8
cloud.google.com/go/storage v1.39.1
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.20.0
github.com/BurntSushi/toml v1.5.0
Expand All @@ -12,10 +13,11 @@ require (
github.com/agiledragon/gomonkey/v2 v2.11.0
github.com/apache/pulsar-client-go v0.13.0
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2 v1.40.0
github.com/aws/aws-sdk-go-v2 v1.41.1
github.com/aws/aws-sdk-go-v2/config v1.32.2
github.com/aws/aws-sdk-go-v2/credentials v1.19.2
github.com/aws/aws-sdk-go-v2/service/glue v1.134.1
github.com/aws/aws-sdk-go-v2/service/kms v1.49.5
github.com/benbjohnson/clock v1.3.5
github.com/bradleyjkemp/grpc-tools v0.2.5
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -92,6 +94,7 @@ require (
golang.org/x/term v0.34.0
golang.org/x/text v0.29.0
golang.org/x/time v0.12.0
google.golang.org/api v0.170.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.36.6
)
Expand All @@ -110,7 +113,6 @@ require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/compute/metadata v0.5.0 // indirect
cloud.google.com/go/iam v1.1.7 // indirect
cloud.google.com/go/kms v1.15.8 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
Expand Down Expand Up @@ -143,8 +145,8 @@ require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.14 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.14 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3 // indirect
Expand All @@ -156,7 +158,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/sso v1.30.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.10 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 // indirect
github.com/aws/smithy-go v1.23.2 // indirect
github.com/aws/smithy-go v1.24.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.14.3 // indirect
github.com/blacktear23/go-proxyprotocol v1.0.6 // indirect
Expand Down Expand Up @@ -363,7 +365,6 @@ require (
golang.org/x/mod v0.27.0 // indirect
golang.org/x/tools v0.36.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
google.golang.org/api v0.170.0 // indirect
google.golang.org/genproto v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1392,8 +1392,8 @@ github.com/aws/aws-sdk-go v1.32.6/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU
github.com/aws/aws-sdk-go v1.44.204/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU=
github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU=
github.com/aws/aws-sdk-go-v2 v1.40.0 h1:/WMUA0kjhZExjOQN2z3oLALDREea1A7TobfuiBrKlwc=
github.com/aws/aws-sdk-go-v2 v1.40.0/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE=
github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU=
github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3 h1:DHctwEM8P8iTXFxC/QK0MRjwEpWQeM9yzidCRjldUz0=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.3/go.mod h1:xdCzcZEtnSTKVDOmUZs4l/j3pSV6rpo1WXl5ugNsL8Y=
github.com/aws/aws-sdk-go-v2/config v1.32.2 h1:4liUsdEpUUPZs5WVapsJLx5NPmQhQdez7nYFcovrytk=
Expand All @@ -1402,10 +1402,10 @@ github.com/aws/aws-sdk-go-v2/credentials v1.19.2 h1:qZry8VUyTK4VIo5aEdUcBjPZHL2v
github.com/aws/aws-sdk-go-v2/credentials v1.19.2/go.mod h1:YUqm5a1/kBnoK+/NY5WEiMocZihKSo15/tJdmdXnM5g=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.14 h1:WZVR5DbDgxzA0BJeudId89Kmgy6DIU4ORpxwsVHz0qA=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.14/go.mod h1:Dadl9QO0kHgbrH1GRqGiZdYtW5w+IXXaBNCHTIaheM4=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.14 h1:PZHqQACxYb8mYgms4RZbhZG0a7dPW06xOjmaH0EJC/I=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.14/go.mod h1:VymhrMJUWs69D8u0/lZ7jSB6WgaG/NqHi3gX0aYf6U0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.14 h1:bOS19y6zlJwagBfHxs0ESzr1XCOU2KXJCWcq3E2vfjY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.14/go.mod h1:1ipeGBMAxZ0xcTm6y6paC2C/J6f6OO7LBODV9afuAyM=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 h1:xOLELNKGp2vsiteLsvLPwxC+mYmO6OZ8PYgiuPJzF8U=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17/go.mod h1:5M5CI3D12dNOtH3/mk6minaRwI2/37ifCURZISxA/IQ=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 h1:WWLqlh79iO48yLkj1v3ISRNiv+3KdQoZ6JWyfcsyQik=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17/go.mod h1:EhG22vHRrvF8oXSTYStZhJc1aUgKtnJe+aOiFEV90cM=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.14 h1:ITi7qiDSv/mSGDSWNpZ4k4Ve0DQR6Ug2SJQ8zEHoDXg=
Expand All @@ -1420,6 +1420,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.14 h1:FIouAnCE
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.14/go.mod h1:UTwDc5COa5+guonQU8qBikJo1ZJ4ln2r1MkF7Dqag1E=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.14 h1:FzQE21lNtUor0Fb7QNgnEyiRCBlolLTX/Z1j65S7teM=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.14/go.mod h1:s1ydyWG9pm3ZwmmYN21HKyG9WzAZhYVW85wMHs5FV6w=
github.com/aws/aws-sdk-go-v2/service/kms v1.49.5 h1:DKibav4XF66XSeaXcrn9GlWGHos6D/vJ4r7jsK7z5CE=
github.com/aws/aws-sdk-go-v2/service/kms v1.49.5/go.mod h1:1SdcmEGUEQE1mrU2sIgeHtcMSxHuybhPvuEPANzIDfI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.92.1 h1:OgQy/+0+Kc3khtqiEOk23xQAglXi3Tj0y5doOxbi5tg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.92.1/go.mod h1:wYNqY3L02Z3IgRYxOBPH9I1zD9Cjh9hI5QOy/eOjQvw=
github.com/aws/aws-sdk-go-v2/service/signin v1.0.2 h1:MxMBdKTYBjPQChlJhi4qlEueqB1p1KcbTEa7tD5aqPs=
Expand All @@ -1430,8 +1432,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.10 h1:GtsxyiF3Nd3JahRBJbxLCCd
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.10/go.mod h1:/j67Z5XBVDx8nZVp9EuFM9/BS5dvBznbqILGuu73hug=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.2 h1:a5UTtD4mHBU3t0o6aHQZFJTNKVfxFWfPX7J0Lr7G+uY=
github.com/aws/aws-sdk-go-v2/service/sts v1.41.2/go.mod h1:6TxbXoDSgBQ225Qd8Q+MbxUxUh6TtNKwbRt/EPS9xso=
github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM=
github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/aws/smithy-go v1.24.0 h1:LpilSUItNPFr1eY85RYgTIg5eIEPtvFbskaFcmmIUnk=
github.com/aws/smithy-go v1.24.0/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
Expand Down
66 changes: 53 additions & 13 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/encryption"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
Expand Down Expand Up @@ -116,6 +117,8 @@ type dispatcherStat struct {
resolvedTs atomic.Uint64
// the max ts of events which is not needed by this dispatcher
checkpointTs uint64
// keyspaceID for encryption (0 means default/classic)
keyspaceID uint32
// the difference between `subStat`, `pendingSubStat` and `removingSubStat`:
// 1) if there is no existing subscription which can be reused,
// or there is an existing subscription with exact span match,
Expand Down Expand Up @@ -182,9 +185,10 @@ type subscriptionStat struct {
type subscriptionStats map[logpuller.SubscriptionID]*subscriptionStat

type eventWithCallback struct {
subID logpuller.SubscriptionID
tableID int64
kvs []common.RawKVEntry
subID logpuller.SubscriptionID
tableID int64
keyspaceID uint32
kvs []common.RawKVEntry
// kv with commitTs <= currentResolvedTs will be filtered out
currentResolvedTs uint64
enqueueTimeNano int64
Expand Down Expand Up @@ -238,6 +242,8 @@ type eventStore struct {
compressionThreshold int
// enableZstdCompression controls whether to enable zstd compression for large values.
enableZstdCompression bool
// encryptionManager for encrypting/decrypting data (optional).
encryptionManager encryption.EncryptionManager
}

const (
Expand All @@ -258,6 +264,9 @@ func New(
log.Panic("fail to remove path", zap.String("path", dbPath), zap.Error(err))
}

// Try to get encryption manager from appcontext (optional)
encMgr, _ := appcontext.TryGetService[encryption.EncryptionManager]("EncryptionManager")

store := &eventStore{
pdClock: appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock),
subClient: subClient,
Expand All @@ -280,6 +289,7 @@ func New(
},
compressionThreshold: config.GetGlobalServerConfig().Debug.EventStore.CompressionThreshold,
enableZstdCompression: config.GetGlobalServerConfig().Debug.EventStore.EnableZstdCompression,
encryptionManager: encMgr,
}
store.gcManager = newGCManager(store.dbs, deleteDataRange, compactDataRange)

Expand Down Expand Up @@ -483,6 +493,7 @@ func (e *eventStore) RegisterDispatcher(
dispatcherID: dispatcherID,
tableSpan: dispatcherSpan,
checkpointTs: startTs,
keyspaceID: dispatcherSpan.KeyspaceID,
}
stat.resolvedTs.Store(startTs)

Expand Down Expand Up @@ -609,6 +620,7 @@ func (e *eventStore) RegisterDispatcher(
subStat.eventCh.Push(eventWithCallback{
subID: subStat.subID,
tableID: subStat.tableSpan.TableID,
keyspaceID: subStat.tableSpan.KeyspaceID,
kvs: kvs,
currentResolvedTs: subStat.resolvedTs.Load(),
enqueueTimeNano: now.UnixNano(),
Expand Down Expand Up @@ -900,16 +912,18 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
}

return &eventStoreIter{
tableSpan: stat.tableSpan,
needCheckSpan: needCheckSpan,
innerIter: iter,
prevStartTs: 0,
prevCommitTs: 0,
startTs: dataRange.CommitTsStart,
endTs: dataRange.CommitTsEnd,
rowCount: 0,
decoder: decoder,
decoderPool: e.decoderPool,
tableSpan: stat.tableSpan,
needCheckSpan: needCheckSpan,
innerIter: iter,
prevStartTs: 0,
prevCommitTs: 0,
startTs: dataRange.CommitTsStart,
endTs: dataRange.CommitTsEnd,
rowCount: 0,
decoder: decoder,
decoderPool: e.decoderPool,
encryptionManager: e.encryptionManager,
keyspaceID: stat.keyspaceID,
}
}

Expand Down Expand Up @@ -1324,6 +1338,20 @@ func (e *eventStore) writeEvents(
metrics.EventStoreCompressedRowsCount.Inc()
}

// Encrypt if encryption is enabled (after compression)
if e.encryptionManager != nil {
encryptedValue, err := e.encryptionManager.EncryptData(context.Background(), event.keyspaceID, value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, verify the exact lines mentioned in the review
cd logservice/eventstore && head -20 event_store.go | head -1 && echo "---" && \
sed -n '1340,1350p' event_store.go && echo "--- Line 1416 context ---" && \
sed -n '1410,1420p' event_store.go

Repository: pingcap/ticdc

Length of output: 995


🏁 Script executed:

# Run the provided verification script
rg -nP 'EncryptData\(context\.Background\(\)|DecryptData\(context\.Background\(\)' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 279


🏁 Script executed:

# Get broader context around both lines to understand the function and call path
sed -n '1330,1360p' logservice/eventstore/event_store.go && echo "--- Line 1416 area ---" && \
sed -n '1405,1430p' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 1970


🏁 Script executed:

# Check if there's a context parameter available in the functions containing these calls
rg -B 30 'EncryptData\(context\.Background' logservice/eventstore/event_store.go | head -50

Repository: pingcap/ticdc

Length of output: 1229


🏁 Script executed:

# Find the function containing line 1343
sed -n '1300,1345p' logservice/eventstore/event_store.go | grep -E '^\s*(func|if e\.encryptionManager)' | tail -5

Repository: pingcap/ticdc

Length of output: 91


🏁 Script executed:

# Get the full function signature for the function containing line 1343
sed -n '1280,1343p' logservice/eventstore/event_store.go | grep -B 50 'EncryptData' | head -70

Repository: pingcap/ticdc

Length of output: 1682


🏁 Script executed:

# Check the eventStoreIter struct and Next method signature
rg -A 5 'func \(iter \*eventStoreIter\) Next' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 214


🏁 Script executed:

# See if there's a context passed into the outer function at line 1343
sed -n '1280,1350p' logservice/eventstore/event_store.go | head -80

Repository: pingcap/ticdc

Length of output: 2554


🏁 Script executed:

# Find callers of writeEvents function
rg 'writeEvents\(' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 178


🏁 Script executed:

# Check the function signature of the caller(s)
rg -B 10 'e\.writeEvents\(' logservice/eventstore/event_store.go | head -40

Repository: pingcap/ticdc

Length of output: 458


🏁 Script executed:

# Look at the public API that eventually calls writeEvents
sed -n '1100,1350p' logservice/eventstore/event_store.go | grep -A 20 'func.*eventStore'

Repository: pingcap/ticdc

Length of output: 2603


🏁 Script executed:

# Check if there are any context-accepting methods or patterns in the eventStore type
rg 'func \(e \*eventStore\).*ctx|context' logservice/eventstore/event_store.go | head -10

Repository: pingcap/ticdc

Length of output: 640


🏁 Script executed:

# Find where eventStoreIter is created and how Next is called
rg 'eventStoreIter|\.Next\(\)' logservice/eventstore/event_store.go | head -20

Repository: pingcap/ticdc

Length of output: 274


🏁 Script executed:

# Check the Iterator interface to see if context is part of standard pattern
rg -A 5 'type.*Iter.*interface' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 313


🏁 Script executed:

# Look at the Iter function to see how eventStoreIter is instantiated
rg -B 5 -A 15 'func.*Iter.*context' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Check the full Iter function signature to see if context is passed when creating the iterator
rg -B 5 -A 20 'func.*Iter\(' logservice/eventstore/event_store.go | head -50

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Check the eventStoreIter struct fields to see if context could be stored
rg -A 10 'type eventStoreIter struct' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 384


🏁 Script executed:

# Find where eventStoreIter.Next() is called to understand usage patterns
rg -B 3 'iter\.Next\(\)|\.Next\(\)' logservice/eventstore/event_store.go | head -30

Repository: pingcap/ticdc

Length of output: 315


🏁 Script executed:

# Check if EventIterator interface is defined in this file or imported
rg -B 2 'type EventIterator interface' logservice/eventstore/

Repository: pingcap/ticdc

Length of output: 201


🏁 Script executed:

# Find all implementations of EventIterator interface
rg 'func.*Next\(\).*\*common.RawKVEntry' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 120


🏁 Script executed:

# Check the full eventStoreIter struct to see all fields
rg -A 20 'type eventStoreIter struct' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 631


🏁 Script executed:

# Look at the Iter function to understand how eventStoreIter is created
rg -B 5 -A 25 'func.*\(e \*eventStore\) Iter\(' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Search for functions that return EventIterator or *eventStoreIter
rg -A 20 'func.*Iter\(\)|returns.*EventIterator' logservice/eventstore/event_store.go | head -60

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Look for the New or constructor function for eventStoreIter
rg 'eventStoreIter{' logservice/eventstore/event_store.go -B 5 -A 10

Repository: pingcap/ticdc

Length of output: 150


🏁 Script executed:

# Check who calls the iterator and what context might be available
rg 'for.*\.Next\(\)|while.*\.Next\(\)' logservice/eventstore/ -r

Repository: pingcap/ticdc

Length of output: 470


🏁 Script executed:

# Look more broadly at how the iterator is used in consuming code
rg -B 10 -A 5 'iter.*=.*Iter' logservice/eventstore/event_store.go | head -40

Repository: pingcap/ticdc

Length of output: 792


🏁 Script executed:

# Check if EventIterator interface is exported and might have external implementations
rg 'EventIterator' logservice/eventstore/ -r | grep -v 'eventStoreIter'

Repository: pingcap/ticdc

Length of output: 470


🏁 Script executed:

# Look at the complete Iter function that creates eventStoreIter
sed -n '1070,1150p' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 2528


🏁 Script executed:

# Check the signature and context of writeTaskPool.run to verify context availability
rg -B 5 -A 30 'func.*writeTaskPool.*run' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 1276


🏁 Script executed:

# Check if there are other context.Background() calls in this file
rg 'context\.Background\(\)' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 269


🏁 Script executed:

# Verify the full writeTaskPool.run implementation to understand ctx usage
sed -n '800,950p' logservice/eventstore/event_store.go | grep -A 80 'func.*writeTaskPool.*run'

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Check where eventStoreIter is instantiated to see if we could pass context
rg -B 10 '&eventStoreIter\{' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 379


🏁 Script executed:

# Find where the eventStoreIter is created and returned
rg -B 30 'return.*eventStoreIter' logservice/eventstore/event_store.go | head -50

Repository: pingcap/ticdc

Length of output: 1151


🏁 Script executed:

# Look for the function that contains the eventStoreIter creation
sed -n '1050,1110p' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 1807


🏁 Script executed:

# Find callers of the function that returns eventStoreIter
rg 'Iter\(' logservice/eventstore/event_store.go | head -10

Repository: pingcap/ticdc

Length of output: 100


🏁 Script executed:

# Find the function that returns eventStoreIter and check its signature
rg -B 50 'return.*&eventStoreIter' logservice/eventstore/event_store.go | grep -E 'func.*\(.*\).*\{|func.*Iter' | head -3

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Get the full function signature
sed -n '1010,1115p' logservice/eventstore/event_store.go | head -100

Repository: pingcap/ticdc

Length of output: 2811


🏁 Script executed:

# Verify the EventIterator interface signature to understand immutability constraints
rg -A 8 'type EventIterator interface' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 353


🏁 Script executed:

# Check if context is available where eventStoreIter is created - find the function
sed -n '1000,1100p' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 2890


🏁 Script executed:

# Find who creates/calls the iterator in hot path code  
rg -B 10 'eventStore.*Iter\(' logservice/eventstore/event_store.go | head -30

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Verify the full writeTaskPool.run context flow
sed -n '800,880p' logservice/eventstore/event_store.go

Repository: pingcap/ticdc

Length of output: 3122


🏁 Script executed:

# Search for the Iter function signature more directly
rg 'func.*Iter.*\(' logservice/eventstore/event_store.go -A 2 | head -20

Repository: pingcap/ticdc

Length of output: 432


🏁 Script executed:

# Find GetIterator function which likely creates the iterator
rg -B 5 'func.*GetIterator\|func.*Iterator' logservice/eventstore/event_store.go | head -30

Repository: pingcap/ticdc

Length of output: 39


🏁 Script executed:

# Look for the function containing "return &eventStoreIter"
rg -B 80 'return &eventStoreIter' logservice/eventstore/event_store.go | tail -90

Repository: pingcap/ticdc

Length of output: 2686


Thread cancellable context through encryption calls on hot paths.

Line 1343 uses context.Background() in writeEvents(). Its caller, writeTaskPool.run(ctx), already holds a cancellable ctx, but that ctx is never threaded through to writeEvents(). Add a context parameter to writeEvents(), pass the caller's ctx into it, and use that ctx in place of context.Background().

Line 1416 has a similar issue in eventStoreIter.Next(), but this method implements the EventIterator interface, whose signature is fixed. Making it accept a context would require refactoring the interface; even so, the underlying problem (the call bypasses cancellation during shutdown or dependency stalls) remains valid.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/eventstore/event_store.go` at line 1343, writeEvents is calling
e.encryptionManager.EncryptData with context.Background(); change writeEvents to
accept a context (e.g., func writeEvents(ctx context.Context, ...)) and
propagate the ctx from writeTaskPool.run(ctx) into the EncryptData call so
encryption uses the caller's cancellable context (replace context.Background()
with the passed ctx). For eventStoreIter.Next (which currently cannot change
signature due to the EventIterator interface), avoid using context.Background()
by either adding a context-carrying constructor or field on eventStoreIter
(store the caller ctx when the iterator is created) and use that stored ctx in
Next, or add a new context-aware NextContext(ctx context.Context) method and
have callers use it while you plan interface refactor; update all creation sites
of eventStoreIter to supply the caller context.

if err != nil {
log.Error("encrypt event value failed",
zap.Uint32("keyspaceID", event.keyspaceID),
zap.Uint64("subID", uint64(event.subID)),
zap.Int64("tableID", event.tableID),
zap.Error(err))
return err
}
value = encryptedValue
}
Comment on lines +1342 to +1353

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

context.Background() is used for the EncryptData call. This operation may involve network I/O to a KMS, and using a background context prevents proper cancellation and timeout handling, which could lead to the worker goroutine being blocked indefinitely. It would be more robust to propagate the worker's context.

I suggest modifying writeEvents to accept a context.Context and pass it down from writeTaskPool.run. The same applies to decryption calls in eventStoreIter.Next.


key := EncodeKey(uint64(event.subID), event.tableID, &kv, compressionType)
if err := batch.Set(key, value, pebble.NoSync); err != nil {
log.Panic("failed to update pebble batch", zap.Error(err))
Expand Down Expand Up @@ -1369,6 +1397,9 @@ type eventStoreIter struct {
decoder *zstd.Decoder
decoderPool *sync.Pool
decodeBuf []byte
// encryptionManager for decrypting data (optional, can be nil).
encryptionManager encryption.EncryptionManager
keyspaceID uint32
}

func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
Expand All @@ -1380,6 +1411,15 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool) {
key := iter.innerIter.Key()
value := iter.innerIter.Value()

// Decrypt if encrypted data is detected and encryption manager is available
if encryption.IsEncrypted(value) && iter.encryptionManager != nil {
decryptedValue, err := iter.encryptionManager.DecryptData(context.Background(), iter.keyspaceID, value)
if err != nil {
log.Panic("failed to decrypt value", zap.Error(err))
}
value = decryptedValue
}
Comment on lines +1415 to +1421
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fail fast when encrypted data is found but encryption manager is missing.

Current logic skips decryption when manager is nil and proceeds to decode encrypted bytes, which can panic later with misleading errors. Add an explicit guard and panic/error immediately with context (keyspace, table, subID if available).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@logservice/eventstore/event_store.go` around lines 1415 - 1421, When
encryption.IsEncrypted(value) returns true but iter.encryptionManager is nil,
fail fast instead of continuing; add an explicit guard in the same block that
immediately logs/panics with contextual details (use iter.keyspaceID and any
table or subID fields on iter if available) and a clear message that encrypted
data was encountered with no encryption manager, so the code does not attempt to
decode encrypted bytes later and produce misleading errors.


_, compressionType := DecodeKeyMetas(key)
var decodedValue []byte
if compressionType == CompressionZSTD {
Expand Down
93 changes: 93 additions & 0 deletions logservice/eventstore/event_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package eventstore
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"sync"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/encryption"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/pdutil"
Expand All @@ -48,6 +50,27 @@ type mockSubscriptionClient struct {
subscriptions map[logpuller.SubscriptionID]*mockSubscriptionStat
}

// spyEncryptionManager is a test double that records the keyspace ID passed to
// the most recent EncryptData and DecryptData calls. It performs no real
// cryptography: "encrypting" only prepends a fixed-size header.
type spyEncryptionManager struct {
	// encryptKeyspaceID is the keyspaceID argument of the last EncryptData call.
	encryptKeyspaceID uint32
	// decryptKeyspaceID is the keyspaceID argument of the last DecryptData call.
	decryptKeyspaceID uint32
}

// EncryptData records keyspaceID and returns a new slice made of
// encryption.EncryptionHeaderSize header bytes followed by an unchanged copy
// of data. Only the first header byte is set (to 0x01); the rest stay zero.
// NOTE(review): presumably the 0x01 marker is what makes the payload look
// encrypted to the reader side — confirm against the encryption package.
func (m *spyEncryptionManager) EncryptData(ctx context.Context, keyspaceID uint32, data []byte) ([]byte, error) {
	m.encryptKeyspaceID = keyspaceID

	headerLen := encryption.EncryptionHeaderSize
	framed := make([]byte, headerLen+len(data))
	framed[0] = 0x01
	copy(framed[headerLen:], data)
	return framed, nil
}

// DecryptData records keyspaceID and returns encryptedData with the leading
// encryption.EncryptionHeaderSize bytes sliced off. The result shares its
// backing array with encryptedData. It returns an error when the input is
// shorter than the header.
func (m *spyEncryptionManager) DecryptData(ctx context.Context, keyspaceID uint32, encryptedData []byte) ([]byte, error) {
	m.decryptKeyspaceID = keyspaceID

	headerLen := encryption.EncryptionHeaderSize
	if len(encryptedData) >= headerLen {
		return encryptedData[headerLen:], nil
	}
	return nil, errors.New("encrypted data too short")
}

func NewMockSubscriptionClient() logpuller.SubscriptionClient {
return &mockSubscriptionClient{
subscriptions: make(map[logpuller.SubscriptionID]*mockSubscriptionStat),
Expand Down Expand Up @@ -181,6 +204,76 @@ func TestEventStoreInteractionWithSubClient(t *testing.T) {
}
}

// TestEventStoreUsesKeyspaceIDForEncryption checks that the keyspace ID carried
// by a dispatcher's table span is handed to the encryption manager on both the
// write path (writeEvents) and the read path (the iterator's Next).
func TestEventStoreUsesKeyspaceIDForEncryption(t *testing.T) {
	const wantKeyspaceID uint32 = 42

	subClient, store := newEventStoreForTest(fmt.Sprintf("/tmp/%s", t.Name()))
	impl := store.(*eventStore)
	// Swap in a spy so the keyspace IDs seen by encrypt/decrypt can be inspected.
	recorder := &spyEncryptionManager{}
	impl.encryptionManager = recorder

	dispatcherID := common.NewDispatcherID()
	changefeedID := common.NewChangefeedID4Test("default", "test-cf")
	tableSpan := &heartbeatpb.TableSpan{
		TableID:    1,
		StartKey:   []byte("a"),
		EndKey:     []byte("z"),
		KeyspaceID: wantKeyspaceID,
	}
	registered := store.RegisterDispatcher(changefeedID, dispatcherID, tableSpan, 0, func(uint64, uint64) {}, false, false)
	require.True(t, registered)

	// Pick whichever subscription the dispatcher got: the active one if set,
	// otherwise the pending one.
	impl.dispatcherMeta.RLock()
	dispStat := impl.dispatcherMeta.dispatcherStats[dispatcherID]
	sub := dispStat.subStat
	if sub == nil {
		sub = dispStat.pendingSubStat
	}
	impl.dispatcherMeta.RUnlock()
	require.NotNil(t, sub)

	entry := common.RawKVEntry{
		OpType:  common.OpTypePut,
		CRTs:    10,
		StartTs: 5,
		Key:     []byte("k"),
		Value:   []byte("v"),
	}
	zstdEncoder, err := zstd.NewWriter(nil)
	require.NoError(t, err)
	defer zstdEncoder.Close()

	// Write path: the event's keyspaceID must be the one passed to EncryptData.
	batch := []eventWithCallback{
		{
			subID:             sub.subID,
			tableID:           sub.tableSpan.TableID,
			keyspaceID:        sub.tableSpan.KeyspaceID,
			kvs:               []common.RawKVEntry{entry},
			currentResolvedTs: 0,
			callback:          func() {},
		},
	}
	var scratch []byte
	require.NoError(t, impl.writeEvents(impl.dbs[sub.dbIndex], batch, zstdEncoder, &scratch))
	require.Equal(t, wantKeyspaceID, recorder.encryptKeyspaceID)

	// Read path: advance the resolved ts to the entry's commit ts before
	// iterating, then check the keyspace ID passed to DecryptData.
	sub.resolvedTs.Store(entry.CRTs)
	iter := impl.GetIterator(dispatcherID, common.DataRange{
		Span:          tableSpan,
		CommitTsStart: 0,
		CommitTsEnd:   entry.CRTs,
	})
	require.NotNil(t, iter)

	_, hasNext := iter.Next()
	require.True(t, hasNext)
	require.Equal(t, wantKeyspaceID, recorder.decryptKeyspaceID)

	_, closeErr := iter.Close()
	require.NoError(t, closeErr)
	subClient.(*mockSubscriptionClient).Unsubscribe(sub.subID)
}

func markSubStatsInitializedForTest(store EventStore, tableID int64) {
es := store.(*eventStore)
subStats := es.dispatcherMeta.tableStats[tableID]
Expand Down
Loading