Create an internal "xsync" package that reimplements mutexes. #1268

cupcicm wants to merge 1 commit into twmb:master from
Conversation
When using synctests to test code that uses franzgo, it is possible to get a deadlock, as described in twmb#1250. When built with `GOFLAGS=-tags=synctests`, the tests will use the reimplemented mutexes, which are channel-based so that synctests can track whether goroutines waiting on them are durably blocked or not.
// In this file we reimplement sync.Mutex and sync.RWMutex and we rely exclusively on channels to implement the mutex.
// The reason we do this is that synctest cannot track the state of mutexes, so it considers that a goroutine is not 'durably blocked' when it is waiting on a mutex.
// The consequence of this is that it is impossible to test code that uses mutexes with synctest. For example, this test hangs forever with synctest
Formatting on comments: I stick to 80 unless I'm writing big comments on multiply-nested guts code.
	}
}

type RWMutex struct {
I initially thought a rwmutex would be simpler and we'd just swap in the channel approach everywhere. That's not the case, so build tags make sense.
	readerCount  int
	writerWait   bool
	readerSignal chan struct{}
	writerSignal chan struct{}
I pointed claude at this PR and asked if there's a simpler idea. It did come up with something that I think is simpler, but I want to know what you think (its initial idea was too simple: just make the rwmutex be a mutex, but that doesn't preserve write locks blocking read locks). Anyway:
//go:build synctests

package xsync
import "sync"
// Mutex is a channel-based mutex for use with synctest. A goroutine blocked on
// a sync.Mutex is not "durably blocked" from synctest's perspective, which
// prevents time from advancing. Channel-based blocking is durably blocked.
type Mutex struct {
once sync.Once
ch chan struct{}
}
func (m *Mutex) init() {
m.once.Do(func() {
m.ch = make(chan struct{}, 1)
m.ch <- struct{}{}
})
}
func (m *Mutex) Lock() {
m.init()
<-m.ch
}
func (m *Mutex) TryLock() bool {
m.init()
select {
case <-m.ch:
return true
default:
return false
}
}
func (m *Mutex) Unlock() {
m.init()
select {
case m.ch <- struct{}{}:
default:
panic("sync: unlock of unlocked mutex")
}
}
// RWMutex is a channel-based readers-writer mutex with writer priority.
//
// The design uses a "gate token" pattern: a buffered-1 channel holds a single
// token. Readers take the token and immediately return it (passing through the
// gate). Writers take the token and hold it, which blocks all new readers until
// the writer unlocks. This provides writer priority naturally - the moment a
// writer calls Lock, new RLock calls block on the gate.
//
// A separate writerSignal channel (buffered 1) is used by the last active
// reader to wake a waiting writer. A stale signal can accumulate if readers
// drain while no writer is waiting; Lock drains any stale signal before
// checking the reader count.
type RWMutex struct {
once sync.Once
mu Mutex // protects readerCount
gate chan struct{} // buffered 1; readers take+return, writers take+hold
readerCount int
writerSignal chan struct{} // buffered 1; last reader signals writer
}
func (rw *RWMutex) init() {
rw.once.Do(func() {
rw.gate = make(chan struct{}, 1)
rw.gate <- struct{}{}
rw.writerSignal = make(chan struct{}, 1)
rw.mu.init()
})
}
func (rw *RWMutex) RLock() {
rw.init()
<-rw.gate // blocks while writer holds gate
rw.mu.Lock()
rw.readerCount++
rw.mu.Unlock()
rw.gate <- struct{}{} // pass gate along
}
func (rw *RWMutex) TryRLock() bool {
rw.init()
select {
case <-rw.gate:
default:
return false
}
rw.mu.Lock()
rw.readerCount++
rw.mu.Unlock()
rw.gate <- struct{}{}
return true
}
func (rw *RWMutex) RUnlock() {
rw.mu.Lock()
rw.readerCount--
if rw.readerCount < 0 {
rw.mu.Unlock()
panic("sync: RUnlock of unlocked RWMutex")
}
if rw.readerCount == 0 {
select {
case rw.writerSignal <- struct{}{}:
default:
}
}
rw.mu.Unlock()
}
func (rw *RWMutex) Lock() {
rw.init()
<-rw.gate // take gate, immediately blocking new readers
// Drain any stale signal from a previous reader cycle.
select {
case <-rw.writerSignal:
default:
}
rw.mu.Lock()
if rw.readerCount > 0 {
rw.mu.Unlock()
<-rw.writerSignal // wait for last active reader
} else {
rw.mu.Unlock()
}
}
func (rw *RWMutex) TryLock() bool {
rw.init()
select {
case <-rw.gate:
default:
return false
}
select {
case <-rw.writerSignal:
default:
}
rw.mu.Lock()
if rw.readerCount > 0 {
rw.mu.Unlock()
rw.gate <- struct{}{}
return false
}
rw.mu.Unlock()
return true
}
func (rw *RWMutex) Unlock() {
select {
case rw.gate <- struct{}{}: // return gate, unblocking readers
default:
panic("sync: Unlock of unlocked RWMutex")
}
}
func (rw *RWMutex) RLocker() sync.Locker { return (*rlocker)(rw) }
type rlocker RWMutex
func (r *rlocker) Lock() { (*RWMutex)(r).RLock() }
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }

Point being: the mutex implementation is the same, and the rwmutex implementation uses a nifty two-channel "gate" approach. The downside is a spurious drain when grabbing the write lock, but the upside is that it's a fair bit conceptually simpler.