Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions src/examples/core-pinning/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// This example demonstrates goroutine core pinning on multi-core systems (RP2040/RP2350).
// It shows how to pin goroutines to specific CPU cores and verify their execution.

//go:build rp2040 || rp2350

package main

import (
"machine"
"runtime"
"time"
)

func main() {
time.Sleep(5 * time.Second)
println("=== Core Pinning Example ===")
println("Number of CPU cores:", runtime.NumCPU())
println("[main] Main starting on core:", machine.CurrentCore())
println()

// Example 1: Pin using standard Go API (LockOSThread)
// This pins to whichever core this goroutine is currently running on
runtime.LockOSThread()
println("[main] Pinned using runtime.LockOSThread()")
println("[main] Running on core:", machine.CurrentCore())
runtime.UnlockOSThread()
println("[main] Unpinned using runtime.UnlockOSThread()")
println()

// Example 2: Pin to a specific core using machine package
machine.LockCore(0)
println("[main] Explicitly pinned to core 0 using machine.LockCore()")
println()

// Start a goroutine pinned to core 1
go core1Worker()

// Start a goroutine using standard LockOSThread
go standardLockWorker()

// Start an unpinned goroutine (can run on either core)
go unpinnedWorker()

// Main loop on core 0
for i := 0; i < 10; i++ {
println("[main] loop", i, "on CPU", machine.CurrentCore())
time.Sleep(500 * time.Millisecond)
}

// Unpin and let main run on any core
machine.UnlockCore()
println()
println("[main] Unpinned using machine.UnlockCore()")

// Continue running for a bit to show potential migration
for i := 0; i < 5; i++ {
println("[main] unpinned loop on CPU", machine.CurrentCore())
time.Sleep(500 * time.Millisecond)
}

println()
println("Example complete!")
}

// Worker function that pins to core 1 using explicit core selection
func core1Worker() {
// Pin this goroutine to core 1 explicitly
machine.LockCore(1)
println("[core1-worker] Worker pinned to core 1 using machine.LockCore()")

for i := 0; i < 10; i++ {
println("[core1-worker] loop", i, "on CPU", machine.CurrentCore())
time.Sleep(500 * time.Millisecond)
}

println("[core1-worker] Finished")
}

// Worker function that uses standard Go LockOSThread()
func standardLockWorker() {
// Pin this goroutine to whichever core it starts on
runtime.LockOSThread()
defer runtime.UnlockOSThread()

core := machine.CurrentCore()
println("[std-lock-worker] Worker locked using runtime.LockOSThread()")
println("[std-lock-worker] Running on core:", core)

for i := 0; i < 10; i++ {
println("[std-lock-worker] loop", i, "on CPU", machine.CurrentCore())
time.Sleep(600 * time.Millisecond)
}

println("[std-lock-worker] Finished")
}

// Worker function that is not pinned (can run on any core)
func unpinnedWorker() {
println("[unpinned-worker] Starting")

for i := 0; i < 10; i++ {
cpu := machine.CurrentCore()
println("[unpinned-worker] loop", i, "on CPU", cpu)
time.Sleep(700 * time.Millisecond)

// Yield to potentially migrate to another core
runtime.Gosched()
}

println("[unpinned-worker] Finished")
}
8 changes: 8 additions & 0 deletions src/internal/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type Task struct {
// since it falls into the padding of the FipsIndicator bit above.
RunState uint8

// Affinity specifies which CPU core this task should run on.
// -1 means no affinity (can run on any core)
// 0, 1, etc. means pinned to that specific core
// To be used ONLY with the "cores" scheduler.
// By default, all goroutines are unpinned (Affinity = -1)
// Pinning takes effect at the next scheduling point (e.g., after time.Sleep(), channel operations, or runtime.Gosched())
Affinity int8

// DeferFrame stores a pointer to the (stack allocated) defer frame of the
// goroutine that is used for the recover builtin.
DeferFrame unsafe.Pointer
Expand Down
26 changes: 26 additions & 0 deletions src/machine/machine_rp2_cores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//go:build (rp2040 || rp2350) && scheduler.cores

package machine

const numCPU = 2 // RP2040 and RP2350 both have 2 cores

// LockCore implementation for the cores scheduler.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs a more detailed description. For example, it doesn't say what happens if the target core is busy. I believe LockCore returns. If so, this is surprising to me; I would expect that once LockCore returns, the calling goroutine is running on the target core.

func LockCore(core int) {
if core < 0 || core >= numCPU {
panic("machine: core out of range")
}
machineLockCore(core)
}

// UnlockCore implementation for the cores scheduler.
func UnlockCore() {
machineUnlockCore()
}

// Internal functions implemented in runtime/scheduler_cores.go
//
//go:linkname machineLockCore runtime.machineLockCore
func machineLockCore(core int)

//go:linkname machineUnlockCore runtime.machineUnlockCore
func machineUnlockCore()
15 changes: 15 additions & 0 deletions src/machine/machine_rp2_nocores.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//go:build (rp2040 || rp2350) && !scheduler.cores

package machine

// LockCore is not available without the cores scheduler.
// This is a stub that panics.
func LockCore(core int) {
panic("machine.LockCore: not available without scheduler.cores")
}

// UnlockCore is not available without the cores scheduler.
// This is a stub that panics.
func UnlockCore() {
panic("machine.UnlockCore: not available without scheduler.cores")
}
8 changes: 6 additions & 2 deletions src/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,18 @@ func os_sigpipe() {
}

// LockOSThread wires the calling goroutine to its current operating system thread.
// Stub for now
// On microcontrollers with multiple cores (e.g., RP2040/RP2350), this pins the
// goroutine to the core it's currently running on.
Comment on lines +101 to +102
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it more precise to say "with the "cores" scheduler"?

// Called by go1.18 standard library on windows, see https://github.com/golang/go/issues/49320
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While here, remove this now irrelevant comment.

func LockOSThread() {
lockOSThreadImpl()
}

// UnlockOSThread undoes an earlier call to LockOSThread.
// Stub for now
// On microcontrollers with multiple cores, this unpins the goroutine, allowing
// it to run on any available core.
func UnlockOSThread() {
unlockOSThreadImpl()
}

// KeepAlive makes sure the value in the interface is alive until at least the
Expand Down
10 changes: 10 additions & 0 deletions src/runtime/scheduler_cooperative.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ func unlockAtomics(mask interrupt.State) {
interrupt.Restore(mask)
}

// lockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func lockOSThreadImpl() {
// Single-threaded, nothing to do.
}

// unlockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func unlockOSThreadImpl() {
// Single-threaded, nothing to do.
}

func printlock() {
// nothing to do
}
Expand Down
109 changes: 102 additions & 7 deletions src/runtime/scheduler_cores.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ var secondaryCoresStarted bool
var cpuTasks [numCPU]*task.Task

var (
sleepQueue *task.Task
runqueue task.Queue
sleepQueue *task.Task
runqueueShared task.Queue // For unpinned tasks (affinity = -1)
runqueueCore [numCPU]task.Queue // Per-core queues for pinned tasks
)

func deadlock() {
Expand All @@ -39,8 +40,14 @@ func scheduleTask(t *task.Task) {
switch t.RunState {
case task.RunStatePaused:
// Paused, state is saved on the stack.
// Add it to the runqueue...
runqueue.Push(t)
// Route to appropriate queue based on affinity.
if t.Affinity >= 0 && t.Affinity < numCPU {
// Pinned to specific core
runqueueCore[t.Affinity].Push(t)
} else {
// Not pinned, use shared queue
runqueueShared.Push(t)
}
// ...and wake up a sleeping core, if there is one.
// (If all cores are already busy, this is a no-op).
schedulerWake()
Expand Down Expand Up @@ -86,7 +93,15 @@ func addSleepTask(t *task.Task, wakeup timeUnit) {

func Gosched() {
schedulerLock.Lock()
runqueue.Push(task.Current())
t := task.Current()

// Respect affinity when re-queueing.
if t.Affinity >= 0 && t.Affinity < numCPU {
runqueueCore[t.Affinity].Push(t)
} else {
runqueueShared.Push(t)
}

task.PauseLocked()
}

Expand All @@ -95,6 +110,49 @@ func NumCPU() int {
return numCPU
}

//
// Warning: Pinning goroutines can lead to load imbalance. The goroutine will
// wait in the specified core's queue even if other cores are idle. Use this
// feature carefully and only when you need explicit core affinity.
//
// Valid core values are 0 and 1. Panics if core is out of range.
//

// machineLockCore pins the current goroutine to the specified CPU core.
// This is called by machine.LockCore() on RP2040/RP2350.
// It does not validate the core number - validation is done in machine package.
func machineLockCore(core int) {
schedulerLock.Lock()
t := task.Current()
if t != nil {
t.Affinity = int8(core)
}
schedulerLock.Unlock()
}

// machineUnlockCore unpins the current goroutine.
// This is called by machine.UnlockCore() on RP2040/RP2350.
func machineUnlockCore() {
schedulerLock.Lock()
t := task.Current()
if t != nil {
t.Affinity = -1
}
schedulerLock.Unlock()
}

// lockOSThreadImpl implements LockOSThread for the cores scheduler.
// It pins the current goroutine to whichever core it's currently running on.
func lockOSThreadImpl() {
core := int(currentCPU())
machineLockCore(core)
}

// unlockOSThreadImpl implements UnlockOSThread for the cores scheduler.
func unlockOSThreadImpl() {
machineUnlockCore()
}

func addTimer(tn *timerNode) {
schedulerLock.Lock()
timerQueueAdd(tn)
Expand All @@ -110,7 +168,7 @@ func removeTimer(t *timer) *timerNode {
}

func schedulerRunQueue() *task.Queue {
return &runqueue
return &runqueueShared
}

// Pause the current task for a given time.
Expand Down Expand Up @@ -160,9 +218,33 @@ func run() {
}

func scheduler(_ bool) {
currentCore := int(currentCPU())

for mainExited.Load() == 0 {
// Check for ready-to-run tasks.
if runnable := runqueue.Pop(); runnable != nil {
// First, try to get a task pinned to this core.
var runnable *task.Task
if currentCore < numCPU {
runnable = runqueueCore[currentCore].Pop()
}

// If no pinned tasks, try the shared queue.
if runnable == nil {
runnable = runqueueShared.Pop()
}

if runnable != nil {
// Verify affinity constraint (sanity check).
if runnable.Affinity >= 0 && runnable.Affinity != int8(currentCore) {
// Shouldn't happen, but put it back on correct queue.
if runnable.Affinity < numCPU {
runqueueCore[runnable.Affinity].Push(runnable)
} else {
runqueueShared.Push(runnable)
}
continue
}

// Resume it now.
setCurrentTask(runnable)
runnable.RunState = task.RunStateRunning
Expand All @@ -183,6 +265,19 @@ func scheduler(_ bool) {
sleepQueue = sleepQueue.Next
sleepingTask.Next = nil

// Check affinity before running.
if sleepingTask.Affinity >= 0 && sleepingTask.Affinity != int8(currentCore) {
// Task is pinned to a different core, re-queue it.
sleepingTask.RunState = task.RunStatePaused
if sleepingTask.Affinity < numCPU {
runqueueCore[sleepingTask.Affinity].Push(sleepingTask)
} else {
runqueueShared.Push(sleepingTask)
}
schedulerWake()
continue
}

// Run it now.
setCurrentTask(sleepingTask)
sleepingTask.RunState = task.RunStateRunning
Expand Down
10 changes: 10 additions & 0 deletions src/runtime/scheduler_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ func unlockAtomics(mask interrupt.State) {
interrupt.Restore(mask)
}

// lockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func lockOSThreadImpl() {
// Single-threaded, nothing to do.
}

// unlockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func unlockOSThreadImpl() {
// Single-threaded, nothing to do.
}

func printlock() {
// nothing to do
}
Expand Down
10 changes: 10 additions & 0 deletions src/runtime/scheduler_threads.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,13 @@ func lockAtomics() interrupt.State {
func unlockAtomics(mask interrupt.State) {
atomicsLock.Unlock()
}

// lockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func lockOSThreadImpl() {
// Single-threaded, nothing to do.
}

// unlockOSThreadImpl is a no-op for the cooperative scheduler (single-threaded).
func unlockOSThreadImpl() {
// Single-threaded, nothing to do.
}
Loading