-
-
Notifications
You must be signed in to change notification settings - Fork 451
Description
If you lifts a Channel into a SourceT and repeatedly consume its elements, the associated MultiListenerPureSourceT instance keeps creating new channel and adding it to its internal listeners dictionary indefinitely, potentially reading to problems described in this post, or eventual OutOfMemory error:

I managed to create a minimal example code that reproduce this symptom, as shown below:
var channel = Channel.CreateBounded<int>(100);
var source = channel.AsSourceT<IO, int>();
var sink = Sink.lift(channel);
repeat(Schedule.spaced(500.Milliseconds()), sink.Post(1)).RunAsync();
var count =
from values in source
.TakeFor(2.Seconds())
.Collect()
.As()
from _ in IO.lift(() =>
{
Console.WriteLine(values.Count);
})
select unit;
repeat(Schedule.spaced(2.Seconds()), count).Run();Running this code in a debug mode with a breakpoint in ReduceInternalM, you'll see that the listeners property keeps growing after a few iterations.
I was unable to reproduce the exact symptom described in the linked discussion, but I suspect it was caused by multiple channels consuming the source channel before my code has a chance to do it, so I think there's a good chance that this might be the root cause of it.