Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 18 additions & 53 deletions src/core/Akka/IO/SocketEventArgsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public interface ISocketEventArgsPool
{
SocketAsyncEventArgs Acquire(IActorRef actor);
void Release(SocketAsyncEventArgs e);

BufferPoolInfo BufferPoolInfo { get; }
}

Expand All @@ -40,16 +40,17 @@ internal class PreallocatedSocketEventAgrsPool : ISocketEventArgsPool
// There is no reason why users or developers would need to touch memory management code because it is
// very specific for providing byte buffers for SocketAsyncEventArgs
private readonly IBufferPool _bufferPool;

private readonly EventHandler<SocketAsyncEventArgs> _onComplete;

public PreallocatedSocketEventAgrsPool(int initSize, IBufferPool bufferPool, EventHandler<SocketAsyncEventArgs> onComplete)
public PreallocatedSocketEventAgrsPool(int initSize, IBufferPool bufferPool,
EventHandler<SocketAsyncEventArgs> onComplete)
{
_bufferPool = bufferPool;
_onComplete = onComplete;
}


public SocketAsyncEventArgs Acquire(IActorRef actor)
{
var buffer = _bufferPool.Rent();
Expand All @@ -67,13 +68,15 @@ public void Release(SocketAsyncEventArgs e)
{
_bufferPool.Release(new ArraySegment<byte>(e.Buffer, e.Offset, e.Count));
}

if (e.BufferList != null)
{
foreach (var segment in e.BufferList)
{
_bufferPool.Release(segment);
}
}

e.Dispose();
}

Expand All @@ -82,58 +85,20 @@ public void Release(SocketAsyncEventArgs e)

internal static class SocketAsyncEventArgsExtensions
{
public static void SetBuffer(this SocketAsyncEventArgs args, ByteString data)
{
if (data.IsCompact)
{
var buffer = data.Buffers[0];
if (args.BufferList != null)
{
// BufferList property setter is not simple member association operation,
// but the getter is. Therefore we first check if we need to clear buffer list
// and only do so if necessary.
args.BufferList = null;
}
args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
}
else
{
if (RuntimeDetector.IsMono)
{
// Mono doesn't support BufferList - falback to compacting ByteString
var compacted = data.Compact();
var buffer = compacted.Buffers[0];
args.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);
}
else
{
args.SetBuffer(null, 0, 0);
args.BufferList = data.Buffers;
}
}
}

public static void SetBuffer(this SocketAsyncEventArgs args, IEnumerable<ByteString> dataCollection)
{
if (RuntimeDetector.IsMono)
// Mono doesn't support BufferList - fallback to compacting ByteString
var dataList = dataCollection.ToList();
var totalSize = dataList.Sum(d => d.Count);
var bytes = new byte[totalSize];
var position = 0;
foreach (var byteString in dataList)
{
// Mono doesn't support BufferList - falback to compacting ByteString
var dataList = dataCollection.ToList();
var totalSize = dataList.SelectMany(d => d.Buffers).Sum(d => d.Count);
var bytes = new byte[totalSize];
var position = 0;
foreach (var byteString in dataList)
{
var copied = byteString.CopyTo(bytes, position, byteString.Count);
position += copied;
}
args.SetBuffer(bytes, 0, bytes.Length);
}
else
{
args.SetBuffer(null, 0, 0);
args.BufferList = dataCollection.SelectMany(d => d.Buffers).ToList();
var copied = byteString.CopyTo(bytes, position, byteString.Count);
position += copied;
}

args.SetBuffer(bytes, 0, bytes.Length);
}
}
}
}
8 changes: 6 additions & 2 deletions src/core/Akka/IO/UdpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,12 @@ private void DoWrite()
{
var (send, sender) = _pendingSend.Value;
var data = send.Payload;

var bytesWritten = _socket.Send(data.Buffers);

#if NETSTANDARD2_0
var bytesWritten = _socket.Send(data.Buffers.ToArray());
#else
var bytesWritten = _socket.Send(data.Buffers.Span);
#endif
if (Udp.Settings.TraceLogging)
Log.Debug("Wrote [{0}] bytes to socket", bytesWritten);

Expand Down
Loading