11# stream
2-
32--
43 import "."
54
@@ -50,18 +49,16 @@ var ErrFnRequired = fmt.Errorf("nil InterceptFunc, Fn is required")
5049``` go
5150func Any [T any](ctx context.Context , in <- chan T ) <- chan any
5251```
53-
5452Any accepts an incoming data channel and converts the channel to a readonly
5553channel of the ` any ` type.
5654
5755#### func Distribute
5856
5957``` go
6058func Distribute [T any](
61- ctx context.Context , in <- chan T , out ...chan <- T ,
59+ ctx context.Context , in <- chan T , out ...chan <- T ,
6260)
6361```
64-
6562Distribute accepts an incoming data channel and distributes the data among the
6663supplied outgoing data channels using a dynamic select statement.
6764
@@ -74,7 +71,6 @@ ensure that the goroutine is properly terminated.
7471``` go
7572func Drain [T any](ctx context.Context , in <- chan T )
7673```
77-
7874Drain accepts a channel and drains the channel until the channel is closed or
7975the context is canceled.
8076
@@ -83,7 +79,6 @@ the context is canceled.
8379``` go
8480func FanIn [T any](ctx context.Context , in ...<- chan T ) <- chan T
8581```
86-
8782FanIn accepts incoming data channels and forwards returns a single channel that
8883receives all the data from the supplied channels.
8984
@@ -95,10 +90,9 @@ ensure that the goroutine is terminated.
9590
9691``` go
9792func FanOut [T any](
98- ctx context.Context , in <- chan T , out ...chan <- T ,
93+ ctx context.Context , in <- chan T , out ...chan <- T ,
9994)
10095```
101-
10296FanOut accepts an incoming data channel and copies the data to each of the
10397supplied outgoing data channels.
10498
@@ -110,12 +104,11 @@ ensure that the goroutine is properly terminated.
110104
111105``` go
112106func Intercept [T, U any](
113- ctx context.Context ,
114- in <- chan T ,
115- fn InterceptFunc [T, U ],
107+ ctx context.Context ,
108+ in <- chan T ,
109+ fn InterceptFunc [T, U ],
116110) <- chan U
117111```
118-
119112Intercept accepts an incoming data channel and a function literal that accepts
120113the incoming data and returns data of the same type and a boolean indicating
121114whether the data should be forwarded to the output channel. The function is
@@ -126,10 +119,9 @@ not canceled or the incoming channel remains open.
126119
127120``` go
128121func Pipe [T any](
129- ctx context.Context , in <- chan T , out chan <- T ,
122+ ctx context.Context , in <- chan T , out chan <- T ,
130123)
131124```
132-
133125Pipe accepts an incoming data channel and pipes it to the supplied outgoing data
134126channel.
135127
@@ -141,25 +133,25 @@ that the goroutine is properly terminated.
141133
142134``` go
143135type DurationScaler struct {
144- // Interval is the number the current step must be divisible by in order
145- // to modify the time.Duration.
146- Interval int
147-
148- // ScalingFactor is a value between -1 and 1 that is used to modify the
149- // time.Duration of a ticker or timer. The value is multiplied by
150- // the ScalingFactor is multiplied by the duration for scaling.
151- //
152- // For example, if the ScalingFactor is 0.5, then the duration will be
153- // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will
154- // be divided by 0.5. If the ScalingFactor is 0, then the duration will
155- // not be modified.
156- //
157- // A negative ScalingFactor will cause the duration to decrease as the
158- // step value increases causing the ticker or timer to fire more often
159- // and create more routines. A positive ScalingFactor will cause the
160- // duration to increase as the step value increases causing the ticker
161- // or timer to fire less often and create less routines.
162- ScalingFactor float64
136+ // Interval is the number the current step must be divisible by in order
137+ // to modify the time.Duration.
138+ Interval int
139+
140+ // ScalingFactor is a value between -1 and 1 that is used to modify the
141+ // time.Duration of a ticker or timer. The value is multiplied by
142+ // the ScalingFactor is multiplied by the duration for scaling.
143+ //
144+ // For example, if the ScalingFactor is 0.5, then the duration will be
145+ // multiplied by 0.5. If the ScalingFactor is -0.5, then the duration will
146+ // be divided by 0.5. If the ScalingFactor is 0, then the duration will
147+ // not be modified.
148+ //
149+ // A negative ScalingFactor will cause the duration to decrease as the
150+ // step value increases causing the ticker or timer to fire more often
151+ // and create more routines. A positive ScalingFactor will cause the
152+ // duration to increase as the step value increases causing the ticker
153+ // or timer to fire less often and create less routines.
154+ ScalingFactor float64
163155}
164156```
165157
@@ -172,18 +164,19 @@ a configured step value and modifier (between -1 and 1) value.
172164type InterceptFunc [T, U any] func (context.Context , T ) (U, bool )
173165```
174166
167+
175168#### type Scaler
176169
177170``` go
178171type Scaler [T, U any] struct {
179- Wait time.Duration
180- Life time.Duration
181- Fn InterceptFunc [T, U ]
182-
183- // WaitModifier is used to modify the Wait time based on the number of
184- // times the Scaler has scaled up. This is useful for systems
185- // that are CPU bound and need to scale up more/less quickly.
186- WaitModifier DurationScaler
172+ Wait time.Duration
173+ Life time.Duration
174+ Fn InterceptFunc [T, U ]
175+
176+ // WaitModifier is used to modify the Wait time based on the number of
177+ // times the Scaler has scaled up. This is useful for systems
178+ // that are CPU bound and need to scale up more/less quickly.
179+ WaitModifier DurationScaler
187180}
188181```
189182
@@ -199,7 +192,7 @@ To use Scalar, simply create a new Scaler[T, U], configuring the Wait, Life, and
199192InterceptFunc fields. These fields are what configure the functionality of the
200193Scaler.
201194
202- NOTE: Fn is REQUIRED!
195+ NOTE: Fn is REQUIRED! Defaults: Wait = 1ns, Life = 1µs
203196
204197After creating the Scaler instance and configuring it, call the Exec method
205198passing the appropriate context and input channel.
@@ -216,7 +209,6 @@ successful send occurs. (This should only loop twice).
216209``` go
217210func (s Scaler [T , U ]) Exec (ctx context .Context , in <-chan T ) (<-chan U , error )
218211```
219-
220212Exec starts the internal Scaler routine (the first layer of processing) and
221213returns the output channel where the resulting data from the Fn function will be
222214sent.
0 commit comments