Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 22 additions & 17 deletions src/Nethermind/Nethermind.Network/PeerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -219,27 +219,32 @@ private class CandidateSelection
private async Task RunPeerUpdateLoop()
{
Channel<Peer> taskChannel = Channel.CreateBounded<Peer>(1);
using ArrayPoolList<Task> tasks = Enumerable.Range(0, _outgoingConnectParallelism).Select(async idx =>
using ArrayPoolList<Task> tasks = new(_outgoingConnectParallelism);
for (int idx = 0; idx < _outgoingConnectParallelism; idx++)
{
await foreach (Peer peer in taskChannel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
int workerIdx = idx;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can probably avoid closure with something like:

for (int idx = 0; idx < _outgoingConnectParallelism; idx++)
{
    tasks.Add(RunWorker(idx));
}

async Task RunWorker(int workerIdx)
{
    await foreach (Peer peer in taskChannel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
    {
        try
        {
            await SetupOutgoingPeerConnection(peer);
        }
        catch (TaskCanceledException)
        {
            if (_logger.IsDebug) _logger.Debug($"Connect worker {workerIdx} cancelled");
            break;
        }
        catch (Exception e)
        {
            if (_logger.IsError) _logger.Error($"Error setting up connection to {peer}, {e}");
        }
    }
    if (_logger.IsDebug) _logger.Debug($"Connect worker {workerIdx} completed");
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I switched the loop to call a local RunWorker(int workerIdx) helper so each worker receives its own index instead of capturing the loop variable. This removes the closure while keeping the behavior the same.

tasks.Add(Task.Run(async () =>
{
try
{
await SetupOutgoingPeerConnection(peer);
}
catch (TaskCanceledException)
{
if (_logger.IsDebug) _logger.Debug($"Connect worker {idx} cancelled");
break;
}
catch (Exception e)
await foreach (Peer peer in taskChannel.Reader.ReadAllAsync(_cancellationTokenSource.Token))
{
// This is strictly speaking not related to the connection, but something outside of it.
if (_logger.IsError) _logger.Error($"Error setting up connection to {peer}, {e}");
try
{
await SetupOutgoingPeerConnection(peer);
}
catch (TaskCanceledException)
{
if (_logger.IsDebug) _logger.Debug($"Connect worker {workerIdx} cancelled");
break;
}
catch (Exception e)
{
// This is strictly speaking not related to the connection, but something outside of it.
if (_logger.IsError) _logger.Error($"Error setting up connection to {peer}, {e}");
}
}
}
if (_logger.IsDebug) _logger.Debug($"Connect worker {idx} completed");
}).ToPooledList(_outgoingConnectParallelism);
if (_logger.IsDebug) _logger.Debug($"Connect worker {workerIdx} completed");
}));
}

int loopCount = 0;
long previousActivePeersCount = 0;
Expand Down
9 changes: 5 additions & 4 deletions src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Threading;
Expand Down Expand Up @@ -146,9 +145,11 @@ public void Start(

try
{
using ArrayPoolListRef<Task> tasks = Enumerable.Range(0, trieVisitContext.MaxDegreeOfParallelism)
.Select(_ => Task.Run(BatchedThread))
.ToPooledListRef(trieVisitContext.MaxDegreeOfParallelism);
using ArrayPoolListRef<Task> tasks = new(trieVisitContext.MaxDegreeOfParallelism);
for (int i = 0; i < trieVisitContext.MaxDegreeOfParallelism; i++)
{
tasks.Add(Task.Run(BatchedThread));
}

Task.WaitAll(tasks.AsSpan());
}
Expand Down
Loading