Skip to content

Commit 44225ca

Browse files
asteurerdicej
andauthored
feat(go): preventing concurrent reads/writes on streams and futures (#1490)
fix: adding panic for empty dst slice, removing handle reassignment for futures docs(go): adding package comments feat(go): add future read concurrency test feat(go): adding remaining concurrent read/write tests doc(go): adding comments on drop methods Update crates/go/src/package/wit_types/wit_stream.go Update crates/go/src/package/wit_types/wit_future.go Update crates/go/src/package/wit_types/wit_future.go Co-authored-by: Joel Dice <[email protected]>
1 parent 85369ad commit 44225ca

File tree

4 files changed

+200
-17
lines changed

4 files changed

+200
-17
lines changed

crates/go/src/package/wit_types/wit_future.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ type FutureReader[T any] struct {
2626
handle *wit_runtime.Handle
2727
}
2828

29+
// Blocks until the future completes and returns its value.
30+
//
31+
// # Panic
32+
//
33+
// Read will panic if multiple concurrent or sequential reads are attempted on the same future.
2934
func (self *FutureReader[T]) Read() T {
3035
handle := self.handle.Take()
3136
defer self.vtable.DropReadable(handle)
@@ -53,6 +58,7 @@ func (self *FutureReader[T]) Read() T {
5358
}
5459
}
5560

61+
// Notify the host that the FutureReader is no longer being used.
5662
func (self *FutureReader[T]) Drop() {
5763
handle := self.handle.TakeOrNil()
5864
if handle != 0 {
@@ -85,6 +91,11 @@ type FutureWriter[T any] struct {
8591
handle *wit_runtime.Handle
8692
}
8793

94+
// Writes data to a future.
95+
//
96+
// # Panic
97+
//
98+
// Write will panic if multiple concurrent or sequential writes are attempted on the same future.
8899
func (self *FutureWriter[T]) Write(item T) bool {
89100
handle := self.handle.Take()
90101
defer self.vtable.DropWritable(handle)
@@ -119,6 +130,7 @@ func (self *FutureWriter[T]) Write(item T) bool {
119130
}
120131
}
121132

133+
// Notify the host that the FutureWriter is no longer being used.
122134
func (self *FutureWriter[T]) Drop() {
123135
handle := self.handle.TakeOrNil()
124136
if handle != 0 {

crates/go/src/package/wit_types/wit_stream.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,22 @@ func (self *StreamReader[T]) WriterDropped() bool {
3131
return self.writerDropped
3232
}
3333

34+
// Reads data from a stream into a destination slice.
35+
//
36+
// Blocks until the read completes or the destination slice is full.
37+
//
38+
// # Panic
39+
//
40+
// Read will panic if:
41+
// - dst is empty (length 0)
42+
// - multiple concurrent reads are attempted on the same stream
3443
func (self *StreamReader[T]) Read(dst []T) uint32 {
35-
handle := self.handle.Use()
44+
if len(dst) == 0 {
45+
panic("StreamReader.Read: destination slice cannot be empty")
46+
}
47+
48+
handle := self.handle.Take()
49+
defer self.handle.Set(handle)
3650

3751
if self.writerDropped {
3852
return 0
@@ -68,6 +82,7 @@ func (self *StreamReader[T]) Read(dst []T) uint32 {
6882
return count
6983
}
7084

85+
// Notify the host that the StreamReader is no longer being used.
7186
func (self *StreamReader[T]) Drop() {
7287
handle := self.handle.TakeOrNil()
7388
if handle != 0 {
@@ -105,8 +120,14 @@ func (self *StreamWriter[T]) ReaderDropped() bool {
105120
return self.readerDropped
106121
}
107122

123+
// Writes items to a stream, returning the count written (may be partial).
124+
//
125+
// # Panic
126+
//
127+
// Write will panic if multiple concurrent writes are attempted on the same stream.
108128
func (self *StreamWriter[T]) Write(items []T) uint32 {
109-
handle := self.handle.Use()
129+
handle := self.handle.Take()
130+
defer self.handle.Set(handle)
110131

111132
if self.readerDropped {
112133
return 0
@@ -152,6 +173,11 @@ func (self *StreamWriter[T]) Write(items []T) uint32 {
152173
return count
153174
}
154175

176+
// Writes all items to the stream, looping until complete or reader drops.
177+
//
178+
// # Panic
179+
//
180+
// WriteAll will panic if multiple concurrent writes are attempted on the same stream.
155181
func (self *StreamWriter[T]) WriteAll(items []T) uint32 {
156182
offset := uint32(0)
157183
count := uint32(len(items))
@@ -161,6 +187,7 @@ func (self *StreamWriter[T]) WriteAll(items []T) uint32 {
161187
return offset
162188
}
163189

190+
// Notify the host that the StreamReader is no longer being used.
164191
func (self *StreamWriter[T]) Drop() {
165192
handle := self.handle.TakeOrNil()
166193
if handle != 0 {

tests/runtime-async/async/simple-future/runner.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,80 @@ func Run() {
3535
(<-read)
3636
assert(!(<-write))
3737
}
38+
39+
{
40+
tx, rx := test.MakeFutureUnit()
41+
syncBarrier := make(chan struct{})
42+
panicCh := make(chan any, 2)
43+
44+
for range 2 {
45+
go func() {
46+
// Because the channel is empty, it will block until it's closed, at which
47+
// point all Goroutines will attempt to simultaneously read from the future.
48+
<-syncBarrier
49+
panicCh <- checkPanicValue(func() {
50+
rx.Read()
51+
})
52+
}()
53+
}
54+
close(syncBarrier)
55+
56+
go func() {
57+
// If this is omitted, the host will see that the "rx.Read" operations aren't paired with
58+
// a "tx.Write" and will result in a "wasm trap: deadlock detected" error. Additionally,
59+
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
60+
// concurrent reads, and not from other scenarios that result in a nil handle.
61+
tx.Write(wit_types.Unit{})
62+
}()
63+
64+
p1, p2 := <-panicCh, <-panicCh
65+
66+
// One should succeed (nil), one should panic
67+
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
68+
}
69+
70+
{
71+
tx, rx := test.MakeFutureUnit()
72+
syncBarrier := make(chan struct{})
73+
panicCh := make(chan any, 2)
74+
75+
for range 2 {
76+
go func() {
77+
// Because the channel is empty, it will block until it's closed, at which
78+
// point all Goroutines will attempt to simultaneously write to the future.
79+
<-syncBarrier
80+
panicCh <- checkPanicValue(func() {
81+
tx.Write(wit_types.Unit{})
82+
})
83+
}()
84+
}
85+
close(syncBarrier)
86+
87+
go func() {
88+
// If this is omitted, the host will see that the "tx.Write" operations aren't paired with
89+
// an "rx.Read" and will result in a "wasm trap: deadlock detected" error. Additionally,
90+
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
91+
// concurrent writes, and not from other scenarios that result in a nil handle.
92+
rx.Read()
93+
}()
94+
95+
p1, p2 := <-panicCh, <-panicCh
96+
97+
// One should succeed (nil), one should panic
98+
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
99+
}
38100
}
39101

40102
func assert(v bool) {
41103
if !v {
42104
panic("assertion failed")
43105
}
44106
}
107+
108+
func checkPanicValue(f func()) (value any) {
109+
defer func() {
110+
value = recover()
111+
}()
112+
f()
113+
return nil
114+
}

tests/runtime-async/async/simple-stream/runner.go

Lines changed: 89 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,26 +11,92 @@ func Run() {
1111
write := make(chan wit_types.Unit)
1212
read := make(chan wit_types.Unit)
1313

14-
tx, rx := test.MakeStreamUnit()
15-
go func() {
16-
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}}), 1)
17-
assert(!tx.ReaderDropped())
14+
{
15+
tx, rx := test.MakeStreamUnit()
16+
go func() {
17+
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}}), 1)
18+
assert(!tx.ReaderDropped())
1819

19-
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 2)
20+
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 2)
2021

21-
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 0)
22-
assert(tx.ReaderDropped())
22+
assertEqual(tx.Write([]wit_types.Unit{wit_types.Unit{}, wit_types.Unit{}}), 0)
23+
assert(tx.ReaderDropped())
2324

24-
write <- wit_types.Unit{}
25-
}()
25+
write <- wit_types.Unit{}
26+
}()
2627

27-
go func() {
28-
test.ReadStream(rx)
29-
read <- wit_types.Unit{}
30-
}()
28+
go func() {
29+
test.ReadStream(rx)
30+
read <- wit_types.Unit{}
31+
}()
32+
33+
(<-read)
34+
(<-write)
35+
}
36+
37+
{
38+
tx, rx := test.MakeStreamUnit()
39+
syncBarrier := make(chan struct{})
40+
panicCh := make(chan any, 2)
41+
42+
for range 2 {
43+
go func() {
44+
// Because the channel is empty, it will block until it's closed, at which
45+
// point all Goroutines will attempt to simultaneously read from the stream.
46+
<-syncBarrier
47+
panicCh <- checkPanicValue(func() {
48+
result := make([]wit_types.Unit, 1)
49+
rx.Read(result)
50+
})
51+
}()
52+
}
53+
close(syncBarrier)
54+
55+
go func() {
56+
// If this is omitted, the host will see that the "rx.Read" operations aren't paired with
57+
// a "tx.WriteAll" and will result in a "wasm trap: deadlock detected" error. Additionally,
58+
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
59+
// concurrent reads, and not from other scenarios that result in a nil handle.
60+
tx.WriteAll([]wit_types.Unit{wit_types.Unit{}})
61+
}()
62+
63+
p1, p2 := <-panicCh, <-panicCh
64+
65+
// One should succeed (nil), one should panic
66+
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
67+
}
68+
69+
{
70+
tx, rx := test.MakeStreamUnit()
71+
syncBarrier := make(chan struct{})
72+
panicCh := make(chan any, 2)
73+
74+
for range 2 {
75+
go func() {
76+
// Because the channel is empty, it will block until it's closed, at which
77+
// point all Goroutines will attempt to simultaneously write to the stream.
78+
<-syncBarrier
79+
panicCh <- checkPanicValue(func() {
80+
tx.WriteAll([]wit_types.Unit{wit_types.Unit{}})
81+
})
82+
}()
83+
}
84+
close(syncBarrier)
85+
86+
go func() {
87+
// If this is omitted, the host will see that the "tx.WriteAll" operations aren't paired with
88+
// an "rx.Read" and will result in a "wasm trap: deadlock detected" error. Additionally,
89+
// this is placed after "close(syncBarrier)" to ensure that the panics are resulting from
90+
// concurrent writes, and not from other scenarios that result in a nil handle.
91+
result := make([]wit_types.Unit, 1)
92+
rx.Read(result)
93+
}()
94+
95+
p1, p2 := <-panicCh, <-panicCh
3196

32-
(<-read)
33-
(<-write)
97+
// One should succeed (nil), one should panic
98+
assert((p1 == nil && p2 == "nil handle") || (p1 == "nil handle" && p2 == nil))
99+
}
34100
}
35101

36102
func assertEqual[T comparable](a, b T) {
@@ -44,3 +110,11 @@ func assert(v bool) {
44110
panic("assertion failed")
45111
}
46112
}
113+
114+
func checkPanicValue(f func()) (value any) {
115+
defer func() {
116+
value = recover()
117+
}()
118+
f()
119+
return nil
120+
}

0 commit comments

Comments
 (0)