Skip to content

Commit 619cda1

Browse files
committed
chann: a few more generic chann functions
1 parent cd193f6 commit 619cda1

File tree

7 files changed

+211
-12
lines changed

7 files changed

+211
-12
lines changed

README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
# chann ![example workflow](https://github.com/golang-design/chann/actions/workflows/chann.yml/badge.svg) ![](https://changkun.de/urlstat?mode=github&repo=golang-design/chann)
22

3-
a unified representation of buffered, unbuffered, and unbounded channels in Go
3+
a unified channel package in Go
44

55
```go
66
import "golang.design/x/chann"
77
```
88

99
This package requires Go 1.18.
1010

11-
## Usage
11+
## Basic Usage
1212

1313
Different types of channels:
1414

@@ -34,8 +34,8 @@ ch.Close()
3434
Channel properties:
3535

3636
```go
37-
ch.ApproxLen() // an (approx. of) length of the channel
38-
ch.Cap() // the capacity of the channel
37+
ch.Len() // the length of the channel
38+
ch.Cap() // the capacity of the channel
3939
```
4040

4141
See https://golang.design/research/ultimate-channel for more details of

chann.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
//
55
// Written by Changkun Ou <changkun.de>
66

7-
// Package chann provides a unified representation of buffered,
8-
// unbuffered, and unbounded channels in Go.
7+
// Package chann providesa a unified channel package.
98
//
109
// The package is compatible with existing buffered and unbuffered
1110
// channels. For example, in Go, to create a buffered or unbuffered
@@ -204,11 +203,11 @@ func (ch *Chann[T]) Close() {
204203
}
205204
}
206205

207-
// ApproxLen returns an approximation of the length of the channel.
206+
// Len returns an approximation of the length of the channel.
208207
//
209208
// Note that in a concurrent scenario, the returned length of a channel
210209
// may never be accurate. Hence the function is named with an Approx prefix.
211-
func (ch *Chann[T]) ApproxLen() int {
210+
func (ch *Chann[T]) Len() int {
212211
switch ch.cfg.typ {
213212
case buffered, unbuffered:
214213
return len(ch.in)

chann_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,14 +186,14 @@ func TestChan(t *testing.T) {
186186
{
187187
// Test len/cap.
188188
c := chann.New[int](chann.Cap(chanCap))
189-
if c.ApproxLen() != 0 || c.Cap() != chanCap {
190-
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, 0, chanCap, c.ApproxLen(), c.Cap())
189+
if c.Len() != 0 || c.Cap() != chanCap {
190+
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, 0, chanCap, c.Len(), c.Cap())
191191
}
192192
for i := 0; i < chanCap; i++ {
193193
c.In() <- i
194194
}
195-
if c.ApproxLen() != chanCap || c.Cap() != chanCap {
196-
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, chanCap, chanCap, c.ApproxLen(), c.Cap())
195+
if c.Len() != chanCap || c.Cap() != chanCap {
196+
t.Fatalf("chan[%d]: bad len/cap, expect %v/%v, got %v/%v", chanCap, chanCap, chanCap, c.Len(), c.Cap())
197197
}
198198
}
199199
}

lb.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2022 The golang.design Initiative Authors.
2+
// All rights reserved. Use of this source code is governed
3+
// by a MIT license that can be found in the LICENSE file.
4+
//
5+
// Written by Changkun Ou <changkun.de>
6+
7+
package chann
8+
9+
import (
10+
"math/rand"
11+
"sync"
12+
)
13+
14+
// Fanin provides a generic fan-in functionality for variadic channels.
15+
func Fanin[T any](chans ...*Chann[T]) *Chann[T] {
16+
buf := 0
17+
for _, ch := range chans {
18+
if ch.Len() > buf {
19+
buf = ch.Len()
20+
}
21+
}
22+
out := New[T](Cap(buf))
23+
wg := sync.WaitGroup{}
24+
wg.Add(len(chans))
25+
for _, ch := range chans {
26+
go func(ch *Chann[T]) {
27+
for v := range ch.Out() {
28+
out.In() <- v
29+
}
30+
wg.Done()
31+
}(ch)
32+
}
33+
go func() {
34+
wg.Wait()
35+
out.Close()
36+
}()
37+
return out
38+
}
39+
40+
// Fanout provides a generic fan-out functionality for variadic channels.
41+
func Fanout[T any](randomizer func(max int) int, in *Chann[T], outs ...*Chann[T]) {
42+
l := len(outs)
43+
for v := range in.Out() {
44+
i := randomizer(l)
45+
if i < 0 || i > l {
46+
i = rand.Intn(l)
47+
}
48+
go func(v T) { outs[i].In() <- v }(v)
49+
}
50+
}
51+
52+
// LB load balances the given input channels to the output channels.
53+
func LB[T any](randomizer func(max int) int, ins []*Chann[T], outs []*Chann[T]) {
54+
Fanout(randomizer, Fanin(ins...), outs...)
55+
}

lb_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright 2022 The golang.design Initiative Authors.
2+
// All rights reserved. Use of this source code is governed
3+
// by a MIT license that can be found in the LICENSE file.
4+
//
5+
// Written by Changkun Ou <changkun.de>
6+
7+
package chann_test
8+
9+
import (
10+
"math/rand"
11+
"testing"
12+
13+
"golang.design/x/chann"
14+
)
15+
16+
func getInputChan() *chann.Chann[int] {
17+
input := chann.New[int](chann.Cap(20))
18+
numbers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
19+
go func() {
20+
for num := range numbers {
21+
input.In() <- num
22+
}
23+
input.Close()
24+
}()
25+
return input
26+
}
27+
28+
func TestFanin(t *testing.T) {
29+
chs := make([]*chann.Chann[int], 10)
30+
for i := 0; i < 10; i++ {
31+
chs[i] = getInputChan()
32+
}
33+
34+
out := chann.Fanin(chs...)
35+
count := 0
36+
for range out.Out() {
37+
count++
38+
}
39+
if count != 100 {
40+
t.Fatalf("Fanin failed")
41+
}
42+
}
43+
44+
func TestLB(t *testing.T) {
45+
ins := make([]*chann.Chann[int], 10)
46+
for i := 0; i < 10; i++ {
47+
ins[i] = getInputChan()
48+
}
49+
outs := make([]*chann.Chann[int], 10)
50+
for i := 0; i < 10; i++ {
51+
outs[i] = chann.New[int](chann.Cap(10))
52+
}
53+
chann.LB(func(m int) int { return rand.Intn(m) }, ins, outs)
54+
}

utils.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright 2022 The golang.design Initiative Authors.
2+
// All rights reserved. Use of this source code is governed
3+
// by a MIT license that can be found in the LICENSE file.
4+
//
5+
// Written by Changkun Ou <changkun.de>
6+
7+
package chann
8+
9+
import "runtime"
10+
11+
// Ranger returns a Sender and a Receiver. The Receiver provides a
12+
// Next method to retrieve values. The Sender provides a Send method
13+
// to send values and a Close method to stop sending values. The Next
14+
// method indicates when the Sender has been closed, and the Send
15+
// method indicates when the Receiver has been freed.
16+
//
17+
// This is a convenient way to exit a goroutine sending values when
18+
// the receiver stops reading them.
19+
func Ranger[T any]() (*Sender[T], *Receiver[T]) {
20+
c := New[T]()
21+
d := New[bool]()
22+
s := &Sender[T]{values: c, done: d}
23+
r := &Receiver[T]{values: c, done: d}
24+
runtime.SetFinalizer(r, func(r *Receiver[T]) { r.finalize() })
25+
return s, r
26+
}
27+
28+
// A sender is used to send values to a Receiver.
29+
type Sender[T any] struct {
30+
values *Chann[T]
31+
done *Chann[bool]
32+
}
33+
34+
// Send sends a value to the receiver. It returns whether any more
35+
// values may be sent; if it returns false the value was not sent.
36+
func (s *Sender[T]) Send(v T) bool {
37+
select {
38+
case s.values.In() <- v:
39+
return true
40+
case <-s.done.Out():
41+
return false
42+
}
43+
}
44+
45+
// Close tells the receiver that no more values will arrive.
46+
// After Close is called, the Sender may no longer be used.
47+
func (s *Sender[T]) Close() { s.values.Close() }
48+
49+
// A Receiver receives values from a Sender.
50+
type Receiver[T any] struct {
51+
values *Chann[T]
52+
done *Chann[bool]
53+
}
54+
55+
// Next returns the next value from the channel. The bool result
56+
// indicates whether the value is valid, or whether the Sender has
57+
// been closed and no more values will be received.
58+
func (r *Receiver[T]) Next() (T, bool) {
59+
v, ok := <-r.values.Out()
60+
return v, ok
61+
}
62+
63+
// finalize is a finalizer for the receiver.
64+
func (r *Receiver[T]) finalize() { r.done.Close() }

utils_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright 2021 The golang.design Initiative Authors.
2+
// All rights reserved. Use of this source code is governed
3+
// by a MIT license that can be found in the LICENSE file.
4+
//
5+
// Written by Changkun Ou <changkun.de>
6+
7+
package chann_test
8+
9+
import (
10+
"testing"
11+
12+
"golang.design/x/chann"
13+
)
14+
15+
func TestRanger(t *testing.T) {
16+
s, r := chann.Ranger[int]()
17+
18+
go func() {
19+
s.Send(42)
20+
}()
21+
22+
n, ok := r.Next()
23+
if !ok {
24+
t.Fatalf("cannot receive from senter")
25+
}
26+
t.Log(n)
27+
}

0 commit comments

Comments
 (0)