@@ -208,12 +208,10 @@ function initialize_streaming!(self_streams, spec, task)
208208 end
209209 output_buffer = get(spec. options, :stream_output_buffer, ProcessRingBuffer)
210210 stream = Stream{T,output_buffer}(output_buffer_amount)
211- spec. options = NamedTuple(filter(opt -> opt[1 ] != :stream_output_buffer &&
212- opt[1 ] != :stream_output_buffer_amount,
213- Base. pairs(spec. options)))
214211 self_streams[task. uid] = stream
215212
216- spec. f = StreamingFunction(spec. f, stream)
213+ max_evals = get(spec. options, :stream_max_evals, - 1 )
214+ spec. f = StreamingFunction(spec. f, stream, max_evals)
217215 spec. options = merge(spec. options, (;occupancy= Dict(Any=> 0 )))
218216
219217 # Register Stream globally
@@ -256,6 +254,7 @@ const STREAM_THUNK_ID = TaskLocalValue{Int}(()->0)
256254struct StreamingFunction{F, S}
257255 f:: F
258256 stream:: S
257+ max_evals:: Int
259258end
260259chunktype(sf:: StreamingFunction{F} ) where F = F
261260function (sf:: StreamingFunction )(args... ; kwargs... )
@@ -319,14 +318,17 @@ end
319318function stream!(sf:: StreamingFunction , uid,
320319 args:: Tuple , kwarg_names:: Tuple , kwarg_values:: Tuple )
321320 f = move(thunk_processor(), sf. f)
322- while true
321+ counter = 0
322+
323+ while sf. max_evals < 0 || counter < sf. max_evals
323324 # Get values from Stream args/kwargs
324325 stream_args = _stream_take_values!(args, uid)
325326 stream_kwarg_values = _stream_take_values!(kwarg_values, uid)
326327 stream_kwargs = _stream_namedtuple(kwarg_names, stream_kwarg_values)
327328
328329 # Run a single cycle of f
329330 stream_result = f(stream_args... ; stream_kwargs... )
331+ counter += 1
330332
331333 # Exit streaming on graceful request
332334 if stream_result isa FinishStream
@@ -412,7 +414,8 @@ function finalize_streaming!(tasks::Vector{Pair{DTaskSpec,DTask}}, self_streams)
412414
413415 # Filter out all streaming options
414416 to_filter = (:stream_input_buffer, :stream_input_buffer_amount,
415- :stream_output_buffer, :stream_output_buffer_amount)
417+ :stream_output_buffer, :stream_output_buffer_amount,
418+ :stream_max_evals)
416419 spec. options = NamedTuple(filter(opt -> ! (opt[1 ] in to_filter),
417420 Base. pairs(spec. options)))
418421 if haskey(spec. options, :propagates)
0 commit comments