Skip to content

Commit 5b0ce1e

Browse files
authored
Merge branch 'main' into start-cache-eviction
2 parents 07f7341 + 2e4b887 commit 5b0ce1e

File tree

5 files changed

+116
-17
lines changed

5 files changed

+116
-17
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ require (
5757
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
5858
go.tmz.dev/musttag v0.7.2 // indirect
5959
go.uber.org/atomic v1.7.0 // indirect
60+
go.uber.org/goleak v1.3.0 // indirect
6061
go.uber.org/multierr v1.6.0 // indirect
6162
go.uber.org/zap v1.24.0 // indirect
6263
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
674674
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
675675
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
676676
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
677+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
678+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
677679
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
678680
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
679681
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=

internal/sync/coroutine.go

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
package sync
22

33
import (
4+
"errors"
45
"fmt"
5-
"io"
6-
"log"
76
"runtime"
87
"sync/atomic"
98
"time"
109
)
1110

1211
const DeadlockDetection = 40 * time.Second
1312

13+
var ErrCoroutineAlreadyFinished = errors.New("coroutine already finished")
14+
1415
type CoroutineCreator interface {
1516
NewCoroutine(ctx Context, fn func(Context) error)
1617
}
@@ -53,7 +54,8 @@ type coState struct {
5354

5455
err error
5556

56-
logger logger
57+
// logger logger
58+
// idx int
5759

5860
deadlockDetection time.Duration
5961

@@ -68,6 +70,11 @@ func NewCoroutine(ctx Context, fn func(ctx Context) error) Coroutine {
6870
defer s.finish() // Ensure we always mark the coroutine as finished
6971
defer func() {
7072
if r := recover(); r != nil {
73+
if err, ok := r.(error); ok && errors.Is(err, ErrCoroutineAlreadyFinished) {
74+
// Ignore this specific error
75+
return
76+
}
77+
7178
s.err = fmt.Errorf("panic: %v", r)
7279
}
7380
}()
@@ -86,21 +93,26 @@ func NewCoroutine(ctx Context, fn func(ctx Context) error) Coroutine {
8693
func newState() *coState {
8794
// i++
8895

89-
return &coState{
96+
c := &coState{
9097
blocking: make(chan bool, 1),
9198
unblock: make(chan bool),
9299
// Only used while debugging issues, default to discarding log messages
93-
logger: log.New(io.Discard, "[co]", log.LstdFlags),
94100
// logger: log.New(os.Stderr, fmt.Sprintf("[co %v]", i), log.Lmsgprefix|log.Ltime),
101+
// idx: i,
95102
deadlockDetection: DeadlockDetection,
96103
}
104+
105+
// Start out as blocked
106+
c.blocked.Store(true)
107+
108+
return c
97109
}
98110

99111
func (s *coState) finish() {
100112
s.finished.Store(true)
101113
s.blocking <- true
102114

103-
s.logger.Println("finish")
115+
// s.logger.Println("finish")
104116
}
105117

106118
func (s *coState) SetCoroutineCreator(creator CoroutineCreator) {
@@ -136,23 +148,28 @@ func (s *coState) Yield() {
136148
}
137149

138150
func (s *coState) yield(markBlocking bool) {
139-
s.logger.Println("yielding")
140-
141-
s.blocked.Store(true)
151+
// s.logger.Println("yielding")
142152

143153
if markBlocking {
154+
if s.shouldExit.Load() != nil {
155+
// s.logger.Println("yielding, but should exit")
156+
panic(ErrCoroutineAlreadyFinished)
157+
}
158+
159+
s.blocked.Store(true)
160+
144161
s.blocking <- true
145162
}
146163

147-
s.logger.Println("yielded")
164+
// s.logger.Println("yielded")
148165

149166
// Wait for the next Execute() call
150167
<-s.unblock
151168

152169
// Once we're here, another Execute() call has been made. s.blocking is empty
153170

154171
if s.shouldExit.Load() != nil {
155-
s.logger.Println("exiting")
172+
// s.logger.Println("exiting")
156173

157174
// Goexit runs all deferred functions, which includes calling finish() in the main
158175
// execution function. That marks the coroutine as finished and blocking.
@@ -161,37 +178,37 @@ func (s *coState) yield(markBlocking bool) {
161178

162179
s.blocked.Store(false)
163180

164-
s.logger.Println("done yielding, continuing")
181+
// s.logger.Println("done yielding, continuing")
165182
}
166183

167184
func (s *coState) Execute() {
168185
s.ResetProgress()
169186

170187
if s.Finished() {
171-
s.logger.Println("execute: already finished")
188+
// s.logger.Println("execute: already finished")
172189
return
173190
}
174191

175192
t := time.NewTimer(s.deadlockDetection)
176193
defer t.Stop()
177194

178-
s.logger.Println("execute: unblocking")
195+
// s.logger.Println("execute: unblocking")
179196
s.unblock <- true
180-
s.logger.Println("execute: unblocked")
197+
// s.logger.Println("execute: unblocked")
181198

182199
runtime.Gosched()
183200

184201
// Run until blocked (which is also true when finished)
185202
select {
186203
case <-s.blocking:
187-
s.logger.Println("execute: blocked")
204+
// s.logger.Println("execute: blocked")
188205
case <-t.C:
189206
panic("coroutine timed out")
190207
}
191208
}
192209

193210
func (s *coState) Exit() {
194-
s.logger.Println("exit")
211+
// s.logger.Println("exit")
195212

196213
if s.Finished() {
197214
return

internal/sync/coroutine_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,24 @@ func Test_Coroutine_ContinueAndBlock(t *testing.T) {
109109
require.True(t, reached)
110110
}
111111

112+
func Test_Coroutine_Exit_Before_Yield(t *testing.T) {
113+
c := NewCoroutine(Background(), func(ctx Context) error {
114+
s := getCoState(ctx)
115+
116+
s.Yield()
117+
118+
require.FailNow(t, "should not reach this")
119+
120+
return nil
121+
})
122+
123+
r := runtime.NumGoroutine()
124+
c.Exit()
125+
126+
require.True(t, c.Finished())
127+
require.Equal(t, r-1, runtime.NumGoroutine())
128+
}
129+
112130
func Test_Coroutine_Exit(t *testing.T) {
113131
c := NewCoroutine(Background(), func(ctx Context) error {
114132
s := getCoState(ctx)
@@ -120,10 +138,37 @@ func Test_Coroutine_Exit(t *testing.T) {
120138
return nil
121139
})
122140

141+
c.Execute()
142+
143+
r := runtime.NumGoroutine()
144+
c.Exit()
145+
146+
require.True(t, c.Finished())
147+
require.Equal(t, r-1, runtime.NumGoroutine())
148+
}
149+
150+
func Test_Coroutine_Exit_with_defer(t *testing.T) {
151+
c := NewCoroutine(Background(), func(ctx Context) error {
152+
s := getCoState(ctx)
153+
154+
defer func() {
155+
s.Yield()
156+
}()
157+
158+
s.Yield()
159+
160+
require.FailNow(t, "should not reach this")
161+
162+
return nil
163+
})
164+
165+
c.Execute()
166+
123167
r := runtime.NumGoroutine()
124168
c.Exit()
125169

126170
require.True(t, c.Finished())
171+
require.NoError(t, c.Error())
127172
require.Equal(t, r-1, runtime.NumGoroutine())
128173
}
129174

internal/workflow/executor_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/google/uuid"
2525
"github.com/stretchr/testify/require"
2626
"go.opentelemetry.io/otel/trace"
27+
"go.uber.org/goleak"
2728
)
2829

2930
type testHistoryProvider struct {
@@ -638,6 +639,37 @@ func Test_Executor(t *testing.T) {
638639
require.Equal(t, goRoutines, runtime.NumGoroutine())
639640
},
640641
},
642+
{
643+
name: "Close_removes_any_goroutines_defer",
644+
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
645+
wf := func(ctx sync.Context) error {
646+
defer func() {
647+
_, err := wf.SignalWorkflow[any](ctx, "some-id", "signal", nil).Get(ctx)
648+
if err != nil {
649+
panic(err)
650+
}
651+
}()
652+
653+
c := wf.NewSignalChannel[int](ctx, "signal")
654+
655+
// Block workflow
656+
c.Receive(ctx)
657+
658+
return nil
659+
}
660+
661+
r.RegisterWorkflow(wf)
662+
663+
task := startWorkflowTask(i.InstanceID, wf)
664+
665+
_, err := e.ExecuteTask(context.Background(), task)
666+
require.NoError(t, err)
667+
668+
e.Close()
669+
670+
goleak.VerifyNone(t)
671+
},
672+
},
641673
{
642674
name: "Close_removes_any_goroutines_nested",
643675
f: func(t *testing.T, r *registry.Registry, e *executor, i *core.WorkflowInstance, hp *testHistoryProvider) {
@@ -683,6 +715,8 @@ func Test_Executor(t *testing.T) {
683715
e, err := newExecutor(r, i, hp)
684716
require.NoError(t, err)
685717
tt.f(t, r, e, i, hp)
718+
719+
e.Close()
686720
})
687721
}
688722
}

0 commit comments

Comments
 (0)