Skip to content

Commit 9e1bb23

Browse files
authored
Use Select with cases instead of selector instance
1 parent 80a15b9 commit 9e1bb23

File tree

10 files changed

+186
-180
lines changed

10 files changed

+186
-180
lines changed

internal/sync/context.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,16 @@ func propagateCancel(parent Context, child canceler) {
182182
return // parent is never canceled
183183
}
184184

185-
s := NewSelector()
186-
s.AddChannelReceive(done, func(ctx Context, c Channel) {
187-
// Parent is already canceled
188-
child.cancel(false, parent.Err())
189-
})
190-
s.AddDefault(func() {
191-
// Ignore
192-
})
193-
s.Select(parent)
185+
Select(
186+
parent,
187+
ReceiveChan(done, func(ctx Context, c Channel) {
188+
// Parent is already canceled
189+
child.cancel(false, parent.Err())
190+
}),
191+
Default(func(_ Context) {
192+
// Ignore
193+
}),
194+
)
194195

195196
if p, ok := parentCancelCtx(parent); ok {
196197
if p.err != nil {

internal/sync/context_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,12 @@ func TestWithCancel(t *testing.T) {
2323
// Create child context, canceled when parent is canceled
2424
ctx, _ = WithCancel(ctx)
2525

26-
s := NewSelector()
27-
28-
s.AddChannelReceive(ctx.Done(), func(ctx Context, c Channel) {
29-
canceled = true
30-
})
31-
32-
s.Select(ctx)
26+
Select(
27+
ctx,
28+
ReceiveChan(ctx.Done(), func(ctx Context, c Channel) {
29+
canceled = true
30+
}),
31+
)
3332

3433
return nil
3534
})

internal/sync/selector.go

Lines changed: 38 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,84 +1,60 @@
11
package sync
22

3-
type Selector interface {
4-
AddFuture(f Future, handler func(ctx Context, f Future)) Selector
3+
// type Selector interface {
4+
// AddFuture(f Future, handler func(ctx Context, f Future)) Selector
55

6-
AddChannelReceive(c Channel, handler func(ctx Context, c Channel)) Selector
6+
// AddChannelReceive(c Channel, handler func(ctx Context, c Channel)) Selector
77

8-
AddDefault(handler func()) Selector
8+
// AddDefault(handler func()) Selector
99

10-
Select(ctx Context)
11-
}
12-
13-
func NewSelector() Selector {
14-
return &selector{
15-
cases: make([]selectorCase, 0),
16-
}
17-
}
10+
// Select(ctx Context)
11+
// }
1812

19-
type selector struct {
20-
cases []selectorCase
21-
22-
defaultFunc func()
13+
type SelectCase interface {
14+
Ready() bool
15+
Handle(ctx Context)
2316
}
2417

25-
func (s *selector) AddFuture(f Future, handler func(ctx Context, f Future)) Selector {
26-
s.cases = append(s.cases, &futureCase{
18+
func Await(f Future, handler func(Context, Future)) SelectCase {
19+
return &futureCase{
2720
f: f.(*futureImpl),
2821
fn: handler,
29-
})
30-
31-
return s
22+
}
3223
}
3324

34-
func (s *selector) AddChannelReceive(c Channel, handler func(ctx Context, c Channel)) Selector {
25+
func ReceiveChan(c Channel, handler func(Context, Channel)) SelectCase {
3526
channel := c.(*channel)
3627

37-
s.cases = append(s.cases, &channelCase{
28+
return &channelCase{
3829
c: channel,
3930
fn: handler,
40-
})
41-
42-
return s
31+
}
4332
}
4433

45-
func (s *selector) AddDefault(handler func()) Selector {
46-
s.defaultFunc = handler
47-
48-
return s
34+
func Default(handler func(Context)) SelectCase {
35+
return &defaultCase{
36+
fn: handler,
37+
}
4938
}
5039

51-
func (s *selector) Select(ctx Context) {
40+
func Select(ctx Context, cases ...SelectCase) {
5241
cs := getCoState(ctx)
5342

5443
for {
5544
// Is any case ready?
56-
for i, c := range s.cases {
45+
for _, c := range cases {
5746
if c.Ready() {
5847
c.Handle(ctx)
59-
60-
// Remove handled case
61-
s.cases = append(s.cases[:i], s.cases[i+1:]...)
6248
return
6349
}
6450
}
6551

66-
if s.defaultFunc != nil {
67-
s.defaultFunc()
68-
return
69-
}
70-
7152
// else, yield and wait for result
7253
cs.Yield()
7354
}
7455
}
7556

76-
type selectorCase interface {
77-
Ready() bool
78-
Handle(ctx Context)
79-
}
80-
81-
var _ = selectorCase(&futureCase{})
57+
var _ = SelectCase(&futureCase{})
8258

8359
type futureCase struct {
8460
f *futureImpl
@@ -93,6 +69,8 @@ func (fc *futureCase) Handle(ctx Context) {
9369
fc.fn(ctx, fc.f)
9470
}
9571

72+
var _ = SelectCase(&channelCase{})
73+
9674
type channelCase struct {
9775
c *channel
9876
fn func(Context, Channel)
@@ -105,3 +83,17 @@ func (cc *channelCase) Ready() bool {
10583
func (cc *channelCase) Handle(ctx Context) {
10684
cc.fn(ctx, cc.c)
10785
}
86+
87+
var _ = SelectCase(&defaultCase{})
88+
89+
type defaultCase struct {
90+
fn func(Context)
91+
}
92+
93+
func (dc *defaultCase) Ready() bool {
94+
return true
95+
}
96+
97+
func (dc *defaultCase) Handle(ctx Context) {
98+
dc.fn(ctx)
99+
}

internal/sync/selector_test.go

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,16 @@ func Test_FutureSelector_SelectWaits(t *testing.T) {
1212
reachedEnd := false
1313

1414
cr := NewCoroutine(ctx, func(ctx Context) error {
15-
s := NewSelector()
15+
Select(
16+
ctx,
17+
Await(f, func(ctx Context, f Future) {
18+
var r int
19+
err := f.Get(ctx, &r)
20+
require.Nil(t, err)
1621

17-
s.AddFuture(f, func(ctx Context, f Future) {
18-
var r int
19-
err := f.Get(ctx, &r)
20-
require.Nil(t, err)
21-
22-
require.Equal(t, 42, r)
23-
})
24-
25-
// Wait for result
26-
s.Select(ctx)
22+
require.Equal(t, 42, r)
23+
}),
24+
)
2725

2826
reachedEnd = true
2927

@@ -49,27 +47,27 @@ func Test_FutureSelector_SelectWaitsWithSameOrder(t *testing.T) {
4947
order := make([]int, 0)
5048

5149
cs := NewCoroutine(ctx, func(ctx Context) error {
52-
s := NewSelector()
53-
54-
s.AddFuture(f, func(ctx Context, f Future) {
55-
var r int
56-
err := f.Get(ctx, &r)
57-
require.Nil(t, err)
58-
require.Equal(t, 42, r)
59-
order = append(order, 42)
60-
})
61-
62-
s.AddFuture(f2, func(ctx Context, f Future) {
63-
var r int
64-
err := f.Get(ctx, &r)
65-
require.Nil(t, err)
66-
require.Equal(t, 23, r)
67-
order = append(order, 23)
68-
})
69-
70-
// Wait for result
71-
s.Select(ctx)
72-
s.Select(ctx)
50+
for i := 0; i < 2; i++ {
51+
// Wait for result
52+
Select(
53+
ctx,
54+
Await(f, func(ctx Context, f Future) {
55+
var r int
56+
err := f.Get(ctx, &r)
57+
require.Nil(t, err)
58+
require.Equal(t, 42, r)
59+
order = append(order, 42)
60+
}),
61+
62+
Await(f2, func(ctx Context, f Future) {
63+
var r int
64+
err := f.Get(ctx, &r)
65+
require.Nil(t, err)
66+
require.Equal(t, 23, r)
67+
order = append(order, 23)
68+
}),
69+
)
70+
}
7371

7472
reachedEnd = true
7573

@@ -98,18 +96,18 @@ func Test_FutureSelector_DefaultCase(t *testing.T) {
9896
reachedEnd := false
9997

10098
cs := NewCoroutine(Background(), func(ctx Context) error {
101-
s := NewSelector()
102-
103-
s.AddFuture(f, func(_ Context, _ Future) {
104-
require.Fail(t, "should not be called")
105-
})
99+
// Wait for result
100+
Select(
101+
ctx,
106102

107-
s.AddDefault(func() {
108-
defaultHandled = true
109-
})
103+
Await(f, func(_ Context, _ Future) {
104+
require.Fail(t, "should not be called")
105+
}),
110106

111-
// Wait for result
112-
s.Select(ctx)
107+
Default(func(_ Context) {
108+
defaultHandled = true
109+
}),
110+
)
113111

114112
reachedEnd = true
115113

@@ -132,14 +130,13 @@ func Test_ChannelSelector_Select(t *testing.T) {
132130
var r int
133131

134132
cr := NewCoroutine(ctx, func(ctx Context) error {
135-
s := NewSelector()
136-
137-
s.AddChannelReceive(c, func(ctx Context, c Channel) {
138-
c.Receive(ctx, &r)
139-
})
140-
141133
// Wait for result
142-
s.Select(ctx)
134+
Select(
135+
ctx,
136+
ReceiveChan(c, func(ctx Context, c Channel) {
137+
c.Receive(ctx, &r)
138+
}),
139+
)
143140

144141
reachedEnd = true
145142

internal/workflow/executor_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -243,16 +243,16 @@ func workflowWithSelector(ctx sync.Context) error {
243243
f1 := ExecuteActivity(ctx, DefaultActivityOptions, activity1, 42)
244244
t := ScheduleTimer(ctx, time.Millisecond*2)
245245

246-
s := sync.NewSelector()
247-
s.AddFuture(f1, func(ctx sync.Context, f sync.Future) {
248-
workflowWithSelectorHits++
249-
})
250-
251-
s.AddFuture(t, func(ctx sync.Context, t sync.Future) {
252-
workflowWithSelectorHits++
253-
})
254-
255-
s.Select(ctx)
246+
sync.Select(
247+
ctx,
248+
sync.Await(f1, func(ctx sync.Context, f sync.Future) {
249+
workflowWithSelectorHits++
250+
}),
251+
252+
sync.Await(t, func(ctx sync.Context, t sync.Future) {
253+
workflowWithSelectorHits++
254+
}),
255+
)
256256

257257
workflowWithSelectorHits++
258258

0 commit comments

Comments
 (0)