Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions src/runtime/cond.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// +build !scheduler.none

package runtime

import (
"internal/task"
"sync/atomic"
"unsafe"
)

// notifiedPlaceholder is a placeholder task which is used to indicate that the condition variable has been notified.
var notifiedPlaceholder task.Task

// Cond is a simplified condition variable, useful for notifying goroutines of interrupts.
type Cond struct {
t *task.Task
}

// Notify sends a notification.
// If the condition variable already has a pending notification, this returns false.
func (c *Cond) Notify() bool {
for {
t := (*task.Task)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t))))
switch t {
case nil:
// Nothing is waiting yet.
// Apply the notification placeholder.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t)), unsafe.Pointer(t), unsafe.Pointer(&notifiedPlaceholder)) {
return true
}
case &notifiedPlaceholder:
// The condition variable has already been notified.
return false
default:
// Unblock the waiting task.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t)), unsafe.Pointer(t), nil) {
runqueuePushBack(t)
return true
}
}
}
}

// Poll checks for a notification.
// If a notification is found, it is cleared and this returns true.
func (c *Cond) Poll() bool {
for {
t := (*task.Task)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t))))
switch t {
case nil:
// No notifications are present.
return false
case &notifiedPlaceholder:
// A notification arrived and there is no waiting goroutine.
// Clear the notification and return.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t)), unsafe.Pointer(t), nil) {
return true
}
default:
// A task is blocked on the condition variable, which means it has not been notified.
return false
}
}
}

// Wait for a notification.
// If the condition variable was previously notified, this returns immediately.
func (c *Cond) Wait() {
cur := task.Current()
for {
t := (*task.Task)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t))))
switch t {
case nil:
// Condition variable has not been notified.
// Block the current task on the condition variable.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t)), nil, unsafe.Pointer(cur)) {
task.Pause()
return
}
case &notifiedPlaceholder:
// A notification arrived and there is no waiting goroutine.
// Clear the notification and return.
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.t)), unsafe.Pointer(t), nil) {
return
}
default:
panic("interrupt.Cond: condition variable in use by another goroutine")
}
}
}
38 changes: 38 additions & 0 deletions src/runtime/cond_nosched.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// +build scheduler.none

package runtime

import "runtime/interrupt"

// Cond is a simplified condition variable, useful for notifying goroutines of interrupts.
type Cond struct {
notified bool
}

// Notify sends a notification.
// If the condition variable already has a pending notification, this returns false.
func (c *Cond) Notify() bool {
i := interrupt.Disable()
prev := c.notified
c.notified = true
interrupt.Restore(i)
return !prev
}

// Poll checks for a notification.
// If a notification is found, it is cleared and this returns true.
func (c *Cond) Poll() bool {
i := interrupt.Disable()
notified := c.notified
c.notified = false
interrupt.Restore(i)
return notified
}

// Wait for a notification.
// If the condition variable was previously notified, this returns immediately.
func (c *Cond) Wait() {
for !c.Poll() {
waitForEvents()
}
}
44 changes: 44 additions & 0 deletions testdata/coroutines.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"runtime"
"sync"
"time"
)
Expand Down Expand Up @@ -71,6 +72,8 @@ func main() {
startSimpleFunc(emptyFunc)

time.Sleep(2 * time.Millisecond)

testCond()
}

func acquire(m *sync.Mutex) {
Expand Down Expand Up @@ -127,3 +130,44 @@ type simpleFunc func()

func emptyFunc() {
}

func testCond() {
var cond runtime.Cond
go func() {
// Wait for the caller to wait on the cond.
time.Sleep(time.Millisecond)

// Notify the caller.
ok := cond.Notify()
if !ok {
panic("notification not sent")
}

// This notification will be buffered inside the cond.
ok = cond.Notify()
if !ok {
panic("notification not queued")
}

// This notification should fail, since there is already one buffered.
ok = cond.Notify()
if ok {
panic("notification double-sent")
}
}()

// Verify that the cond has no pending notifications.
ok := cond.Poll()
if ok {
panic("unexpected early notification")
}

// Wait for the goroutine spawned earlier to send a notification.
cond.Wait()

// The goroutine should have also queued a notification in the cond.
ok = cond.Poll()
if !ok {
panic("missing queued notification")
}
}