Skip to content

Commit 506b9c2

Browse files
authored
logservice: read code and some tiny refactor (pingcap#2772)
close pingcap#2773
1 parent a9f1e80 commit 506b9c2

File tree

25 files changed

+285
-299
lines changed

25 files changed

+285
-299
lines changed

api/middleware/authenticate_middleware.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func verify(ctx *gin.Context, etcdCli etcd.Client) error {
102102

103103
// fetchTiDBTopology parses the TiDB topology from etcd.
104104
func fetchTiDBTopology(ctx context.Context, etcdClient etcd.Client, ks string) ([]upstream.TidbInstance, error) {
105-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
105+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
106106
meta, err := keyspaceManager.LoadKeyspace(ctx, ks)
107107
if err != nil {
108108
return nil, err

api/middleware/middleware.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc {
238238
return
239239
}
240240

241-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
241+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
242242
meta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), ks)
243243
if errors.IsKeyspaceNotExistError(err) {
244244
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)

api/v2/changefeed.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
103103
return
104104
}
105105

106-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
106+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
107107
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
108108
if err != nil {
109109
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
@@ -381,7 +381,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
381381
}
382382
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol))
383383

384-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
384+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
385385
keyspaceName := GetKeyspaceValueWithDefault(c)
386386
kvStorage, err := keyspaceManager.GetStorage(keyspaceName)
387387
if err != nil {
@@ -663,7 +663,7 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
663663
newCheckpointTs = cfg.OverwriteCheckpointTs
664664
}
665665

666-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
666+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
667667
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
668668
if err != nil {
669669
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
@@ -820,7 +820,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
820820
}
821821
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(oldCfInfo.Config.Sink.Protocol))
822822

823-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
823+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
824824

825825
kvStorage, err := keyspaceManager.GetStorage(keyspaceName)
826826
if err != nil {

api/v2/unsafe.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) {
5454

5555
keyspaceName := GetKeyspaceValueWithDefault(c)
5656

57-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
57+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
5858
// The ctx's lifecycle is the same as the HTTP request.
5959
// The schema store may use the context to fetch database information asynchronously.
6060
// Therefore, we cannot use the context of the HTTP request.
@@ -93,7 +93,7 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) {
9393
defer pdClient.Close()
9494

9595
keyspaceName := GetKeyspaceValueWithDefault(c)
96-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
96+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
9797
keyspaceMeta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), keyspaceName)
9898
if err != nil {
9999
_ = c.Error(cerror.WrapError(cerror.ErrKeyspaceNotFound, err))

coordinator/coordinator.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,7 @@ func (c *coordinator) handleStateChange(
269269
if event.state == config.StateFailed || event.state == config.StateFinished {
270270
progress = config.ProgressStopping
271271
}
272-
if err := c.backend.UpdateChangefeed(context.Background(), cfInfo, cf.GetStatus().CheckpointTs, progress); err != nil {
272+
if err = c.backend.UpdateChangefeed(context.Background(), cfInfo, cf.GetStatus().CheckpointTs, progress); err != nil {
273273
log.Error("failed to update changefeed state",
274274
zap.Error(err))
275275
return errors.Trace(err)
@@ -287,7 +287,7 @@ func (c *coordinator) handleStateChange(
287287
log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint",
288288
zap.String("changefeed", event.changefeedID.String()))
289289

290-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
290+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
291291
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, event.changefeedID.Keyspace())
292292
if err != nil {
293293
log.Warn("failed to load keyspace", zap.String("keyspace", event.changefeedID.Keyspace()), zap.Error(err))
@@ -473,7 +473,7 @@ func (c *coordinator) updateAllKeyspaceGcBarriers(ctx context.Context) error {
473473

474474
func (c *coordinator) updateKeyspaceGcBarrier(ctx context.Context, barrierMap map[string]uint64, keyspaceName string) error {
475475
// Obtain keyspace metadata from PD
476-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
476+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
477477
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
478478
if err != nil {
479479
return cerror.WrapError(cerror.ErrLoadKeyspaceFailed, err)

downstreamadapter/dispatchermanager/dispatcher_manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ func NewDispatcherManager(
158158
ctx, cancel := context.WithCancel(context.Background())
159159
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock)
160160

161-
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
161+
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
162162
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace())
163163
if err != nil {
164164
cancel()

logservice/eventstore/event_store.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,8 @@ type eventWithCallback struct {
184184

185185
func eventWithCallbackSizer(e eventWithCallback) int {
186186
size := 0
187-
for _, e := range e.kvs {
188-
size += int(e.KeyLen + e.ValueLen + e.OldValueLen)
187+
for _, kv := range e.kvs {
188+
size += int(kv.KeyLen + kv.ValueLen + kv.OldValueLen)
189189
}
190190
return size
191191
}
@@ -236,7 +236,6 @@ const (
236236
)
237237

238238
func New(
239-
ctx context.Context,
240239
root string,
241240
subClient logpuller.SubscriptionClient,
242241
) EventStore {
@@ -259,6 +258,7 @@ func New(
259258
gcManager: newGCManager(),
260259

261260
subscriptionChangeCh: chann.NewAutoDrainChann[SubscriptionChange](),
261+
messageCenter: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter),
262262

263263
decoderPool: &sync.Pool{
264264
New: func() any {
@@ -280,6 +280,7 @@ func New(
280280
store.dispatcherMeta.dispatcherStats = make(map[common.DispatcherID]*dispatcherStat)
281281
store.dispatcherMeta.tableStats = make(map[int64]subscriptionStats)
282282

283+
store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.handleMessage)
283284
return store
284285
}
285286

@@ -326,11 +327,11 @@ func (p *writeTaskPool) run(ctx context.Context) {
326327
return
327328
}
328329
start := time.Now()
329-
if err := p.store.writeEvents(p.db, events, encoder); err != nil {
330-
log.Panic("write events failed")
330+
if err = p.store.writeEvents(p.db, events, encoder); err != nil {
331+
log.Panic("write events failed", zap.Error(err))
331332
}
332-
for i := range events {
333-
events[i].callback()
333+
for idx := range events {
334+
events[idx].callback()
334335
}
335336
ioWriteDuration.Observe(time.Since(start).Seconds())
336337
totalDuration.Observe(time.Since(totalStart).Seconds())
@@ -362,15 +363,9 @@ func (e *eventStore) Run(ctx context.Context) error {
362363
defer func() {
363364
log.Info("event store exited")
364365
}()
365-
eg, ctx := errgroup.WithContext(ctx)
366-
367-
// recv and handle messages
368-
messageCenter := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)
369-
e.messageCenter = messageCenter
370-
messageCenter.RegisterHandler(messaging.EventStoreTopic, e.handleMessage)
371366

367+
eg, ctx := errgroup.WithContext(ctx)
372368
for _, p := range e.writeTaskPools {
373-
p := p
374369
eg.Go(func() error {
375370
p.run(ctx)
376371
return nil
@@ -399,7 +394,7 @@ func (e *eventStore) Run(ctx context.Context) error {
399394
return eg.Wait()
400395
}
401396

402-
func (e *eventStore) Close(ctx context.Context) error {
397+
func (e *eventStore) Close(_ context.Context) error {
403398
log.Info("event store start to close")
404399
defer log.Info("event store closed")
405400

@@ -1340,7 +1335,7 @@ func (e *eventStore) uploadStatePeriodically(ctx context.Context) error {
13401335
// When the log coordinator resides on the same node, it will receive the same object reference.
13411336
// To prevent data races, we need to create a clone of the state.
13421337
message := messaging.NewSingleTargetMessage(coordinatorID, messaging.LogCoordinatorTopic, state.Copy())
1343-
// just ignore messagees fail to send
1338+
// just ignore messages fail to send
13441339
if err := e.messageCenter.SendEvent(message); err != nil {
13451340
log.Warn("send broadcast message to coordinator failed", zap.Error(err))
13461341
}

logservice/eventstore/event_store_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/pingcap/ticdc/logservice/logpuller"
2828
"github.com/pingcap/ticdc/pkg/common"
2929
appcontext "github.com/pingcap/ticdc/pkg/common/context"
30+
"github.com/pingcap/ticdc/pkg/messaging"
3031
"github.com/pingcap/ticdc/pkg/pdutil"
3132
"github.com/stretchr/testify/require"
3233
)
@@ -89,11 +90,12 @@ func (s *mockSubscriptionClient) Unsubscribe(subID logpuller.SubscriptionID) {
8990
}
9091

9192
func newEventStoreForTest(path string) (logpuller.SubscriptionClient, EventStore) {
92-
ctx := context.Background()
9393
mockPDClock := pdutil.NewClock4Test()
9494
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
95+
mc := messaging.NewMockMessageCenter()
96+
appcontext.SetService(appcontext.MessageCenter, mc)
9597
subClient := NewMockSubscriptionClient()
96-
store := New(ctx, path, subClient)
98+
store := New(path, subClient)
9799
return subClient, store
98100
}
99101

@@ -584,15 +586,12 @@ func TestEventStoreSwitchSubStat(t *testing.T) {
584586
}
585587

586588
func TestWriteToEventStore(t *testing.T) {
587-
ctx, cancel := context.WithCancel(context.Background())
588-
defer cancel()
589-
590589
mockPDClock := pdutil.NewClock4Test()
591590
appcontext.SetService(appcontext.DefaultPDClock, mockPDClock)
592591

593592
dir := t.TempDir()
594-
store := New(ctx, dir, nil).(*eventStore)
595-
defer store.Close(ctx)
593+
store := New(dir, nil).(*eventStore)
594+
defer store.Close(context.Background())
596595

597596
smallEntryKey := []byte("small-key")
598597
smallEntryValue := []byte("small-value")

logservice/schemastore/ddl_job_fetcher.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@ import (
1818
"math"
1919
"sync"
2020

21-
"github.com/pingcap/errors"
2221
"github.com/pingcap/log"
2322
"github.com/pingcap/ticdc/heartbeatpb"
2423
"github.com/pingcap/ticdc/logservice/logpuller"
2524
"github.com/pingcap/ticdc/pkg/common"
2625
"github.com/pingcap/ticdc/pkg/common/event"
27-
cerror "github.com/pingcap/ticdc/pkg/errors"
26+
"github.com/pingcap/ticdc/pkg/errors"
2827
"github.com/pingcap/ticdc/utils/heap"
2928
"github.com/pingcap/tidb/pkg/kv"
3029
"github.com/pingcap/tidb/pkg/meta"
@@ -49,7 +48,7 @@ type ddlJobFetcher struct {
4948
}
5049

5150
// cacheDDLEvent and advanceResolvedTs may be called concurrently
52-
// the only gurantee is that when call advanceResolvedTs with ts, all ddl job with commit ts <= ts has been passed to cacheDDLEvent
51+
// the only guarantee is that when call advanceResolvedTs with ts, all ddl job with commit ts <= ts has been passed to cacheDDLEvent
5352
cacheDDLEvent func(ddlEvent DDLJobWithCommitTs)
5453
advanceResolvedTs func(resolvedTS uint64)
5554

@@ -58,7 +57,7 @@ type ddlJobFetcher struct {
5857
keyspaceID uint32
5958

6059
ddlTableInfo *event.DDLTableInfo
61-
onceInitddlTableInfo sync.Once
60+
onceInitDDLTableInfo sync.Once
6261
}
6362

6463
func newDDLJobFetcher(
@@ -69,18 +68,18 @@ func newDDLJobFetcher(
6968
cacheDDLEvent func(ddlEvent DDLJobWithCommitTs),
7069
advanceResolvedTs func(resolvedTS uint64),
7170
) *ddlJobFetcher {
72-
ddlJobFetcher := &ddlJobFetcher{
71+
fetcher := &ddlJobFetcher{
7372
ctx: ctx,
7473
subClient: subClient,
7574
cacheDDLEvent: cacheDDLEvent,
7675
advanceResolvedTs: advanceResolvedTs,
7776
kvStorage: kvStorage,
7877
keyspaceID: keyspaceID,
7978
}
80-
ddlJobFetcher.resolvedTsTracker.resolvedTsItemMap = make(map[logpuller.SubscriptionID]*resolvedTsItem)
81-
ddlJobFetcher.resolvedTsTracker.resolvedTsHeap = heap.NewHeap[*resolvedTsItem]()
79+
fetcher.resolvedTsTracker.resolvedTsItemMap = make(map[logpuller.SubscriptionID]*resolvedTsItem)
80+
fetcher.resolvedTsTracker.resolvedTsHeap = heap.NewHeap[*resolvedTsItem]()
8281

83-
return ddlJobFetcher
82+
return fetcher
8483
}
8584

8685
func (p *ddlJobFetcher) run(startTs uint64) error {
@@ -123,16 +122,16 @@ func (p *ddlJobFetcher) tryAdvanceResolvedTs(subID logpuller.SubscriptionID, new
123122
if !ok || minResolvedTsItem.resolvedTs == math.MaxUint64 {
124123
log.Panic("should not happen")
125124
}
126-
// minResolvedTsItem may be 0, it it ok to send it because it will be filtered later.
125+
// minResolvedTsItem may be 0, it's ok to send it because it will be filtered later.
127126
// it is ok to send redundant resolved ts to advanceResolvedTs.
128127
p.advanceResolvedTs(minResolvedTsItem.resolvedTs)
129128
}
130129

131130
func (p *ddlJobFetcher) input(kvs []common.RawKVEntry, _ func()) bool {
132-
for _, kv := range kvs {
133-
job, err := p.unmarshalDDL(&kv)
131+
for _, entry := range kvs {
132+
job, err := p.unmarshalDDL(&entry)
134133
if err != nil {
135-
log.Fatal("unmarshal ddl failed", zap.Any("kv", kv), zap.Error(err))
134+
log.Fatal("unmarshal ddl failed", zap.Any("entry", entry), zap.Error(err))
136135
}
137136

138137
if job == nil {
@@ -142,19 +141,19 @@ func (p *ddlJobFetcher) input(kvs []common.RawKVEntry, _ func()) bool {
142141
// cache ddl job in memory until the resolve ts pass its commit ts
143142
p.cacheDDLEvent(DDLJobWithCommitTs{
144143
Job: job,
145-
CommitTs: kv.CRTs,
144+
CommitTs: entry.CRTs,
146145
})
147146
}
148147
return false
149148
}
150149

151-
// unmarshalDDL unmarshals a ddl job from a raw kv entry.
150+
// unmarshalDDL unmarshal a ddl job from a raw kv entry.
152151
func (p *ddlJobFetcher) unmarshalDDL(rawKV *common.RawKVEntry) (*model.Job, error) {
153152
if rawKV.OpType != common.OpTypePut {
154153
return nil, nil
155154
}
156155
if !event.IsLegacyFormatJob(rawKV) {
157-
p.onceInitddlTableInfo.Do(func() {
156+
p.onceInitDDLTableInfo.Do(func() {
158157
if err := p.initDDLTableInfo(p.ctx, p.kvStorage); err != nil {
159158
log.Fatal("init ddl table info failed", zap.Error(err))
160159
}
@@ -179,7 +178,7 @@ func (p *ddlJobFetcher) initDDLTableInfo(ctx context.Context, kvStorage kv.Stora
179178

180179
dbInfos, err := snap.ListDatabases()
181180
if err != nil {
182-
return cerror.WrapError(cerror.ErrMetaListDatabases, err)
181+
return errors.WrapError(errors.ErrMetaListDatabases, err)
183182
}
184183

185184
db, err := findDBByName(dbInfos, mysql.SystemDB)
@@ -231,8 +230,8 @@ func findDBByName(dbs []*model.DBInfo, name string) (*model.DBInfo, error) {
231230
return db, nil
232231
}
233232
}
234-
return nil, cerror.WrapError(
235-
cerror.ErrDDLSchemaNotFound,
233+
return nil, errors.WrapError(
234+
errors.ErrDDLSchemaNotFound,
236235
errors.Errorf("can't find schema %s", name))
237236
}
238237

@@ -242,8 +241,8 @@ func findTableByName(tbls []*model.TableInfo, name string) (*model.TableInfo, er
242241
return t, nil
243242
}
244243
}
245-
return nil, cerror.WrapError(
246-
cerror.ErrDDLSchemaNotFound,
244+
return nil, errors.WrapError(
245+
errors.ErrDDLSchemaNotFound,
247246
errors.Errorf("can't find table %s", name))
248247
}
249248

@@ -253,8 +252,8 @@ func findColumnByName(cols []*model.ColumnInfo, name string) (*model.ColumnInfo,
253252
return c, nil
254253
}
255254
}
256-
return nil, cerror.WrapError(
257-
cerror.ErrDDLSchemaNotFound,
255+
return nil, errors.WrapError(
256+
errors.ErrDDLSchemaNotFound,
258257
errors.Errorf("can't find column %s", name))
259258
}
260259

0 commit comments

Comments
 (0)