Skip to content

Commit 1f59580

Browse files
authored
monitoring: add event broker scanned bytes and count metrics and adjust some code (pingcap#1407)
close pingcap#1413
1 parent 03b33e1 commit 1f59580

File tree

10 files changed

+3103
-2974
lines changed

10 files changed

+3103
-2974
lines changed

downstreamadapter/eventcollector/event_collector.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -125,35 +125,37 @@ type EventCollector struct {
125125
}
126126

127127
func New(serverId node.ID) *EventCollector {
128-
eventCollector := EventCollector{
128+
receiveChannels := make([]chan *messaging.TargetMessage, config.DefaultBasicEventHandlerConcurrency)
129+
for i := 0; i < config.DefaultBasicEventHandlerConcurrency; i++ {
130+
receiveChannels[i] = make(chan *messaging.TargetMessage, receiveChanSize)
131+
}
132+
eventCollector := &EventCollector{
129133
serverId: serverId,
130134
dispatcherMap: sync.Map{},
131135
dispatcherRequestChan: chann.NewAutoDrainChann[DispatcherRequestWithTarget](),
132136
logCoordinatorRequestChan: chann.NewAutoDrainChann[*logservicepb.ReusableEventServiceRequest](),
133137
mc: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter),
134138
dispatcherHeartbeatChan: chann.NewAutoDrainChann[*DispatcherHeartbeatWithTarget](),
135-
receiveChannels: make([]chan *messaging.TargetMessage, config.DefaultBasicEventHandlerConcurrency),
139+
receiveChannels: receiveChannels,
136140
metricDispatcherReceivedKVEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("KVEvent"),
137141
metricDispatcherReceivedResolvedTsEventCount: metrics.DispatcherReceivedEventCount.WithLabelValues("ResolvedTs"),
138142
metricReceiveEventLagDuration: metrics.EventCollectorReceivedEventLagDuration.WithLabelValues("Msg"),
139143
}
140-
eventCollector.ds = NewEventDynamicStream(&eventCollector)
144+
eventCollector.ds = NewEventDynamicStream(eventCollector)
141145
eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.RecvEventsMessage)
142146

143-
return &eventCollector
147+
return eventCollector
144148
}
145149

146150
func (c *EventCollector) Run(ctx context.Context) {
147151
ctx, cancel := context.WithCancel(ctx)
148152
c.cancel = cancel
149153

150154
for i := 0; i < config.DefaultBasicEventHandlerConcurrency; i++ {
151-
ch := make(chan *messaging.TargetMessage, receiveChanSize)
152-
c.receiveChannels[i] = ch
153155
c.wg.Add(1)
154156
go func() {
155157
defer c.wg.Done()
156-
c.runProcessMessage(ctx, ch)
158+
c.runProcessMessage(ctx, c.receiveChannels[i])
157159
}()
158160
}
159161

@@ -498,7 +500,7 @@ func (c *EventCollector) sendDispatcherHeartbeat(heartbeat *DispatcherHeartbeatW
498500
return nil
499501
}
500502

501-
func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messaging.TargetMessage) error {
503+
func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messaging.TargetMessage) {
502504
if len(targetMessage.Message) != 1 {
503505
log.Panic("invalid dispatcher heartbeat response message", zap.Any("msg", targetMessage))
504506
}
@@ -519,7 +521,6 @@ func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messag
519521
}
520522
}
521523
}
522-
return nil
523524
}
524525

525526
// RecvEventsMessage is the handler for the events message from EventService.

logservice/eventstore/event_store.go

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/pingcap/ticdc/pkg/common"
3434
appcontext "github.com/pingcap/ticdc/pkg/common/context"
3535
"github.com/pingcap/ticdc/pkg/config"
36+
"github.com/pingcap/ticdc/pkg/errors"
3637
"github.com/pingcap/ticdc/pkg/messaging"
3738
"github.com/pingcap/ticdc/pkg/metrics"
3839
"github.com/pingcap/ticdc/pkg/node"
@@ -76,7 +77,7 @@ type EventStore interface {
7677

7778
GetDispatcherDMLEventState(dispatcherID common.DispatcherID) (bool, DMLEventState)
7879

79-
// return an iterator which scan the data in ts range (dataRange.StartTs, dataRange.EndTs]
80+
// GetIterator return an iterator which scan the data in ts range (dataRange.StartTs, dataRange.EndTs]
8081
GetIterator(dispatcherID common.DispatcherID, dataRange common.DataRange) (EventIterator, error)
8182
}
8283

@@ -593,11 +594,12 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
593594
return nil, nil
594595
}
595596
subscriptionStat := stat.subStat
596-
if dataRange.StartTs < subscriptionStat.checkpointTs.Load() {
597-
log.Panic("dataRange startTs is larger than subscriptionStat checkpointTs, it should not happen",
597+
checkpoint := subscriptionStat.checkpointTs.Load()
598+
if dataRange.StartTs < checkpoint {
599+
log.Panic("dataRange startTs is smaller than subscriptionStat checkpointTs, it should not happen",
598600
zap.Stringer("dispatcherID", dispatcherID),
599-
zap.Uint64("checkpointTs", subscriptionStat.checkpointTs.Load()),
600-
zap.Uint64("startTs", dataRange.StartTs))
601+
zap.Uint64("startTs", dataRange.StartTs),
602+
zap.Uint64("checkpointTs", checkpoint))
601603
}
602604
db := e.dbs[subscriptionStat.dbIndex]
603605
e.dispatcherMeta.RUnlock()
@@ -611,10 +613,11 @@ func (e *eventStore) GetIterator(dispatcherID common.DispatcherID, dataRange com
611613
UpperBound: end,
612614
})
613615
if err != nil {
614-
return nil, err
616+
return nil, errors.Trace(err)
615617
}
616618
startTime := time.Now()
617-
iter.First()
619+
// todo: what happens if iter.First() returns false?
620+
_ = iter.First()
618621
metricEventStoreFirstReadDurationHistogram.Observe(time.Since(startTime).Seconds())
619622
metrics.EventStoreScanRequestsCount.Inc()
620623

@@ -759,7 +762,6 @@ func (e *eventStore) writeEvents(db *pebble.DB, events []eventWithCallback) erro
759762
CounterKv.Add(float64(kvCount))
760763
metrics.EventStoreWriteBatchEventsCountHist.Observe(float64(kvCount))
761764
metrics.EventStoreWriteBatchSizeHist.Observe(float64(batch.Len()))
762-
metrics.EventStoreWriteBytes.Add(float64(batch.Len()))
763765
start := time.Now()
764766
err := batch.Commit(pebble.NoSync)
765767
metrics.EventStoreWriteDurationHistogram.Observe(float64(time.Since(start).Milliseconds()) / 1000)
@@ -790,10 +792,6 @@ type eventStoreIter struct {
790792
}
791793

792794
func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
793-
if iter.innerIter == nil {
794-
log.Panic("iter is nil")
795-
}
796-
797795
rawKV := &common.RawKVEntry{}
798796
for {
799797
if !iter.innerIter.Valid() {
@@ -805,20 +803,19 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
805803
log.Panic("fail to decode raw kv entry", zap.Error(err))
806804
}
807805
metrics.EventStoreScanBytes.Add(float64(len(value)))
808-
if iter.needCheckSpan {
809-
comparableKey := common.ToComparableKey(rawKV.Key)
810-
if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 &&
811-
bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 {
812-
break
813-
}
814-
log.Debug("event store iter skip kv not in table span",
815-
zap.String("tableSpan", common.FormatTableSpan(iter.tableSpan)),
816-
zap.String("key", hex.EncodeToString(rawKV.Key)),
817-
zap.Uint64("startTs", rawKV.StartTs),
818-
zap.Uint64("commitTs", rawKV.CRTs))
819-
} else {
806+
if !iter.needCheckSpan {
807+
break
808+
}
809+
comparableKey := common.ToComparableKey(rawKV.Key)
810+
if bytes.Compare(comparableKey, iter.tableSpan.StartKey) >= 0 &&
811+
bytes.Compare(comparableKey, iter.tableSpan.EndKey) <= 0 {
820812
break
821813
}
814+
log.Debug("event store iter skip kv not in table span",
815+
zap.String("tableSpan", common.FormatTableSpan(iter.tableSpan)),
816+
zap.String("key", hex.EncodeToString(rawKV.Key)),
817+
zap.Uint64("startTs", rawKV.StartTs),
818+
zap.Uint64("commitTs", rawKV.CRTs))
822819
iter.innerIter.Next()
823820
}
824821
isNewTxn := false
@@ -830,7 +827,7 @@ func (iter *eventStoreIter) Next() (*common.RawKVEntry, bool, error) {
830827
iter.rowCount++
831828
startTime := time.Now()
832829
iter.innerIter.Next()
833-
metricEventStoreNextReadDurationHistogram.Observe(float64(time.Since(startTime).Seconds()))
830+
metricEventStoreNextReadDurationHistogram.Observe(time.Since(startTime).Seconds())
834831
return rawKV, isNewTxn, nil
835832
}
836833

logservice/schemastore/schema_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type SchemaStore interface {
4242

4343
UnregisterTable(tableID int64) error
4444

45-
// return table info with largest version <= ts
45+
// GetTableInfo return table info with the largest version <= ts
4646
GetTableInfo(tableID int64, ts uint64) (*common.TableInfo, error)
4747

4848
// TODO: how to respect tableFilter

0 commit comments

Comments
 (0)