11# stream
2+
23--
34 import "."
45
@@ -49,16 +50,18 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")
4950``` go
5051func Any [T any](ctx context.Context , in <- chan T ) <- chan any
5152```
53+
5254Any accepts an incoming data channel and converts the channel to a readonly
5355channel of the ` any ` type.
5456
5557#### func Distribute
5658
5759``` go
5860func Distribute [T any](
59- ctx context.Context , in <- chan T , out ...chan <- T ,
61+ ctx context.Context , in <- chan T , out ...chan <- T ,
6062)
6163```
64+
6265Distribute accepts an incoming data channel and distributes the data among the
6366supplied outgoing data channels using a dynamic select statement.
6467
@@ -71,6 +74,7 @@ ensure that the goroutine is properly terminated.
7174``` go
7275func Drain [T any](ctx context.Context , in <- chan T )
7376```
77+
7478Drain accepts a channel and drains the channel until the channel is closed or
7579the context is canceled.
7680
@@ -79,6 +83,7 @@ the context is canceled.
7983``` go
8084func FanIn [T any](ctx context.Context , in ...<- chan T ) <- chan T
8185```
86+
8287FanIn accepts incoming data channels and forwards returns a single channel that
8388receives all the data from the supplied channels.
8489
@@ -90,9 +95,10 @@ ensure that the goroutine is terminated.
9095
9196``` go
9297func FanOut [T any](
93- ctx context.Context , in <- chan T , out ...chan <- T ,
98+ ctx context.Context , in <- chan T , out ...chan <- T ,
9499)
95100```
101+
96102FanOut accepts an incoming data channel and copies the data to each of the
97103supplied outgoing data channels.
98104
@@ -104,11 +110,12 @@ ensure that the goroutine is properly terminated.
104110
105111``` go
106112func Intercept [T, U any](
107- ctx context.Context ,
108- in <- chan T ,
109- fn InterceptFunc [T, U ],
113+ ctx context.Context ,
114+ in <- chan T ,
115+ fn InterceptFunc [T, U ],
110116) <- chan U
111117```
118+
112119Intercept accepts an incoming data channel and a function literal that accepts
113120the incoming data and returns data of the same type and a boolean indicating
114121whether the data should be forwarded to the output channel. The function is
@@ -119,9 +126,10 @@ not canceled or the incoming channel remains open.
119126
120127``` go
121128func Pipe [T any](
122- ctx context.Context , in <- chan T , out chan <- T ,
129+ ctx context.Context , in <- chan T , out chan <- T ,
123130)
124131```
132+
125133Pipe accepts an incoming data channel and pipes it to the supplied outgoing data
126134channel.
127135
@@ -133,25 +141,25 @@ that the goroutine is properly terminated.
133141
134142``` go
135143type DurationScaler struct {
136- // Interval is the number the current step must be divisible by in order
137- // to modify the time.Duration.
138- Interval int
139-
140- // ScalingFactor is a value between -1 and 1 that is used to modify the
141- // time.Duration of a ticker or timer. The value is multiplied by
142- // the ScalingFactor is multiplied by the duration for scaling.
143- //
144- // For example, if the ScalingFactor is 0.5, then the duration will be
145- // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will
146- // be divided by 0.5. If the ScalingFactor is 0, then the duration will
147- // not be modified.
148- //
149- // A negative ScalingFactor will cause the duration to decrease as the
150- // step value increases causing the ticker or timer to fire more often
151- // and create more routines. A positive ScalingFactor will cause the
152- // duration to increase as the step value increases causing the ticker
153- // or timer to fire less often and create less routines.
154- ScalingFactor float64
144+ // Interval is the number the current step must be divisible by in order
145+ // to modify the time.Duration.
146+ Interval int
147+
148+ // ScalingFactor is a value between -1 and 1 that is used to modify the
149+ // time.Duration of a ticker or timer. The value is multiplied by
150+ // the ScalingFactor is multiplied by the duration for scaling.
151+ //
152+ // For example, if the ScalingFactor is 0.5, then the duration will be
153+ // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will
154+ // be divided by 0.5. If the ScalingFactor is 0, then the duration will
155+ // not be modified.
156+ //
157+ // A negative ScalingFactor will cause the duration to decrease as the
158+ // step value increases causing the ticker or timer to fire more often
159+ // and create more routines. A positive ScalingFactor will cause the
160+ // duration to increase as the step value increases causing the ticker
161+ // or timer to fire less often and create less routines.
162+ ScalingFactor float64
155163}
156164```
157165
@@ -164,19 +172,18 @@ a configured step value and modifier (between -1 and 1) value.
164172type InterceptFunc [T, U any] func (context.Context , T ) (U, bool )
165173```
166174
167-
168175#### type Scaler
169176
170177``` go
171178type Scaler [T, U any] struct {
172- Wait time.Duration
173- Life time.Duration
174- Fn InterceptFunc [T, U ]
175-
176- // WaitModifier is used to modify the Wait time based on the number of
177- // times the Scaler has scaled up. This is useful for systems
178- // that are CPU bound and need to scale up more/less quickly.
179- WaitModifier DurationScaler
179+ Wait time.Duration
180+ Life time.Duration
181+ Fn InterceptFunc [T, U ]
182+
183+ // WaitModifier is used to modify the Wait time based on the number of
184+ // times the Scaler has scaled up. This is useful for systems
185+ // that are CPU bound and need to scale up more/less quickly.
186+ WaitModifier DurationScaler
180187}
181188```
182189
@@ -209,6 +216,7 @@ successful send occurs. (This should only loop twice).
209216``` go
210217func (s Scaler [T , U ]) Exec (ctx context .Context , in <-chan T ) (<-chan U , error )
211218```
219+
212220Exec starts the internal Scaler routine (the first layer of processing) and
213221returns the output channel where the resulting data from the Fn function will be
214222sent.
0 commit comments