Skip to content

Commit 1a328be

Browse files
authored
Fix buffered channel bug (#666)
receivedAsync now will check if there's any blocked send when taking a value from buffer.
1 parent 91c82a2 commit 1a328be

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

internal/internal_workflow.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,15 @@ package internal
2525
import (
2626
"errors"
2727
"fmt"
28-
"github.com/robfig/cron"
2928
"reflect"
3029
"runtime"
3130
"strings"
3231
"sync"
3332
"time"
3433
"unicode"
3534

35+
"github.com/robfig/cron"
36+
3637
"github.com/uber-go/tally"
3738
"go.uber.org/atomic"
3839
"go.uber.org/cadence/.gen/go/shared"
@@ -584,6 +585,18 @@ func (c *channelImpl) receiveAsyncImpl(callback *receiveCallback) (v interface{}
584585
r := c.buffer[0]
585586
c.buffer[0] = nil
586587
c.buffer = c.buffer[1:]
588+
589+
// Move blocked sends into buffer
590+
for len(c.blockedSends) > 0 {
591+
b := c.blockedSends[0]
592+
c.blockedSends[0] = nil
593+
c.blockedSends = c.blockedSends[1:]
594+
if b.fn() {
595+
c.buffer = append(c.buffer, b.value)
596+
break
597+
}
598+
}
599+
587600
return r, true, true
588601
}
589602
if c.closed {

internal/internal_workflow_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ func (s *WorkflowUnitTest) SetupSuite() {
5858
RegisterWorkflow(activityOptionsWorkflow)
5959
RegisterWorkflow(receiveAsync_CorruptSignalOnClosedChannelWorkflowTest)
6060
RegisterWorkflow(receive_CorruptSignalOnClosedChannelWorkflowTest)
61+
RegisterWorkflow(bufferedChanWorkflowTest)
62+
RegisterWorkflow(bufferedChanWithSelectorWorkflowTest)
6163

6264
s.activityOptions = ActivityOptions{
6365
ScheduleToStartTimeout: time.Minute,
@@ -757,6 +759,83 @@ func (s *WorkflowUnitTest) Test_CorruptedSignalOnClosedChannelWorkflow_Receive_S
757759
s.EqualValues(0, len(result))
758760
}
759761

762+
func bufferedChanWorkflowTest(ctx Context, bufferSize int) error {
763+
bufferedCh := NewBufferedChannel(ctx, bufferSize)
764+
765+
Go(ctx, func(ctx Context) {
766+
var dummy int
767+
for i := 0; i < bufferSize; i++ {
768+
bufferedCh.Receive(ctx, &dummy)
769+
}
770+
})
771+
772+
for i := 0; i < bufferSize+1; i++ {
773+
bufferedCh.Send(ctx, i)
774+
}
775+
return nil
776+
}
777+
778+
func (s *WorkflowUnitTest) Test_BufferedChanWorkflow() {
779+
bufferSizeList := []int{1, 5}
780+
for _, bufferSize := range bufferSizeList {
781+
env := s.NewTestWorkflowEnvironment()
782+
env.ExecuteWorkflow(bufferedChanWorkflowTest, bufferSize)
783+
s.True(env.IsWorkflowCompleted())
784+
s.NoError(env.GetWorkflowError())
785+
}
786+
}
787+
788+
func bufferedChanWithSelectorWorkflowTest(ctx Context, bufferSize int) error {
789+
bufferedCh := NewBufferedChannel(ctx, bufferSize)
790+
selectedCh := NewChannel(ctx)
791+
done := NewChannel(ctx)
792+
var dummy struct{}
793+
794+
// 1. First we need to fill the buffer
795+
for i := 0; i < bufferSize; i++ {
796+
bufferedCh.Send(ctx, dummy)
797+
}
798+
799+
// DO NOT change the order of these coroutines.
800+
Go(ctx, func(ctx Context) {
801+
// 3. Add another send callback to bufferedCh's blockedSends.
802+
bufferedCh.Send(ctx, dummy)
803+
done.Send(ctx, dummy)
804+
})
805+
806+
Go(ctx, func(ctx Context) {
807+
// 4. Make sure selectedCh is selected
808+
selectedCh.Receive(ctx, nil)
809+
810+
// 5. Get a value from channel buffer. Receive call will also check if there's any blocked sends.
811+
// The first blockedSends is added by Select(). Since bufferedCh is not selected, it's fn() will
812+
// return false. The Receive call should continue to check other blockedSends, until fn() returns
813+
// true or the list is empty. In this case, it will move the value sent in step 3 into buffer
814+
// and thus unblocks it.
815+
bufferedCh.Receive(ctx, nil)
816+
})
817+
818+
selector := NewSelector(ctx)
819+
selector.AddSend(selectedCh, dummy, func() {})
820+
selector.AddSend(bufferedCh, dummy, func() {})
821+
// 2. When select is called, callback for the second send will be added to bufferedCh's blockedSends
822+
selector.Select(ctx)
823+
824+
// Make sure no coroutine blocks
825+
done.Receive(ctx, nil)
826+
return nil
827+
}
828+
829+
func (s *WorkflowUnitTest) Test_BufferedChanWithSelectorWorkflow() {
830+
bufferSizeList := []int{1, 5}
831+
for _, bufferSize := range bufferSizeList {
832+
env := s.NewTestWorkflowEnvironment()
833+
env.ExecuteWorkflow(bufferedChanWithSelectorWorkflowTest, bufferSize)
834+
s.True(env.IsWorkflowCompleted())
835+
s.NoError(env.GetWorkflowError())
836+
}
837+
}
838+
760839
func activityOptionsWorkflow(ctx Context) (result string, err error) {
761840
ao1 := ActivityOptions{
762841
ActivityID: "id1",

0 commit comments

Comments
 (0)