Skip to content

Commit 1d628cb

Browse files
authored
Ensure Changefeed StartTs Safety When Creating or Resuming Changefeed (pingcap#2580)
close pingcap#2577
1 parent 50e7437 commit 1d628cb

File tree

6 files changed

+78
-57
lines changed

6 files changed

+78
-57
lines changed

api/v2/changefeed.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
102102
}
103103

104104
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
105+
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
106+
if err != nil {
107+
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
108+
return
109+
}
105110

106111
co, err := h.server.GetCoordinator()
107112
if err != nil {
@@ -147,6 +152,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
147152
ctx,
148153
h.server.GetPdClient(),
149154
createGcServiceID,
155+
keyspaceMeta.Id,
150156
changefeedID,
151157
ensureTTL, cfg.StartTs); err != nil {
152158
if !errors.ErrStartTsBeforeGC.Equal(err) {
@@ -246,12 +252,6 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
246252
return
247253
}
248254

249-
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
250-
if err != nil {
251-
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
252-
return
253-
}
254-
255255
err = gc.UndoEnsureChangefeedStartTsSafety(
256256
ctx,
257257
pdClient,
@@ -661,11 +661,19 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
661661
newCheckpointTs = cfg.OverwriteCheckpointTs
662662
}
663663

664+
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
665+
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
666+
if err != nil {
667+
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
668+
return
669+
}
670+
664671
resumeGcServiceID := h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming)
665672
if err := verifyResumeChangefeedConfig(
666673
ctx,
667674
h.server.GetPdClient(),
668675
resumeGcServiceID,
676+
keyspaceMeta.Id,
669677
cfInfo.ChangefeedID,
670678
newCheckpointTs); err != nil {
671679
_ = c.Error(err)
@@ -678,13 +686,6 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
678686
return
679687
}
680688

681-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
682-
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
683-
if err != nil {
684-
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
685-
return
686-
}
687-
688689
err = gc.UndoEnsureChangefeedStartTsSafety(
689690
ctx,
690691
h.server.GetPdClient(),
@@ -862,6 +863,7 @@ func verifyResumeChangefeedConfig(
862863
ctx context.Context,
863864
pdClient pd.Client,
864865
gcServiceID string,
866+
keyspaceID uint32,
865867
changefeedID common.ChangeFeedID,
866868
overrideCheckpointTs uint64,
867869
) error {
@@ -885,6 +887,7 @@ func verifyResumeChangefeedConfig(
885887
ctx,
886888
pdClient,
887889
gcServiceID,
890+
keyspaceID,
888891
changefeedID,
889892
gcTTL, overrideCheckpointTs)
890893
if err != nil {

cmd/cdc/cli/cli_unsafe_reset.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,23 @@ func (o *unsafeResetOptions) run(cmd *cobra.Command) error {
9090
}
9191

9292
if kerneltype.IsClassic() {
93-
err = gc.RemoveServiceGCSafepoint(ctx, o.pdClient, o.etcdClient.GetGCServiceID())
93+
err := gc.UnifyDeleteGcSafepoint(ctx, o.pdClient, 0, o.etcdClient.GetGCServiceID())
94+
if err != nil {
95+
return errors.Trace(err)
96+
}
97+
} else {
98+
// Next gen mode, remove gc barriers
99+
_, infoMap, err := o.etcdClient.GetChangeFeeds(ctx)
94100
if err != nil {
95101
return errors.Trace(err)
96102
}
97-
}
98-
99-
// Next gen mode, remove gc barriers
100-
_, infoMap, err := o.etcdClient.GetChangeFeeds(ctx)
101-
if err != nil {
102-
return errors.Trace(err)
103-
}
104103

105-
keyspaceNameMap := make(map[string]struct{})
106-
for key := range infoMap {
107-
keyspaceNameMap[key.Keyspace] = struct{}{}
104+
keyspaceNameMap := make(map[string]struct{})
105+
for key := range infoMap {
106+
keyspaceNameMap[key.Keyspace] = struct{}{}
107+
}
108+
removeKeyspaceGCBarrier(ctx, o.pdClient, o.etcdClient.GetGCServiceID(), keyspaceNameMap)
108109
}
109-
removeKeyspaceGCBarrier(ctx, o.pdClient, o.etcdClient.GetGCServiceID(), keyspaceNameMap)
110110

111111
cmd.Println("reset and all metadata truncated in PD!")
112112

logservice/schemastore/persist_storage.go

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"github.com/pingcap/ticdc/pkg/common"
3030
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
3131
"github.com/pingcap/ticdc/pkg/config"
32-
"github.com/pingcap/ticdc/pkg/config/kerneltype"
3332
"github.com/pingcap/ticdc/pkg/errors"
3433
"github.com/pingcap/ticdc/pkg/filter"
3534
"github.com/pingcap/ticdc/pkg/txnutil/gc"
@@ -157,17 +156,7 @@ func newPersistentStorage(
157156
}
158157

159158
func (p *persistentStorage) getGcSafePoint(ctx context.Context) (uint64, error) {
160-
if kerneltype.IsClassic() {
161-
return gc.SetServiceGCSafepoint(ctx, p.pdCli, defaultSchemaStoreGcServiceID, 0, 0)
162-
}
163-
164-
gcClient := p.pdCli.GetGCStatesClient(p.keyspaceID)
165-
gcState, err := gc.GetGCState(ctx, gcClient)
166-
if err != nil {
167-
return 0, err
168-
}
169-
170-
return gcState.TxnSafePoint, nil
159+
return gc.UnifyGetServiceGCSafepoint(ctx, p.pdCli, p.keyspaceID, defaultSchemaStoreGcServiceID)
171160
}
172161

173162
func (p *persistentStorage) initialize(ctx context.Context) {
@@ -183,6 +172,7 @@ func (p *persistentStorage) initialize(ctx context.Context) {
183172
ctx,
184173
p.pdCli,
185174
defaultSchemaStoreGcServiceID,
175+
p.keyspaceID,
186176
fakeChangefeedID,
187177
defaultGcServiceTTL, gcSafePoint+1)
188178
if err == nil {

pkg/migrate/migrate.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -398,15 +398,13 @@ func (m *migrator) migrateGcServiceSafePoint(ctx context.Context,
398398
}
399399
}
400400
if cdcGcSafePoint != nil {
401-
_, err := gc.SetServiceGCSafepoint(ctx, pdClient, newGcServiceID,
402-
ttl,
403-
cdcGcSafePoint.SafePoint)
401+
_, err := gc.SetServiceGCSafepoint(ctx, pdClient, newGcServiceID, ttl, cdcGcSafePoint.SafePoint)
404402
if err != nil {
405403
log.Error("set gc service safepoint failed",
406404
zap.Error(err))
407405
return errors.Trace(err)
408406
}
409-
err = gc.RemoveServiceGCSafepoint(ctx, pdClient, oldGcServiceID)
407+
err = gc.UnifyDeleteGcSafepoint(ctx, pdClient, 0, oldGcServiceID)
410408
if err != nil {
411409
log.Warn("remove old gc safepoint failed", zap.Error(err))
412410
}

pkg/txnutil/gc/gc_manager.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ func (m *gcManager) TryUpdateGCSafePoint(
9898
}
9999
m.lastUpdatedTime.Store(time.Now())
100100

101-
actual, err := SetServiceGCSafepoint(
102-
ctx, m.pdClient, m.gcServiceID, m.gcTTL, checkpointTs)
101+
actual, err := SetServiceGCSafepoint(ctx, m.pdClient, m.gcServiceID, m.gcTTL, checkpointTs)
103102
if err != nil {
104103
log.Warn("updateGCSafePoint failed",
105104
zap.Uint64("safePointTs", checkpointTs),
@@ -175,9 +174,9 @@ func (m *gcManager) TryUpdateKeyspaceGCBarrier(ctx context.Context, keyspaceID u
175174
}
176175
m.keyspaceLastUpdatedTimeMap.Store(keyspaceID, time.Now())
177176

178-
gcClient := m.pdClient.GetGCStatesClient(keyspaceID)
177+
gcCli := m.pdClient.GetGCStatesClient(keyspaceID)
179178
ttl := time.Duration(m.gcTTL) * time.Second
180-
actual, err := SetGCBarrier(ctx, gcClient, m.gcServiceID, checkpointTs, ttl)
179+
actual, err := SetGCBarrier(ctx, gcCli, m.gcServiceID, checkpointTs, ttl)
181180
if err != nil {
182181
log.Warn("updateKeyspaceGCBarrier failed",
183182
zap.Uint32("keyspaceID", keyspaceID),
@@ -188,7 +187,7 @@ func (m *gcManager) TryUpdateKeyspaceGCBarrier(ctx context.Context, keyspaceID u
188187
barrierInfo := barrierInfoObj.(*keyspaceGCBarrierInfo)
189188
lastSucceededTime = barrierInfo.lastSucceededTime
190189
}
191-
if time.Since(lastSucceededTime) >= ttl {
190+
if time.Since(lastSucceededTime) >= time.Duration(m.gcTTL)*time.Second {
192191
return cerror.ErrUpdateGCBarrierFailed.Wrap(err)
193192
}
194193
return nil
@@ -207,7 +206,8 @@ func (m *gcManager) TryUpdateKeyspaceGCBarrier(ctx context.Context, keyspaceID u
207206
newBarrierInfo := &keyspaceGCBarrierInfo{
208207
lastSucceededTime: time.Now(),
209208
lastGcBarrierTs: actual,
210-
isTiCDCBlockGC: actual == checkpointTs,
209+
// TODO tenfyzhong 2025-10-10 18:39:26
210+
isTiCDCBlockGC: actual == checkpointTs,
211211
}
212212
m.keyspaceGCBarrierInfoMap.Store(keyspaceID, newBarrierInfo)
213213

pkg/txnutil/gc/gc_service.go

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,28 @@ const (
4242
func EnsureChangefeedStartTsSafety(
4343
ctx context.Context, pdCli pd.Client,
4444
gcServiceIDPrefix string,
45+
keyspaceID uint32,
4546
changefeedID common.ChangeFeedID,
4647
TTL int64, startTs uint64,
4748
) error {
4849
gcServiceID := gcServiceIDPrefix + changefeedID.Keyspace() + "_" + changefeedID.Name()
50+
if kerneltype.IsClassic() {
51+
return ensureChangefeedStartTsSafetyClassic(ctx, pdCli, gcServiceID, TTL, startTs)
52+
}
53+
return ensureChangefeedStartTsSafetyNextGen(ctx, pdCli, gcServiceID, keyspaceID, TTL, startTs)
54+
}
55+
56+
func ensureChangefeedStartTsSafetyClassic(ctx context.Context, pdCli pd.Client, gcServiceID string, ttl int64, startTs uint64) error {
4957
// set gc safepoint for the changefeed gc service
50-
minServiceGCTs, err := SetServiceGCSafepoint(
51-
ctx, pdCli,
52-
gcServiceID,
53-
TTL, startTs)
58+
minServiceGCTs, err := SetServiceGCSafepoint(ctx, pdCli, gcServiceID, ttl, startTs)
5459
if err != nil {
5560
return errors.Trace(err)
5661
}
5762
log.Info("set gc safepoint for changefeed",
5863
zap.String("gcServiceID", gcServiceID),
5964
zap.Uint64("expectedGCSafepoint", startTs),
6065
zap.Uint64("actualGCSafepoint", minServiceGCTs),
61-
zap.Int64("ttl", TTL))
66+
zap.Int64("ttl", ttl))
6267

6368
// startTs should be greater than or equal to minServiceGCTs + 1, otherwise gcManager
6469
// would return a ErrSnapshotLostByGC even though the changefeed would appear to be successfully
@@ -69,6 +74,15 @@ func EnsureChangefeedStartTsSafety(
6974
return nil
7075
}
7176

77+
func ensureChangefeedStartTsSafetyNextGen(ctx context.Context, pdCli pd.Client, gcServiceID string, keyspaceID uint32, ttl int64, startTs uint64) error {
78+
gcCli := pdCli.GetGCStatesClient(keyspaceID)
79+
_, err := SetGCBarrier(ctx, gcCli, gcServiceID, startTs, time.Duration(ttl)*time.Second)
80+
if err != nil {
81+
return errors.ErrStartTsBeforeGC.GenWithStackByArgs(startTs)
82+
}
83+
return nil
84+
}
85+
7286
// UndoEnsureChangefeedStartTsSafety cleans the service GC safepoint of a changefeed
7387
// if something goes wrong after successfully calling EnsureChangefeedStartTsSafety().
7488
func UndoEnsureChangefeedStartTsSafety(
@@ -114,8 +128,23 @@ func SetServiceGCSafepoint(
114128
return minServiceGCTs, err
115129
}
116130

117-
// RemoveServiceGCSafepoint removes a service safepoint from PD.
118-
func RemoveServiceGCSafepoint(ctx context.Context, pdCli pd.Client, serviceID string) error {
131+
// UnifyGetServiceGCSafepoint returns a service gc safepoint on classic mode or
132+
// a gc barrier on next-gen mode
133+
func UnifyGetServiceGCSafepoint(ctx context.Context, pdCli pd.Client, keyspaceID uint32, serviceID string) (uint64, error) {
134+
if kerneltype.IsClassic() {
135+
return SetServiceGCSafepoint(ctx, pdCli, serviceID, 0, 0)
136+
}
137+
138+
gcCli := pdCli.GetGCStatesClient(keyspaceID)
139+
gcState, err := getGCState(ctx, gcCli)
140+
if err != nil {
141+
return 0, err
142+
}
143+
return gcState.TxnSafePoint, nil
144+
}
145+
146+
// removeServiceGCSafepoint removes a service safepoint from PD.
147+
func removeServiceGCSafepoint(ctx context.Context, pdCli pd.Client, serviceID string) error {
119148
// Set TTL to 0 second to delete the service safe point.
120149
TTL := 0
121150
return retry.Do(ctx,
@@ -131,6 +160,7 @@ func RemoveServiceGCSafepoint(ctx context.Context, pdCli pd.Client, serviceID st
131160
retry.WithIsRetryableErr(errors.IsRetryableError))
132161
}
133162

163+
// SetGCBarrier Set a GC Barrier of a keyspace
134164
func SetGCBarrier(ctx context.Context, gcCli gc.GCStatesClient, serviceID string, ts uint64, ttl time.Duration) (barrierTS uint64, err error) {
135165
err = retry.Do(ctx, func() error {
136166
barrierInfo, err1 := gcCli.SetGCBarrier(ctx, serviceID, ts, ttl)
@@ -146,7 +176,7 @@ func SetGCBarrier(ctx context.Context, gcCli gc.GCStatesClient, serviceID string
146176
return barrierTS, err
147177
}
148178

149-
func GetGCState(ctx context.Context, gcCli gc.GCStatesClient) (gc.GCState, error) {
179+
func getGCState(ctx context.Context, gcCli gc.GCStatesClient) (gc.GCState, error) {
150180
return gcCli.GetGCState(ctx)
151181
}
152182

@@ -166,11 +196,11 @@ func DeleteGCBarrier(ctx context.Context, gcCli gc.GCStatesClient, serviceID str
166196
return barrierInfo, err
167197
}
168198

169-
// UnifyDeleteGcSafepoint delete a gc safepoint on classic mode or delte a gc
199+
// UnifyDeleteGcSafepoint delete a gc safepoint on classic mode or delete a gc
170200
// barrier on next-gen mode
171201
func UnifyDeleteGcSafepoint(ctx context.Context, pdCli pd.Client, keyspaceID uint32, serviceID string) error {
172202
if kerneltype.IsClassic() {
173-
return RemoveServiceGCSafepoint(ctx, pdCli, serviceID)
203+
return removeServiceGCSafepoint(ctx, pdCli, serviceID)
174204
}
175205

176206
gcClient := pdCli.GetGCStatesClient(keyspaceID)

0 commit comments

Comments
 (0)