Skip to content

Commit 8e4744d

Browse files
authored
scheduler: Introduce new scheduling algorithm to dynamically balance split tables by traffic (pingcap#1468)
close pingcap#1409
1 parent c9e1b4e commit 8e4744d

File tree

65 files changed

+5946
-2566
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

65 files changed

+5946
-2566
lines changed

api/v2/model.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
514514
RegionThreshold: c.Scheduler.RegionThreshold,
515515
RegionCountPerSpan: c.Scheduler.RegionCountPerSpan,
516516
WriteKeyThreshold: c.Scheduler.WriteKeyThreshold,
517-
SplitNumberPerNode: c.Scheduler.SplitNumberPerNode,
518517
SchedulingTaskCountPerNode: c.Scheduler.SchedulingTaskCountPerNode,
519518
}
520519
}
@@ -840,7 +839,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
840839
RegionThreshold: cloned.Scheduler.RegionThreshold,
841840
RegionCountPerSpan: cloned.Scheduler.RegionCountPerSpan,
842841
WriteKeyThreshold: cloned.Scheduler.WriteKeyThreshold,
843-
SplitNumberPerNode: cloned.Scheduler.SplitNumberPerNode,
844842
SchedulingTaskCountPerNode: cloned.Scheduler.SchedulingTaskCountPerNode,
845843
}
846844
}
@@ -1053,8 +1051,6 @@ type ChangefeedSchedulerConfig struct {
10531051
RegionCountPerSpan int `toml:"region-count-per-span" json:"region-count-per-span"`
10541052
// WriteKeyThreshold is the written keys threshold of splitting a table.
10551053
WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"`
1056-
// SplitNumberPerNode is the number of splits per node.
1057-
SplitNumberPerNode int `toml:"split_number_per_node" json:"split_number_per_node"`
10581054
// SchedulingTaskCountPerNode is the upper limit for scheduling tasks each node.
10591055
SchedulingTaskCountPerNode int `toml:"scheduling-task-count-per-node" json:"scheduling-task-per-node"`
10601056
}

cmd/kafka-consumer/event_group.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,5 @@ func (g *eventsGroup) Resolve(resolve uint64) []*commonEvent.DMLEvent {
8282
zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)),
8383
zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs))
8484
}
85-
g.highWatermark = resolve
8685
return result
8786
}

cmd/pulsar-consumer/event_group.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,5 @@ func (g *eventsGroup) Resolve(resolve uint64) []*commonEvent.DMLEvent {
8282
zap.Int("resolved", len(result)), zap.Int("remained", len(g.events)),
8383
zap.Uint64("resolveTs", resolve), zap.Uint64("firstCommitTs", g.events[0].CommitTs))
8484
}
85-
g.highWatermark = resolve
8685
return result
8786
}

coordinator/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func (c *Controller) onMessage(msg *messaging.TargetMessage) {
233233
}
234234

235235
func (c *Controller) onNodeChanged() {
236-
currentNodes := c.bootstrapper.GetAllNodes()
236+
currentNodes := c.bootstrapper.GetAllNodeIDs()
237237

238238
activeNodes := c.nodeManager.GetAliveNodes()
239239
newNodes := make([]*node.Info, 0, len(activeNodes))

coordinator/operator/operator_add.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,3 +116,8 @@ func (m *AddMaintainerOperator) String() string {
116116
func (m *AddMaintainerOperator) Type() string {
117117
return "add"
118118
}
119+
120+
func (m *AddMaintainerOperator) BlockTsForward() bool {
121+
log.Panic("unreachable code")
122+
return false
123+
}

coordinator/operator/operator_move.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,8 @@ func (m *MoveMaintainerOperator) String() string {
184184
func (m *MoveMaintainerOperator) Type() string {
185185
return "move"
186186
}
187+
188+
func (m *MoveMaintainerOperator) BlockTsForward() bool {
189+
log.Panic("unreachable code")
190+
return false
191+
}

coordinator/operator/operator_stop.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,3 +126,8 @@ func (m *StopChangefeedOperator) String() string {
126126
func (m *StopChangefeedOperator) Type() string {
127127
return "stop"
128128
}
129+
130+
func (m *StopChangefeedOperator) BlockTsForward() bool {
131+
log.Panic("unreachable code")
132+
return false
133+
}

downstreamadapter/dispatcher/table_progress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (p *TableProgress) GetEventSizePerSecond() float32 {
163163
if eventSizePerSecond == 0 {
164164
// The event size will only send to maintainer once per second.
165165
// So if no data is write, we use a tiny value instead of 0 to distinguish it from the status without eventSize
166-
return 0.1
166+
return 1
167167
}
168168

169169
return eventSizePerSecond

downstreamadapter/eventcollector/helper.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424

2525
func NewEventDynamicStream(collector *EventCollector) dynstream.DynamicStream[common.GID, common.DispatcherID, dispatcher.DispatcherEvent, *dispatcherStat, *EventsHandler] {
2626
option := dynstream.NewOption()
27-
option.BatchCount = 67136
27+
option.BatchCount = 4096
2828
option.UseBuffer = false
2929
// Enable memory control for dispatcher events dynamic stream.
3030
option.EnableMemoryControl = true

maintainer/barrier.go

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package maintainer
1515

1616
import (
17+
"math"
1718
"sync"
1819

1920
"github.com/pingcap/log"
@@ -273,19 +274,16 @@ func (b *Barrier) Resend() []*messaging.TargetMessage {
273274
return msgs
274275
}
275276

276-
// ShouldBlockCheckpointTs returns ture if there is a block event need block the checkpoint ts forwarding
277-
// currently, when the block event is a create table event, we should block the checkpoint ts forwarding
278-
// because on the complete checkpointTs calculation should consider the new dispatcher.
279-
func (b *Barrier) ShouldBlockCheckpointTs() bool {
280-
flag := false
281-
b.blockedEvents.RangeWoLock(func(key eventKey, barrierEvent *BarrierEvent) bool {
282-
if barrierEvent.hasNewTable {
283-
flag = true
284-
return false
277+
// GetMinBlockedCheckpointTsForNewTables returns the minimum checkpoint ts for the new tables
278+
func (b *Barrier) GetMinBlockedCheckpointTsForNewTables() uint64 {
279+
minCheckpointTs := uint64(math.MaxUint64)
280+
b.blockedEvents.Range(func(key eventKey, barrierEvent *BarrierEvent) bool {
281+
if barrierEvent.hasNewTable && minCheckpointTs > barrierEvent.commitTs {
282+
minCheckpointTs = barrierEvent.commitTs
285283
}
286284
return true
287285
})
288-
return flag
286+
return minCheckpointTs
289287
}
290288

291289
func (b *Barrier) handleOneStatus(changefeedID *heartbeatpb.ChangefeedID, status *heartbeatpb.TableSpanBlockStatus) (*BarrierEvent, *heartbeatpb.DispatcherStatus) {

0 commit comments

Comments
 (0)