Properly keep references and await for concurrent tasks #984
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
The data sourcing actor is parallelizing the sending of samples to the stream senders, but it was not properly keeping references to the tasks and awaiting for them to finish.
References were probably kept by
gather(), butgather()wasn't awaited, so it can potentially not run at all (although it seems it did, because things were working).This commit uses a
TaskGroupto keep references to the tasks inprocess_msg()and then makes eachprocess_msg()call a task too, but we don't use a task group here because we don't want to await until a batch of messages is sent before receiving the next one. Instead, we want to keep sending messages in the background. But we still need to clean up these tasks as soon as they are done.This replaces the event used to wait until there are no more pending messages to be sent, as it is enough to synchronize awaiting for the (pending) tasks to finish.
This was discovered while looking at warnings in the tests (#982).