Skip to content

Commit 6110f0b

Browse files
aykevldeadprogram
authored andcommitted
runtime: make channels parallelism-safe
1 parent 17302ca commit 6110f0b

File tree

4 files changed

+108
-30
lines changed

4 files changed

+108
-30
lines changed

src/internal/task/task.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type Task struct {
2727
}
2828

2929
// DataUint32 returns the Data field as a uint32. The value is only valid after
30-
// setting it through SetDataUint32.
30+
// setting it through SetDataUint32 or by storing to it using DataAtomicUint32.
3131
func (t *Task) DataUint32() uint32 {
3232
return *(*uint32)(unsafe.Pointer(&t.Data))
3333
}
@@ -38,6 +38,11 @@ func (t *Task) SetDataUint32(val uint32) {
3838
*(*uint32)(unsafe.Pointer(&t.Data)) = val
3939
}
4040

41+
// DataAtomicUint32 returns the Data field as an atomic-if-needed Uint32 value.
42+
func (t *Task) DataAtomicUint32() *Uint32 {
43+
return (*Uint32)(unsafe.Pointer(&t.Data))
44+
}
45+
4146
// getGoroutineStackSize is a compiler intrinsic that returns the stack size for
4247
// the given function and falls back to the default stack size. It is replaced
4348
// with a load from a special section just before codegen.

src/runtime/chan.go

Lines changed: 93 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ package runtime
3030
// non-select operations) so that the select operation knows which case did
3131
// proceed.
3232
// The value is at the same time also a way that goroutines can be the first
33-
// (and only) goroutine to 'take' a channel operation to change it from
34-
// 'waiting' to any other value. This is important for the select statement
35-
// because multiple goroutines could try to let different channels in the
36-
// select statement proceed at the same time. By using Task.Data, only a
37-
// single channel operation in the select statement can proceed.
33+
// (and only) goroutine to 'take' a channel operation using an atomic CAS
34+
// operation to change it from 'waiting' to any other value. This is important
35+
// for the select statement because multiple goroutines could try to let
36+
// different channels in the select statement proceed at the same time. By
37+
// using Task.Data, only a single channel operation in the select statement
38+
// can proceed.
3839
// - It is possible for the channel queues to contain already-processed senders
3940
// or receivers. This can happen when the select statement managed to proceed
4041
// but the goroutine doing the select has not yet cleaned up the stale queue
@@ -49,15 +50,17 @@ import (
4950

5051
// The runtime implementation of the Go 'chan' type.
5152
type channel struct {
52-
closed bool
53-
elementSize uintptr
54-
bufCap uintptr // 'cap'
55-
bufLen uintptr // 'len'
56-
bufHead uintptr
57-
bufTail uintptr
58-
senders chanQueue
59-
receivers chanQueue
60-
buf unsafe.Pointer
53+
closed bool
54+
selectLocked bool
55+
elementSize uintptr
56+
bufCap uintptr // 'cap'
57+
bufLen uintptr // 'len'
58+
bufHead uintptr
59+
bufTail uintptr
60+
senders chanQueue
61+
receivers chanQueue
62+
lock task.PMutex
63+
buf unsafe.Pointer
6164
}
6265

6366
const (
@@ -73,7 +76,8 @@ type chanQueue struct {
7376

7477
// Pus the next channel operation to the queue. All appropriate fields must have
7578
// been initialized already.
76-
// This function must be called with interrupts disabled.
79+
// This function must be called with interrupts disabled and the channel lock
80+
// held.
7781
func (q *chanQueue) push(node *channelOp) {
7882
node.next = q.first
7983
q.first = node
@@ -99,16 +103,17 @@ func (q *chanQueue) pop(chanOp uint32) *channelOp {
99103
newDataValue := chanOp | popped.index<<2
100104

101105
// Try to be the first to proceed with this goroutine.
102-
if popped.task.DataUint32() == chanOperationWaiting {
103-
popped.task.SetDataUint32(newDataValue)
106+
swapped := popped.task.DataAtomicUint32().CompareAndSwap(0, newDataValue)
107+
if swapped {
104108
return popped
105109
}
106110
}
107111
}
108112

109113
// Remove the given to-be-removed node from the queue if it is part of the
110114
// queue. If there are multiple, only one will be removed.
111-
// This function must be called with interrupts disabled.
115+
// This function must be called with interrupts disabled and the channel lock
116+
// held.
112117
func (q *chanQueue) remove(remove *channelOp) {
113118
n := &q.first
114119
for *n != nil {
@@ -159,8 +164,8 @@ func chanCap(c *channel) int {
159164
}
160165

161166
// Push the value to the channel buffer array, for a send operation.
162-
// This function may only be called when interrupts are disabled and it is known
163-
// there is space available in the buffer.
167+
// This function may only be called when interrupts are disabled, the channel is
168+
// locked and it is known there is space available in the buffer.
164169
func (ch *channel) bufferPush(value unsafe.Pointer) {
165170
elemAddr := unsafe.Add(ch.buf, ch.bufHead*ch.elementSize)
166171
ch.bufLen++
@@ -174,8 +179,8 @@ func (ch *channel) bufferPush(value unsafe.Pointer) {
174179

175180
// Pop a value from the channel buffer and store it in the 'value' pointer, for
176181
// a receive operation.
177-
// This function may only be called when interrupts are disabled and it is known
178-
// there is at least one value available in the buffer.
182+
// This function may only be called when interrupts are disabled, the channel is
183+
// locked and it is known there is at least one value available in the buffer.
179184
func (ch *channel) bufferPop(value unsafe.Pointer) {
180185
elemAddr := unsafe.Add(ch.buf, ch.bufTail*ch.elementSize)
181186
ch.bufLen--
@@ -191,7 +196,8 @@ func (ch *channel) bufferPop(value unsafe.Pointer) {
191196
}
192197

193198
// Try to proceed with this send operation without blocking, and return whether
194-
// the send succeeded. Interrupts must be disabled when calling this function.
199+
// the send succeeded. Interrupts must be disabled and the lock must be held
200+
// when calling this function.
195201
func (ch *channel) trySend(value unsafe.Pointer) bool {
196202
// To make sure we send values in the correct order, we can only send
197203
// directly to a receiver when there are no values in the buffer.
@@ -230,9 +236,11 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
230236
}
231237

232238
mask := interrupt.Disable()
239+
ch.lock.Lock()
233240

234241
// See whether we can proceed immediately, and if so, return early.
235242
if ch.trySend(value) {
243+
ch.lock.Unlock()
236244
interrupt.Restore(mask)
237245
return
238246
}
@@ -244,9 +252,12 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
244252
op.index = 0
245253
op.value = value
246254
ch.senders.push(op)
255+
ch.lock.Unlock()
247256
interrupt.Restore(mask)
248257

249258
// Wait until this goroutine is resumed.
259+
// It might be resumed after Unlock() and before Pause(). In that case,
260+
// because we use semaphores, the Pause() will continue immediately.
250261
task.Pause()
251262

252263
// Check whether the sent happened normally (not because the channel was
@@ -258,8 +269,8 @@ func chanSend(ch *channel, value unsafe.Pointer, op *channelOp) {
258269
}
259270

260271
// Try to proceed with this receive operation without blocking, and return
261-
// whether the receive operation succeeded. Interrupts must be disabled when
262-
// calling this function.
272+
// whether the receive operation succeeded. Interrupts must be disabled and the
273+
// lock must be held when calling this function.
263274
func (ch *channel) tryRecv(value unsafe.Pointer) (received, ok bool) {
264275
// To make sure we keep the values in the channel in the correct order, we
265276
// first have to read values from the buffer before we can look at the
@@ -303,8 +314,10 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
303314
}
304315

305316
mask := interrupt.Disable()
317+
ch.lock.Lock()
306318

307319
if received, ok := ch.tryRecv(value); received {
320+
ch.lock.Unlock()
308321
interrupt.Restore(mask)
309322
return ok
310323
}
@@ -317,6 +330,7 @@ func chanRecv(ch *channel, value unsafe.Pointer, op *channelOp) bool {
317330
op.task = t
318331
op.index = 0
319332
ch.receivers.push(op)
333+
ch.lock.Unlock()
320334
interrupt.Restore(mask)
321335

322336
// Wait until the goroutine is resumed.
@@ -335,9 +349,11 @@ func chanClose(ch *channel) {
335349
}
336350

337351
mask := interrupt.Disable()
352+
ch.lock.Lock()
338353

339354
if ch.closed {
340355
// Not allowed by the language spec.
356+
ch.lock.Unlock()
341357
interrupt.Restore(mask)
342358
runtimePanic("close of closed channel")
343359
}
@@ -370,14 +386,56 @@ func chanClose(ch *channel) {
370386

371387
ch.closed = true
372388

389+
ch.lock.Unlock()
373390
interrupt.Restore(mask)
374391
}
375392

393+
// We currently use a global select lock to avoid deadlocks while locking each
394+
// individual channel in the select. Without this global lock, two select
395+
// operations that have a different order of the same channels could end up in a
396+
// deadlock. This global lock is inefficient if there are many select operations
397+
// happening in parallel, but gets the job done.
398+
//
399+
// If this becomes a performance issue, we can see how the Go runtime does this.
400+
// I think it does this by sorting all states by channel address and then
401+
// locking them in that order to avoid this deadlock.
402+
var chanSelectLock task.PMutex
403+
404+
// Lock all channels (taking care to skip duplicate channels).
405+
func lockAllStates(states []chanSelectState) {
406+
if !hasParallelism {
407+
return
408+
}
409+
for _, state := range states {
410+
if state.ch != nil && !state.ch.selectLocked {
411+
state.ch.lock.Lock()
412+
state.ch.selectLocked = true
413+
}
414+
}
415+
}
416+
417+
// Unlock all channels (taking care to skip duplicate channels).
418+
func unlockAllStates(states []chanSelectState) {
419+
if !hasParallelism {
420+
return
421+
}
422+
for _, state := range states {
423+
if state.ch != nil && state.ch.selectLocked {
424+
state.ch.lock.Unlock()
425+
state.ch.selectLocked = false
426+
}
427+
}
428+
}
429+
376430
// chanSelect implements blocking or non-blocking select operations.
377431
// The 'ops' slice must be set if (and only if) this is a blocking select.
378432
func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelOp) (uint32, bool) {
379433
mask := interrupt.Disable()
380434

435+
// Lock everything.
436+
chanSelectLock.Lock()
437+
lockAllStates(states)
438+
381439
const selectNoIndex = ^uint32(0)
382440
selectIndex := selectNoIndex
383441
selectOk := true
@@ -409,6 +467,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
409467
// return early.
410468
blocking := len(ops) != 0
411469
if selectIndex != selectNoIndex || !blocking {
470+
unlockAllStates(states)
471+
chanSelectLock.Unlock()
412472
interrupt.Restore(mask)
413473
return selectIndex, selectOk
414474
}
@@ -417,8 +477,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
417477
// become more complicated.
418478
// We add ourselves as a sender/receiver to every channel, and wait for the
419479
// first one to complete. Only one will successfully complete, because
420-
// senders and receivers will check t.Data for the state so that only one
421-
// will be able to "take" this select operation.
480+
// senders and receivers use a compare-and-exchange atomic operation on
481+
// t.Data so that only one will be able to "take" this select operation.
422482
t := task.Current()
423483
t.Ptr = recvbuf
424484
t.SetDataUint32(chanOperationWaiting)
@@ -438,13 +498,17 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
438498
}
439499

440500
// Now we wait until one of the send/receive operations can proceed.
501+
unlockAllStates(states)
502+
chanSelectLock.Unlock()
441503
interrupt.Restore(mask)
442504
task.Pause()
443505

444506
// Resumed, so one channel operation must have progressed.
445507

446508
// Make sure all channel ops are removed from the senders/receivers
447509
// queue before we return and the memory of them becomes invalid.
510+
chanSelectLock.Lock()
511+
lockAllStates(states)
448512
for i, state := range states {
449513
if state.ch == nil {
450514
continue
@@ -458,6 +522,8 @@ func chanSelect(recvbuf unsafe.Pointer, states []chanSelectState, ops []channelO
458522
}
459523
interrupt.Restore(mask)
460524
}
525+
unlockAllStates(states)
526+
chanSelectLock.Unlock()
461527

462528
// Pull the return values out of t.Data (which contains two bitfields).
463529
selectIndex = t.DataUint32() >> 2

src/runtime/scheduler_cooperative.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ import (
2121
// queue a new scheduler invocation using setTimeout.
2222
const asyncScheduler = GOOS == "js"
2323

24+
const hasScheduler = true
25+
26+
// Concurrency is not parallelism. While the cooperative scheduler has
27+
// concurrency, it does not have parallelism.
28+
const hasParallelism = false
29+
2430
// Queues used by the scheduler.
2531
var (
2632
runqueue task.Queue
@@ -248,5 +254,3 @@ func run() {
248254
}()
249255
scheduler(false)
250256
}
251-
252-
const hasScheduler = true

src/runtime/scheduler_none.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ import "internal/task"
66

77
const hasScheduler = false
88

9+
// No goroutines are allowed, so there's no parallelism anywhere.
10+
const hasParallelism = false
11+
912
// run is called by the program entry point to execute the go program.
1013
// With the "none" scheduler, init and the main function are invoked directly.
1114
func run() {

0 commit comments

Comments
 (0)