Skip to content

Commit 16409a2

Browse files
committed
Add goroutine core affinity support for RP2040/RP2350 systems
- Introduced support for CPU core pinning and affinity for tasks and goroutines. - Updated the scheduler to respect affinity constraints with separate queues for pinned and shared tasks. - Added new runtime API functions `LockToCore`, `UnlockFromCore`, `GetAffinity`, and `CurrentCPU`. - Example program demonstrates core pinning and unpinned execution behavior.
1 parent 6dee98c commit 16409a2

File tree

3 files changed

+194
-7
lines changed

3 files changed

+194
-7
lines changed

src/examples/core-pinning/main.go

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// This example demonstrates goroutine core pinning on multi-core systems (RP2040/RP2350).
2+
// It shows how to pin goroutines to specific CPU cores and verify their execution.
3+
//
4+
5+
//go:build rp2040 || rp2350
6+
7+
package main
8+
9+
import (
10+
"runtime"
11+
"time"
12+
)
13+
14+
func main() {
15+
println("=== Core Pinning Example ===")
16+
println("Number of CPU cores:", runtime.NumCPU())
17+
println("Main starting on core:", runtime.CurrentCPU())
18+
println()
19+
20+
// Pin main goroutine to core 0
21+
runtime.LockToCore(0)
22+
println("Main pinned to core:", runtime.GetAffinity())
23+
println()
24+
25+
// Start a goroutine pinned to core 1
26+
go core1Worker()
27+
28+
// Start an unpinned goroutine (can run on either core)
29+
go unpinnedWorker()
30+
31+
// Main loop on core 0
32+
for i := 0; i < 10; i++ {
33+
println("Core 0 (main):", i, "on CPU", runtime.CurrentCPU())
34+
time.Sleep(500 * time.Millisecond)
35+
}
36+
37+
// Unpin and let main run on any core
38+
runtime.UnlockFromCore()
39+
println()
40+
println("Main unpinned, affinity:", runtime.GetAffinity())
41+
42+
// Continue running for a bit to show migration
43+
for i := 0; i < 5; i++ {
44+
println("Unpinned main on CPU", runtime.CurrentCPU())
45+
time.Sleep(500 * time.Millisecond)
46+
}
47+
48+
println()
49+
println("Example complete!")
50+
}
51+
52+
// Worker function that runs on core 1
53+
func core1Worker() {
54+
// Pin this goroutine to core 1
55+
runtime.LockToCore(1)
56+
println("Worker pinned to core:", runtime.GetAffinity())
57+
58+
for i := 0; i < 10; i++ {
59+
println(" Core 1 (worker):", i, "on CPU", runtime.CurrentCPU())
60+
time.Sleep(500 * time.Millisecond)
61+
}
62+
63+
println(" Core 1 worker finished")
64+
}
65+
66+
// Worker function that is not pinned (can run on any core)
67+
func unpinnedWorker() {
68+
println("Unpinned worker starting, affinity:", runtime.GetAffinity())
69+
70+
for i := 0; i < 10; i++ {
71+
cpu := runtime.CurrentCPU()
72+
println(" Unpinned worker:", i, "on CPU", cpu)
73+
time.Sleep(700 * time.Millisecond)
74+
75+
// Yield to potentially migrate to another core
76+
runtime.Gosched()
77+
}
78+
79+
println(" Unpinned worker finished")
80+
}

src/internal/task/task.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ type Task struct {
2929
// since it falls into the padding of the FipsIndicator bit above.
3030
RunState uint8
3131

32+
// Affinity specifies which CPU core this task should run on.
33+
// -1 means no affinity (can run on any core)
34+
// 0, 1, etc. means pinned to that specific core
35+
// To be used ONLY with the "cores" scheduler.
36+
// By default, all goroutines are unpinned (Affinity = -1)
37+
// Pinning takes effect at the next scheduling point (e.g., after time.Sleep(), channel operations, or runtime.Gosched())
38+
Affinity int8
39+
3240
// DeferFrame stores a pointer to the (stack allocated) defer frame of the
3341
// goroutine that is used for the recover builtin.
3442
DeferFrame unsafe.Pointer

src/runtime/scheduler_cores.go

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,9 @@ var secondaryCoresStarted bool
2222
var cpuTasks [numCPU]*task.Task
2323

2424
var (
25-
sleepQueue *task.Task
26-
runqueue task.Queue
25+
sleepQueue *task.Task
26+
runqueueShared task.Queue // For unpinned tasks (affinity = -1)
27+
runqueueCore [numCPU]task.Queue // Per-core queues for pinned tasks
2728
)
2829

2930
func deadlock() {
@@ -39,8 +40,14 @@ func scheduleTask(t *task.Task) {
3940
switch t.RunState {
4041
case task.RunStatePaused:
4142
// Paused, state is saved on the stack.
42-
// Add it to the runqueue...
43-
runqueue.Push(t)
43+
// Route to appropriate queue based on affinity.
44+
if t.Affinity >= 0 && t.Affinity < numCPU {
45+
// Pinned to specific core
46+
runqueueCore[t.Affinity].Push(t)
47+
} else {
48+
// Not pinned, use shared queue
49+
runqueueShared.Push(t)
50+
}
4451
// ...and wake up a sleeping core, if there is one.
4552
// (If all cores are already busy, this is a no-op).
4653
schedulerWake()
@@ -86,7 +93,15 @@ func addSleepTask(t *task.Task, wakeup timeUnit) {
8693

8794
func Gosched() {
8895
schedulerLock.Lock()
89-
runqueue.Push(task.Current())
96+
t := task.Current()
97+
98+
// Respect affinity when re-queueing.
99+
if t.Affinity >= 0 && t.Affinity < numCPU {
100+
runqueueCore[t.Affinity].Push(t)
101+
} else {
102+
runqueueShared.Push(t)
103+
}
104+
90105
task.PauseLocked()
91106
}
92107

@@ -95,6 +110,53 @@ func NumCPU() int {
95110
return numCPU
96111
}
97112

113+
// CurrentCPU returns the current CPU core number.
114+
// On RP2040/RP2350, this returns 0 or 1.
115+
func CurrentCPU() int {
116+
return int(currentCPU())
117+
}
118+
119+
// LockToCore pins the current goroutine to the specified CPU core.
120+
// Use core = -1 to unpin (allow running on any core).
121+
// Use core = 0 or 1 to pin to a specific core.
122+
// Panics if core is invalid (not -1, 0, or 1 on RP2040/RP2350).
123+
func LockToCore(core int) {
124+
if core < -1 || core >= numCPU {
125+
panic("runtime: invalid core number")
126+
}
127+
128+
schedulerLock.Lock()
129+
t := task.Current()
130+
if t != nil {
131+
t.Affinity = int8(core)
132+
}
133+
schedulerLock.Unlock()
134+
}
135+
136+
// UnlockFromCore unpins the current goroutine, allowing it to run on any core.
137+
// This is equivalent to LockToCore(-1).
138+
func UnlockFromCore() {
139+
schedulerLock.Lock()
140+
t := task.Current()
141+
if t != nil {
142+
t.Affinity = -1
143+
}
144+
schedulerLock.Unlock()
145+
}
146+
147+
// GetAffinity returns the CPU core affinity of the current goroutine.
148+
// Returns -1 if not pinned, or 0/1 if pinned to a specific core.
149+
func GetAffinity() int {
150+
schedulerLock.Lock()
151+
t := task.Current()
152+
affinity := -1
153+
if t != nil {
154+
affinity = int(t.Affinity)
155+
}
156+
schedulerLock.Unlock()
157+
return affinity
158+
}
159+
98160
func addTimer(tn *timerNode) {
99161
schedulerLock.Lock()
100162
timerQueueAdd(tn)
@@ -110,7 +172,7 @@ func removeTimer(t *timer) *timerNode {
110172
}
111173

112174
func schedulerRunQueue() *task.Queue {
113-
return &runqueue
175+
return &runqueueShared
114176
}
115177

116178
// Pause the current task for a given time.
@@ -160,9 +222,33 @@ func run() {
160222
}
161223

162224
func scheduler(_ bool) {
225+
currentCore := int(currentCPU())
226+
163227
for mainExited.Load() == 0 {
164228
// Check for ready-to-run tasks.
165-
if runnable := runqueue.Pop(); runnable != nil {
229+
// First, try to get a task pinned to this core.
230+
var runnable *task.Task
231+
if currentCore < numCPU {
232+
runnable = runqueueCore[currentCore].Pop()
233+
}
234+
235+
// If no pinned tasks, try the shared queue.
236+
if runnable == nil {
237+
runnable = runqueueShared.Pop()
238+
}
239+
240+
if runnable != nil {
241+
// Verify affinity constraint (sanity check).
242+
if runnable.Affinity >= 0 && runnable.Affinity != int8(currentCore) {
243+
// Shouldn't happen, but put it back on correct queue.
244+
if runnable.Affinity < numCPU {
245+
runqueueCore[runnable.Affinity].Push(runnable)
246+
} else {
247+
runqueueShared.Push(runnable)
248+
}
249+
continue
250+
}
251+
166252
// Resume it now.
167253
setCurrentTask(runnable)
168254
runnable.RunState = task.RunStateRunning
@@ -183,6 +269,19 @@ func scheduler(_ bool) {
183269
sleepQueue = sleepQueue.Next
184270
sleepingTask.Next = nil
185271

272+
// Check affinity before running.
273+
if sleepingTask.Affinity >= 0 && sleepingTask.Affinity != int8(currentCore) {
274+
// Task is pinned to a different core, re-queue it.
275+
sleepingTask.RunState = task.RunStatePaused
276+
if sleepingTask.Affinity < numCPU {
277+
runqueueCore[sleepingTask.Affinity].Push(sleepingTask)
278+
} else {
279+
runqueueShared.Push(sleepingTask)
280+
}
281+
schedulerWake()
282+
continue
283+
}
284+
186285
// Run it now.
187286
setCurrentTask(sleepingTask)
188287
sleepingTask.RunState = task.RunStateRunning

0 commit comments

Comments
 (0)