Skip to content

Commit 2d5d87e

Browse files
authored
Merge pull request #124 from cschleiden/add-send-select-case
Add Send case to the Select function
2 parents 761e5e8 + acf82b8 commit 2d5d87e

File tree

5 files changed

+141
-16
lines changed

5 files changed

+141
-16
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -410,6 +410,8 @@ Due its non-deterministic behavior you must not use a `select` statement in work
410410
var f1 workflow.Future[int]
411411
var c workflow.Channel[int]
412412
413+
value := 42
414+
413415
workflow.Select(
414416
ctx,
415417
workflow.Await(f1, func (ctx workflow.Context, f Future[int]) {
@@ -419,6 +421,9 @@ workflow.Select(
419421
workflow.Receive(c, func (ctx workflow.Context, v int, ok bool) {
420422
// use v
421423
}),
424+
workflow.Send(c, &value, func (ctx workflow.Context) {
425+
// value has been sent to the channel
426+
}),
422427
workflow.Default(ctx, func (ctx workflow.Context) {
423428
// ...
424429
})

internal/sync/channel.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ func (c *channel[T]) canReceive() bool {
155155
return c.hasValue() || len(c.senders) > 0 || c.closed
156156
}
157157

158+
func (c *channel[T]) canSend() bool {
159+
if c.closed {
160+
return false
161+
}
162+
163+
return len(c.receivers) > 0 || c.hasCapacity()
164+
}
165+
158166
func (c *channel[T]) trySend(v T) bool {
159167
// If closed, we can't send, panic.
160168
if c.closed {

internal/sync/selector.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@ func Await[T any](f Future[T], handler func(ctx Context, f Future[T])) SelectCas
1212
}
1313
}
1414

15+
func Send[T any](c Channel[T], v *T, handler func(ctx Context)) SelectCase {
16+
return &channelSendCase[T]{
17+
c: c.(*channel[T]),
18+
v: v,
19+
fn: handler,
20+
}
21+
}
22+
1523
func Receive[T any](c Channel[T], handler func(ctx Context, v T, ok bool)) SelectCase {
16-
return &channelCase[T]{
24+
return &channelReceiveCase[T]{
1725
c: c.(*channel[T]),
1826
fn: handler,
1927
}
@@ -55,18 +63,33 @@ func (fc *futureCase[T]) Handle(ctx Context) {
5563
fc.fn(ctx, fc.f)
5664
}
5765

58-
type channelCase[T any] struct {
66+
type channelReceiveCase[T any] struct {
5967
c *channel[T]
6068
fn func(Context, T, bool)
6169
}
6270

63-
func (cc *channelCase[T]) Ready() bool {
64-
return cc.c.canReceive()
71+
func (crc *channelReceiveCase[T]) Ready() bool {
72+
return crc.c.canReceive()
73+
}
74+
75+
func (crc *channelReceiveCase[T]) Handle(ctx Context) {
76+
v, ok := crc.c.Receive(ctx)
77+
crc.fn(ctx, v, ok)
78+
}
79+
80+
type channelSendCase[T any] struct {
81+
c *channel[T]
82+
v *T
83+
fn func(Context)
84+
}
85+
86+
func (csc *channelSendCase[T]) Ready() bool {
87+
return csc.c.canSend()
6588
}
6689

67-
func (cc *channelCase[T]) Handle(ctx Context) {
68-
v, ok := cc.c.Receive(ctx)
69-
cc.fn(ctx, v, ok)
90+
func (csc *channelSendCase[T]) Handle(ctx Context) {
91+
csc.c.Send(ctx, *csc.v)
92+
csc.fn(ctx)
7093
}
7194

7295
type defaultCase struct {

internal/sync/selector_test.go

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func Test_FutureSelector_DefaultCase(t *testing.T) {
111111
require.True(t, defaultHandled)
112112
}
113113

114-
func Test_ChannelSelector_Select(t *testing.T) {
114+
func Test_ChannelSelector_Receive(t *testing.T) {
115115
c := NewChannel[int]()
116116

117117
reachedEnd := false
@@ -148,3 +148,82 @@ func Test_ChannelSelector_Select(t *testing.T) {
148148
require.True(t, reachedEnd)
149149
require.Equal(t, 42, r)
150150
}
151+
152+
func Test_ChannelSelector_SendBlocking(t *testing.T) {
153+
c := NewChannel[int]()
154+
ctx := Background()
155+
156+
input := 42
157+
158+
cs := NewCoroutine(ctx, func(ctx Context) error {
159+
Select(
160+
ctx,
161+
Send(c, &input, func(ctx Context) {
162+
// Element was sent
163+
}),
164+
)
165+
166+
return nil
167+
})
168+
169+
cs.Execute()
170+
171+
var v int
172+
var ok bool
173+
174+
cr := NewCoroutine(ctx, func(ctx Context) error {
175+
v, ok = c.Receive(ctx)
176+
177+
return nil
178+
})
179+
// Register receiver
180+
cr.Execute()
181+
182+
// Try to Select again, this time it sends
183+
cs.Execute()
184+
185+
// Allow receiver to finish
186+
cr.Execute()
187+
188+
require.True(t, cs.Finished())
189+
require.True(t, cr.Finished())
190+
require.Equal(t, 42, v)
191+
require.True(t, ok)
192+
}
193+
194+
func Test_ChannelSelector_SendNonBlocking(t *testing.T) {
195+
c := NewBufferedChannel[int](1)
196+
ctx := Background()
197+
198+
cs := NewCoroutine(ctx, func(ctx Context) error {
199+
input := 42
200+
Select(
201+
ctx,
202+
Send(c, &input, func(ctx Context) {
203+
// Element was sent
204+
}),
205+
)
206+
207+
return nil
208+
})
209+
210+
cs.Execute()
211+
require.True(t, cs.Finished())
212+
213+
cs = NewCoroutine(ctx, func(ctx Context) error {
214+
input := 23
215+
Select(
216+
ctx,
217+
Send(c, &input, func(ctx Context) {
218+
// Element was sent
219+
}),
220+
)
221+
222+
return nil
223+
})
224+
225+
cs.Execute()
226+
227+
// Channel does not have capacity, Select blocks
228+
require.False(t, cs.Finished())
229+
}

workflow/sync.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,46 @@ import (
55
)
66

77
type Context = sync.Context
8-
type WaitGroup = sync.WaitGroup
98

109
var Canceled = sync.Canceled
1110

11+
type WaitGroup = sync.WaitGroup
12+
13+
func NewWaitGroup() WaitGroup {
14+
return sync.NewWaitGroup()
15+
}
16+
17+
// Go spawns a workflow goroutine
1218
func Go(ctx Context, f func(ctx Context)) {
1319
sync.Go(ctx, f)
1420
}
1521

1622
type SelectCase = sync.SelectCase
1723

24+
// Select is the workflow-save equivalent of the select statement.
1825
func Select(ctx Context, cases ...SelectCase) {
1926
sync.Select(ctx, cases...)
2027
}
2128

29+
// Await calls the provided handler when the given future is ready.
2230
func Await[T any](f Future[T], handler func(Context, Future[T])) SelectCase {
2331
return sync.Await[T](f, func(ctx sync.Context, f sync.Future[T]) {
2432
handler(ctx, f)
2533
})
2634
}
2735

36+
// Receive calls the provided handler if the given channel can receive a value. The handler receives
37+
// the received value, and the ok flag indicating whether the value was received or the channel was closed.
2838
func Receive[T any](c Channel[T], handler func(ctx Context, v T, ok bool)) SelectCase {
29-
return sync.Receive[T](c, func(ctx sync.Context, v T, ok bool) {
30-
handler(ctx, v, ok)
31-
})
39+
return sync.Receive[T](c, handler)
3240
}
3341

34-
func Default(handler func(Context)) SelectCase {
35-
return sync.Default(handler)
42+
// Send calls the provided handler if the given value can be sent to the channel.
43+
func Send[T any](c Channel[T], value *T, handler func(ctx Context)) SelectCase {
44+
return sync.Send[T](c, value, handler)
3645
}
3746

38-
func NewWaitGroup() WaitGroup {
39-
return sync.NewWaitGroup()
47+
// Default calls the given handler if none of the other cases match.
48+
func Default(handler func(Context)) SelectCase {
49+
return sync.Default(handler)
4050
}

0 commit comments

Comments
 (0)