@@ -291,7 +291,148 @@ func (s *schemaStorageImpl) skipJob(job *timodel.Job) bool {
291291 zap .String ("namespace" , s .id .Namespace ),
292292 zap .String ("changefeed" , s .id .ID ),
293293 zap .String ("role" , s .role .String ()))
294+ << << << < HEAD
294295 return ! job .IsSynced () && ! job .IsDone ()
296+ == == == =
297+ return ! job .IsDone ()
298+ }
299+
300+ // BuildDDLEvents by parsing the DDL job
301+ func (s * schemaStorage ) BuildDDLEvents (
302+ ctx context.Context , job * timodel.Job ,
303+ ) (ddlEvents []* model.DDLEvent , err error ) {
304+ switch job .Type {
305+ case timodel .ActionRenameTables :
306+ // The result contains more than one DDLEvent for a rename tables job.
307+ ddlEvents , err = s .buildRenameEvents (ctx , job )
308+ if err != nil {
309+ return nil , errors .Trace (err )
310+ }
311+ case timodel .ActionCreateTables :
312+ if job .BinlogInfo != nil && job .BinlogInfo .MultipleTableInfos != nil {
313+ querys , err := ddl .SplitQueries (job .Query )
314+ if err != nil {
315+ return nil , errors .Trace (err )
316+ }
317+ multiTableInfos := job .BinlogInfo .MultipleTableInfos
318+ for index , tableInfo := range multiTableInfos {
319+ newTableInfo := model .WrapTableInfo (job .SchemaID , job .SchemaName , job .BinlogInfo .FinishedTS , tableInfo )
320+ job .Query = querys [index ]
321+ event := new (model.DDLEvent )
322+ event .FromJob (job , nil , newTableInfo )
323+ ddlEvents = append (ddlEvents , event )
324+ }
325+ } else {
326+ return nil , errors .Errorf ("there is no multiple table infos in the create tables job: %s" , job )
327+ }
328+ default :
329+ // parse preTableInfo
330+ preSnap , err := s .GetSnapshot (ctx , job .BinlogInfo .FinishedTS - 1 )
331+ if err != nil {
332+ return nil , errors .Trace (err )
333+ }
334+ preTableInfo , err := preSnap .PreTableInfo (job )
335+ if err != nil {
336+ return nil , errors .Trace (err )
337+ }
338+
339+ // parse tableInfo
340+ var tableInfo * model.TableInfo
341+ err = preSnap .FillSchemaName (job )
342+ if err != nil {
343+ log .Error ("build DDL event fail" , zap .Any ("job" , job ), zap .Error (err ))
344+ return nil , errors .Trace (err )
345+ }
346+ // TODO: find a better way to refactor this. For example, drop table job should not
347+ // have table info.
348+ if job .BinlogInfo != nil && job .BinlogInfo .TableInfo != nil {
349+ tableInfo = model .WrapTableInfo (job .SchemaID , job .SchemaName , job .BinlogInfo .FinishedTS , job .BinlogInfo .TableInfo )
350+
351+ // TODO: remove this after job is fixed by TiDB.
352+ // ref: https://github.com/pingcap/tidb/issues/43819
353+ if job .Type == timodel .ActionExchangeTablePartition {
354+ oldTableInfo , ok := preSnap .PhysicalTableByID (job .BinlogInfo .TableInfo .ID )
355+ if ! ok {
356+ return nil , cerror .ErrSchemaStorageTableMiss .GenWithStackByArgs (job .TableID )
357+ }
358+ tableInfo .SchemaID = oldTableInfo .SchemaID
359+ tableInfo .TableName = oldTableInfo .TableName
360+ }
361+ } else {
362+ // Just retrieve the schema name for a DDL job that does not contain TableInfo.
363+ // Currently supported by cdc are: ActionCreateSchema, ActionDropSchema,
364+ // and ActionModifySchemaCharsetAndCollate.
365+ tableInfo = & model.TableInfo {
366+ TableName : model.TableName {Schema : job .SchemaName },
367+ Version : job .BinlogInfo .FinishedTS ,
368+ }
369+ }
370+ event := new (model.DDLEvent )
371+ event .FromJob (job , preTableInfo , tableInfo )
372+ ddlEvents = append (ddlEvents , event )
373+ }
374+ return ddlEvents , nil
375+ }
376+
377+ // GetNewJobWithArgs returns a new job with the given args
378+ func GetNewJobWithArgs (job * timodel.Job , args timodel.JobArgs ) (* timodel.Job , error ) {
379+ job .FillArgs (args )
380+ bytes , err := job .Encode (true )
381+ if err != nil {
382+ return nil , errors .Trace (err )
383+ }
384+ encodedJob := & timodel.Job {}
385+ if err = encodedJob .Decode (bytes ); err != nil {
386+ return nil , errors .Trace (err )
387+ }
388+ return encodedJob , nil
389+ }
390+
391+ // TODO: find a better way to refactor this function.
392+ // buildRenameEvents gets a list of DDLEvent from a rename tables DDL job.
393+ func (s * schemaStorage ) buildRenameEvents (
394+ ctx context.Context , job * timodel.Job ,
395+ ) ([]* model.DDLEvent , error ) {
396+ var ddlEvents []* model.DDLEvent
397+ args , err := timodel .GetRenameTablesArgs (job )
398+ if err != nil {
399+ return nil , errors .Trace (err )
400+ }
401+
402+ multiTableInfos := job .BinlogInfo .MultipleTableInfos
403+ if len (multiTableInfos ) != len (args .RenameTableInfos ) {
404+ return nil , cerror .ErrInvalidDDLJob .GenWithStackByArgs (job .ID )
405+ }
406+
407+ preSnap , err := s .GetSnapshot (ctx , job .BinlogInfo .FinishedTS - 1 )
408+ if err != nil {
409+ return nil , errors .Trace (err )
410+ }
411+
412+ for i , tableInfo := range multiTableInfos {
413+ info := args .RenameTableInfos [i ]
414+ newSchema , ok := preSnap .SchemaByID (info .NewSchemaID )
415+ if ! ok {
416+ return nil , cerror .ErrSnapshotSchemaNotFound .GenWithStackByArgs (
417+ info .NewSchemaID )
418+ }
419+ newSchemaName := newSchema .Name .O
420+ oldSchemaName := info .OldSchemaName .O
421+ event := new (model.DDLEvent )
422+ preTableInfo , ok := preSnap .PhysicalTableByID (tableInfo .ID )
423+ if ! ok {
424+ return nil , cerror .ErrSchemaStorageTableMiss .GenWithStackByArgs (
425+ job .TableID )
426+ }
427+
428+ tableInfo := model .WrapTableInfo (info .NewSchemaID , newSchemaName ,
429+ job .BinlogInfo .FinishedTS , tableInfo )
430+ event .FromJobWithArgs (job , preTableInfo , tableInfo , oldSchemaName , newSchemaName )
431+ event .Seq = uint64 (i )
432+ ddlEvents = append (ddlEvents , event )
433+ }
434+ return ddlEvents , nil
435+ >> >> >> > 3 c7fd0a1fd (cdc (ddl ): ensure strict ordering for multi - table DDLs after split (#12450 ))
295436}
296437
297438// MockSchemaStorage is for tests.
0 commit comments