@@ -208,12 +208,10 @@ function initialize_streaming!(self_streams, spec, task)
208208 end
209209 output_buffer = get (spec. options, :stream_output_buffer , ProcessRingBuffer)
210210 stream = Stream {T,output_buffer} (output_buffer_amount)
211- spec. options = NamedTuple (filter (opt -> opt[1 ] != :stream_output_buffer &&
212- opt[1 ] != :stream_output_buffer_amount ,
213- Base. pairs (spec. options)))
214211 self_streams[task. uid] = stream
215212
216- spec. f = StreamingFunction (spec. f, stream)
213+ max_evals = get (spec. options, :stream_max_evals , - 1 )
214+ spec. f = StreamingFunction (spec. f, stream, max_evals)
217215 spec. options = merge (spec. options, (;occupancy= Dict (Any=> 0 )))
218216
219217 # Register Stream globally
@@ -256,6 +254,7 @@ const STREAM_THUNK_ID = TaskLocalValue{Int}(()->0)
256254struct StreamingFunction{F, S}
257255 f:: F
258256 stream:: S
257+ max_evals:: Int
259258end
260259chunktype (sf:: StreamingFunction{F} ) where F = F
261260function (sf:: StreamingFunction )(args... ; kwargs... )
@@ -319,14 +318,17 @@ end
319318function stream! (sf:: StreamingFunction , uid,
320319 args:: Tuple , kwarg_names:: Tuple , kwarg_values:: Tuple )
321320 f = move (thunk_processor (), sf. f)
322- while true
321+ counter = 0
322+
323+ while sf. max_evals < 0 || counter < sf. max_evals
323324 # Get values from Stream args/kwargs
324325 stream_args = _stream_take_values! (args, uid)
325326 stream_kwarg_values = _stream_take_values! (kwarg_values, uid)
326327 stream_kwargs = _stream_namedtuple (kwarg_names, stream_kwarg_values)
327328
328329 # Run a single cycle of f
329330 stream_result = f (stream_args... ; stream_kwargs... )
331+ counter += 1
330332
331333 # Exit streaming on graceful request
332334 if stream_result isa FinishStream
@@ -412,7 +414,8 @@ function finalize_streaming!(tasks::Vector{Pair{DTaskSpec,DTask}}, self_streams)
412414
413415 # Filter out all streaming options
414416 to_filter = (:stream_input_buffer , :stream_input_buffer_amount ,
415- :stream_output_buffer , :stream_output_buffer_amount )
417+ :stream_output_buffer , :stream_output_buffer_amount ,
418+ :stream_max_evals )
416419 spec. options = NamedTuple (filter (opt -> ! (opt[1 ] in to_filter),
417420 Base. pairs (spec. options)))
418421 if haskey (spec. options, :propagates )
0 commit comments