Skip to content

Commit 25c98a4

Browse files
authored
Fix channel close bug (#713)
1 parent d481995 commit 25c98a4

File tree

2 files changed

+58
-7
lines changed

2 files changed

+58
-7
lines changed

internal/internal_workflow.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -650,14 +650,15 @@ func (c *channelImpl) Send(ctx Context, v interface{}) {
650650
return
651651
}
652652
for {
653-
// Check for closed in the loop as close can be called when send is blocked
654-
if c.closed {
655-
panic("Closed channel")
656-
}
657653
if valueConsumed {
658654
state.unblocked()
659655
return
660656
}
657+
658+
// Check for closed in the loop as close can be called when send is blocked
659+
if c.closed {
660+
panic("Closed channel")
661+
}
661662
state.yield(fmt.Sprintf("blocked on %s.Send", c.name))
662663
}
663664
}
@@ -695,9 +696,6 @@ func (c *channelImpl) Close() {
695696
callback.fn(nil, false)
696697
}
697698
// All blocked sends are going to panic
698-
for _, callback := range c.blockedSends {
699-
callback.fn()
700-
}
701699
}
702700

703701
// Takes a value and assigns that 'to' value. logs a metric if it is unable to deserialize

internal/internal_workflow_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ func (s *WorkflowUnitTest) SetupSuite() {
6060
RegisterWorkflow(receiveCorruptSignalOnClosedChannelWorkflowTest)
6161
RegisterWorkflow(bufferedChanWorkflowTest)
6262
RegisterWorkflow(bufferedChanWithSelectorWorkflowTest)
63+
RegisterWorkflow(closeChannelTest)
64+
RegisterWorkflow(closeChannelInSelectTest)
6365

6466
s.activityOptions = ActivityOptions{
6567
ScheduleToStartTimeout: time.Minute,
@@ -759,6 +761,57 @@ func (s *WorkflowUnitTest) Test_CorruptedSignalOnClosedChannelWorkflow_Receive_S
759761
s.EqualValues(0, len(result))
760762
}
761763

764+
func closeChannelTest(ctx Context) error {
765+
ch := NewChannel(ctx)
766+
Go(ctx, func(ctx Context) {
767+
var dummy struct{}
768+
ch.Receive(ctx, &dummy)
769+
ch.Close()
770+
})
771+
772+
ch.Send(ctx, struct{}{})
773+
return nil
774+
}
775+
776+
func (s *WorkflowUnitTest) Test_CloseChannelWorkflow() {
777+
env := s.NewTestWorkflowEnvironment()
778+
env.ExecuteWorkflow(closeChannelTest)
779+
s.True(env.IsWorkflowCompleted())
780+
s.NoError(env.GetWorkflowError())
781+
}
782+
783+
func closeChannelInSelectTest(ctx Context) error {
784+
s := NewSelector(ctx)
785+
sendCh := NewChannel(ctx)
786+
receiveCh := NewChannel(ctx)
787+
expectedValue := "expected value"
788+
789+
Go(ctx, func(ctx Context) {
790+
sendCh.Close()
791+
receiveCh.Send(ctx, expectedValue)
792+
})
793+
794+
var v string
795+
s.AddSend(sendCh, struct{}{}, func() {
796+
panic("callback for sendCh should not be executed")
797+
})
798+
s.AddReceive(receiveCh, func(c Channel, m bool) {
799+
c.Receive(ctx, &v)
800+
})
801+
s.Select(ctx)
802+
if v != expectedValue {
803+
panic("callback for receiveCh is not executed")
804+
}
805+
return nil
806+
}
807+
808+
func (s *WorkflowUnitTest) Test_CloseChannelInSelectWorkflow() {
809+
env := s.NewTestWorkflowEnvironment()
810+
env.ExecuteWorkflow(closeChannelInSelectTest)
811+
s.True(env.IsWorkflowCompleted())
812+
s.NoError(env.GetWorkflowError())
813+
}
814+
762815
func bufferedChanWorkflowTest(ctx Context, bufferSize int) error {
763816
bufferedCh := NewBufferedChannel(ctx, bufferSize)
764817

0 commit comments

Comments
 (0)