Skip to content

chann: support to consume all data after Close the channel. #5

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions chann.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func Cap(n int) Opt {
// one, and use Cap to configure the capacity of the channel.
type Chann[T any] struct {
in, out chan T
backlog chan T
close chan struct{}
cfg *config
q []T
Expand Down Expand Up @@ -129,6 +130,7 @@ func New[T any](opts ...Opt) *Chann[T] {
case unbounded:
ch.in = make(chan T, 16)
ch.out = make(chan T, 16)
ch.backlog = make(chan T, 1024)
go ch.unboundedProcessing()
}
return ch
Expand Down Expand Up @@ -208,14 +210,13 @@ func (ch *Chann[T]) unboundedTerminate() {
}
for len(ch.q) > 0 {
select {
// Note if receiver doesn't consume all data that has been sent to input
// channel, the `unboundedProcessing` goroutine will leak forever.
// Ref: https://github.com/golang-design/chann/issues/3
case ch.out <- ch.q[0]:
// The default branch exists because we need guarantee
// the loop can terminate. If there is a receiver, the
// first case will ways be selected. See #3.
default:
ch.q[0] = nilT // de-reference earlier to help GC
ch.q = ch.q[1:]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads like a previous implementation that was reported to be problematic, as discussed in #3. Could you help me to understand how your changes can improve the overall situation?

Copy link
Author

@amyangfei amyangfei Jan 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, in this PR, it behaves as follows
When an unbounded channel is closed

}
ch.q[0] = nilT // de-reference earlier to help GC
ch.q = ch.q[1:]
}
close(ch.out)
close(ch.close)
Expand Down
33 changes: 27 additions & 6 deletions chann_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ func TestNonblockRecvRace(t *testing.T) {
for i := 0; i < n; i++ {
c := chann.New[int](chann.Cap(1))
c.In() <- 1
t.Log(i)
go func() {
select {
case <-c.Out():
Expand Down Expand Up @@ -433,11 +432,8 @@ func TestUnboundedChannClose(t *testing.T) {
}
ch.Close()

// Theoretically, this is not a dead loop. If the channel
// is closed, then this loop must terminate at somepoint.
// If not, we will meet timeout in the test.
for !chann.IsClosed(ch) {
t.Log("unbounded channel is still not entirely closed")
if chann.IsClosed(ch) {
t.Fatal("channel should not be closed if data is not consumed")
}
})

Expand Down Expand Up @@ -492,6 +488,31 @@ func TestUnboundedChannClose(t *testing.T) {
t.Fatalf("After close, not all elements are received, got %v, want %v", n, N)
}
})

// ref: https://github.com/golang-design/chann/issues/3#issuecomment-1150189421
t.Run("consume-data-after-close", func(t *testing.T) {
var wg sync.WaitGroup
ch := chann.New[int]()

wg.Add(1)
c := 0
go func() {
for range ch.Out() {
c++
}
wg.Done()
}()

for i := 0; i < 2048; i++ {
ch.In() <- 42
}
ch.Close()

wg.Wait()
if c != 2048 {
t.Fatalf("not all elements are received after channel being closed, want %v got %v", 2048, c)
}
})
}

func BenchmarkUnboundedChann(b *testing.B) {
Expand Down