Skip to content

Commit 5e61d5a

Browse files
authored
schemaStorage(ticdc): remove schemaVersion in schemaStorage (#11869) (#11918)
close #11868
1 parent 9f0a11c commit 5e61d5a

File tree

2 files changed

+5
-33
lines changed

2 files changed

+5
-33
lines changed

cdc/entry/schema/snapshot.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -109,24 +109,6 @@ func (s *Snapshot) FillSchemaName(job *timodel.Job) error {
109109
return nil
110110
}
111111

112-
// GetSchemaVersion returns the schema version of the meta.
113-
func GetSchemaVersion(meta *timeta.Meta) (int64, error) {
114-
// After we get the schema version at startTs, if the diff corresponding to that version does not exist,
115-
// it means that the job is not committed yet, so we should subtract one from the version, i.e., version--.
116-
version, err := meta.GetSchemaVersion()
117-
if err != nil {
118-
return 0, errors.Trace(err)
119-
}
120-
diff, err := meta.GetSchemaDiff(version)
121-
if err != nil {
122-
return 0, errors.Trace(err)
123-
}
124-
if diff == nil {
125-
version--
126-
}
127-
return version, nil
128-
}
129-
130112
// NewSnapshotFromMeta creates a schema snapshot from meta.
131113
func NewSnapshotFromMeta(
132114
id model.ChangeFeedID,

cdc/entry/schema_storage.go

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,8 @@ type schemaStorage struct {
7272
snaps []*schema.Snapshot
7373
snapsMu sync.RWMutex
7474

75-
gcTs uint64
76-
resolvedTs uint64
77-
schemaVersion int64
75+
gcTs uint64
76+
resolvedTs uint64
7877

7978
filter filter.Filter
8079

@@ -91,9 +90,8 @@ func NewSchemaStorage(
9190
role util.Role, filter filter.Filter,
9291
) (SchemaStorage, error) {
9392
var (
94-
snap *schema.Snapshot
95-
version int64
96-
err error
93+
snap *schema.Snapshot
94+
err error
9795
)
9896
// storage may be nil in some unit test cases.
9997
if storage == nil {
@@ -104,7 +102,6 @@ func NewSchemaStorage(
104102
if err != nil {
105103
return nil, errors.Trace(err)
106104
}
107-
version, err = schema.GetSchemaVersion(meta)
108105
if err != nil {
109106
return nil, errors.Trace(err)
110107
}
@@ -115,7 +112,6 @@ func NewSchemaStorage(
115112
forceReplicate: forceReplicate,
116113
filter: filter,
117114
id: id,
118-
schemaVersion: version,
119115
role: role,
120116
}, nil
121117
}
@@ -193,7 +189,6 @@ func (s *schemaStorage) GetLastSnapshot() *schema.Snapshot {
193189
// HandleDDLJob creates a new snapshot in storage and handles the ddl job
194190
func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
195191
if s.skipJob(job) {
196-
s.schemaVersion = job.BinlogInfo.SchemaVersion
197192
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
198193
return nil
199194
}
@@ -202,16 +197,13 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
202197
var snap *schema.Snapshot
203198
if len(s.snaps) > 0 {
204199
lastSnap := s.snaps[len(s.snaps)-1]
205-
// We use schemaVersion to check if an already-executed DDL job is processed for a second time.
206-
// Unexecuted DDL jobs should have largest schemaVersions.
207-
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() || job.BinlogInfo.SchemaVersion <= s.schemaVersion {
200+
if job.BinlogInfo.FinishedTS <= lastSnap.CurrentTs() {
208201
log.Info("schemaStorage: ignore foregone DDL",
209202
zap.String("namespace", s.id.Namespace),
210203
zap.String("changefeed", s.id.ID),
211204
zap.String("DDL", job.Query),
212205
zap.Int64("jobID", job.ID),
213206
zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
214-
zap.Int64("schemaVersion", s.schemaVersion),
215207
zap.Int64("jobSchemaVersion", job.BinlogInfo.SchemaVersion),
216208
zap.String("role", s.role.String()))
217209
return nil
@@ -233,7 +225,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
233225
return errors.Trace(err)
234226
}
235227
s.snaps = append(s.snaps, snap)
236-
s.schemaVersion = job.BinlogInfo.SchemaVersion
237228
s.AdvanceResolvedTs(job.BinlogInfo.FinishedTS)
238229
log.Info("schemaStorage: update snapshot by the DDL job",
239230
zap.String("namespace", s.id.Namespace),
@@ -242,7 +233,6 @@ func (s *schemaStorage) HandleDDLJob(job *timodel.Job) error {
242233
zap.String("table", job.TableName),
243234
zap.String("query", job.Query),
244235
zap.Uint64("finishedTs", job.BinlogInfo.FinishedTS),
245-
zap.Uint64("schemaVersion", uint64(s.schemaVersion)),
246236
zap.String("role", s.role.String()))
247237
return nil
248238
}

0 commit comments

Comments
 (0)