Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
<NewtonsoftJsonVersion>9.0.1</NewtonsoftJsonVersion>
<NBenchVersion>1.2.2</NBenchVersion>
<ProtobufVersion>3.10.0</ProtobufVersion>
<NetCoreTestVersion>netcoreapp2.1</NetCoreTestVersion>
<NetCoreTestVersion>netcoreapp3.0</NetCoreTestVersion>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hold off on this if you can - requires a much bigger set of changes on the build system side to support it.

<NetFrameworkTestVersion>net461</NetFrameworkTestVersion>
<NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
<NetFrameworkLibVersion>net452</NetFrameworkLibVersion>
<NetStandardLibVersion>netstandard2.1</NetStandardLibVersion>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like I mentioned in PR comments, stick to .NET Standard 2.0 for the time being - we can add the pipelines stuff in later. Let's just get SocketAsyncEventArgs usage optimized / improved doing what we can for now.

<NetFrameworkLibVersion>net461</NetFrameworkLibVersion>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we figure out what to do with the parts of Akka.NET that depend on System.Configuration, we will drop explicit .NET Framework support and just stick to standard.

<FluentAssertionsVersion>4.14.0</FluentAssertionsVersion>
<FsCheckVersion>2.9.0</FsCheckVersion>
<AkkaPackageTags>akka;actors;actor model;Akka;concurrency</AkkaPackageTags>
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka/Akka.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props" />

<PropertyGroup>
Expand All @@ -17,6 +17,7 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="$(NewtonsoftJsonVersion)" />
<PackageReference Include="System.Collections.Immutable" Version="1.6.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.6.0" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == '$(NetFrameworkLibVersion)'">
Expand Down
30 changes: 30 additions & 0 deletions src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Reflection;
using System.Collections;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net;
using System.Net.Sockets;
Expand All @@ -18,6 +19,7 @@
using Akka.Dispatch;
using Akka.Event;
using Akka.IO.Buffers;
using Akka.Util;

namespace Akka.IO
{
Expand Down Expand Up @@ -67,6 +69,34 @@ internal sealed class SocketReceived : SocketCompleted
private SocketReceived() { }
}

internal sealed class SocketReceivePipeUpdated
{
public SocketReceivePipeUpdated(int bytesReceived)
{
BytesReceived = bytesReceived;
Error = Option<Exception>.None;
}

public SocketReceivePipeUpdated(Exception error)
{
Error = error;
}

public int BytesReceived { get; }
public Option<Exception> Error { get; }
public bool IsFaulted => Error.HasValue;
}

internal sealed class SocketReceivePipeFlushCompleted
{
public SocketReceivePipeFlushCompleted(FlushResult result)
{
Result = result;
}

public FlushResult Result { get; }
}

internal sealed class SocketAccepted : SocketCompleted
{
public static readonly SocketAccepted Instance = new SocketAccepted();
Expand Down
141 changes: 134 additions & 7 deletions src/core/Akka/IO/TcpConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
//-----------------------------------------------------------------------

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net.Sockets;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Event;
Expand Down Expand Up @@ -83,6 +86,7 @@ enum ConnectionStatus
private ConnectionStatus status;
protected readonly TcpExt Tcp;
protected readonly Socket Socket;
protected Pipe ReceivePipe;
protected SocketAsyncEventArgs ReceiveArgs;
protected SocketAsyncEventArgs SendArgs;

Expand Down Expand Up @@ -191,6 +195,8 @@ private Receive Connected(ConnectionInfo info)
case SuspendReading _: SuspendReading(); return true;
case ResumeReading _: ResumeReading(); return true;
case SocketReceived _: DoRead(info, null); return true;
case SocketReceivePipeUpdated update: HandleReceivePipeUpdated(info, null, update); return true;
case SocketReceivePipeFlushCompleted flush: HandleReceivePipeFlushed(info, flush.Result); return true;
case CloseCommand cmd: HandleClose(info, Sender, cmd.Event); return true;
default: return false;
}
Expand Down Expand Up @@ -229,6 +235,8 @@ private Receive ClosingWithPendingWrite(ConnectionInfo info, IActorRef closeComm
case SuspendReading _: SuspendReading(); return true;
case ResumeReading _: ResumeReading(); return true;
case SocketReceived _: DoRead(info, closeCommander); return true;
case SocketReceivePipeUpdated update: HandleReceivePipeUpdated(info, closeCommander, update); return true;
case SocketReceivePipeFlushCompleted flush: HandleReceivePipeFlushed(info, flush.Result); return true;
case SocketSent _:
AcknowledgeSent();
if (IsWritePending) DoWrite(info);
Expand Down Expand Up @@ -258,6 +266,8 @@ private Receive Closing(ConnectionInfo info, IActorRef closeCommandor)
case SuspendReading _: SuspendReading(); return true;
case ResumeReading _: ResumeReading(); return true;
case SocketReceived _: DoRead(info, closeCommandor); return true;
case SocketReceivePipeUpdated update: HandleReceivePipeUpdated(info, closeCommandor, update); return true;
case SocketReceivePipeFlushCompleted flush: HandleReceivePipeFlushed(info, flush.Result); return true;
case Abort _: HandleClose(info, Sender, Aborted.Instance); return true;
default: return false;
}
Expand Down Expand Up @@ -387,6 +397,97 @@ private void AcknowledgeSent()
ClearStatus(ConnectionStatus.Sending);
}

private void HandleReceivePipeFlushed(ConnectionInfo info, FlushResult flushResult)
{
if (HasStatus(ConnectionStatus.ReadingSuspended))
return;

if (flushResult.IsCompleted)
ReceivePipe.Writer.Complete();

// Can make synchronous read, because we know that there is something ready in the buffer
if (ReceivePipe.Reader.TryRead(out var readResult))
{
ReadOnlySequence<byte> buffer = readResult.Buffer;
// TODO: eliminate ByteString copy operaiton - use same ReadOnlySequence<byte> internally
info.Handler.Tell(new Tcp.Received(ByteString.FromBytes(buffer.ToArray())));
ReceivePipe.Reader.AdvanceTo(buffer.End);

if (readResult.IsCompleted)
ReceivePipe.Reader.Complete();
}
}

private void HandleReceivePipeUpdated(ConnectionInfo info, IActorRef closeCommander, SocketReceivePipeUpdated update)
{
//TODO: What should we do if reading is suspended with an oustanding read - this will discard the read
// Should probably have an 'oustanding read'
if (HasStatus(ConnectionStatus.ReadingSuspended))
return;

try
{
var read = FlushPipe(info, Tcp.Settings.ReceivedMessageSizeLimit, ReceivePipe, update);
ClearStatus(ConnectionStatus.Receiving);
switch (read.Type)
{
case ReadResultType.AllRead:
if (!pullMode)
ReceiveAsync();
break;
case ReadResultType.EndOfStream:
if (isOutputShutdown)
{
if (traceLogging) Log.Debug("Read returned end-of-stream, our side already closed");
DoCloseConnection(info, closeCommander, ConfirmedClosed.Instance);
}
else
{
if (traceLogging) Log.Debug("Read returned end-of-stream, our side not yet closed");
HandleClose(info, closeCommander, PeerClosed.Instance);
}

break;
case ReadResultType.ReadError:
HandleError(info.Handler, read.Error);
break;
}
}
catch (SocketException cause)
{
HandleError(info.Handler, cause);
}
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private ReadResult FlushPipe(ConnectionInfo info, int remainingLimit, Pipe receivePipe, SocketReceivePipeUpdated update)
{
if (remainingLimit <= 0)
return ReadResult.AllRead;

if (update.IsFaulted)
return new ReadResult(ReadResultType.ReadError, update.Error.Value);

var readBytes = update.BytesReceived;
if (traceLogging)
Log.Debug("Read [{0}] bytes.", readBytes);

if (readBytes > 0)
{
receivePipe.Writer.Advance(readBytes);
receivePipe.Writer.FlushAsync().AsTask().PipeTo(Self, Self, result => new SocketReceivePipeFlushCompleted(result));
return ReadResult.AllRead;
}

if (readBytes == 0)
{
receivePipe.Writer.Complete();
return ReadResult.EndOfStream;
}

throw new IllegalStateException($"Unexpected value returned from read: {readBytes}");
}

private void DoRead(ConnectionInfo info, IActorRef closeCommander)
{
//TODO: What should we do if reading is suspended with an oustanding read - this will discard the read
Expand Down Expand Up @@ -414,9 +515,10 @@ private void DoRead(ConnectionInfo info, IActorRef closeCommander)
if (traceLogging) Log.Debug("Read returned end-of-stream, our side not yet closed");
HandleClose(info, closeCommander, PeerClosed.Instance);
}

break;
case ReadResultType.ReadError:
HandleError(info.Handler, new SocketException((int)read.Error));
HandleError(info.Handler, read.Error);
break;
}
}
Expand Down Expand Up @@ -512,7 +614,7 @@ private void DoCloseConnection(ConnectionInfo info, IActorRef closeCommander, Co
StopWith(new CloseInformation(notifications, closedEvent));
}

private void HandleError(IActorRef handler, SocketException exception)
private void HandleError(IActorRef handler, Exception exception)
{
Log.Debug("Closing connection due to IO error {0}", exception);
StopWith(new CloseInformation(new HashSet<IActorRef>(new[] { handler }), new ErrorClosed(exception.Message)));
Expand All @@ -534,18 +636,26 @@ private bool SafeShutdownOutput()

protected void AcquireSocketAsyncEventArgs()
{
#if NETSTANDARD2_1
ReceivePipe = new Pipe();
#else
if (ReceiveArgs != null) throw new InvalidOperationException($"Cannot acquire receive SocketAsyncEventArgs. It's already has been initialized");
if (SendArgs != null) throw new InvalidOperationException($"Cannot acquire send SocketAsyncEventArgs. It's already has been initialized");

ReceiveArgs = Tcp.SocketEventArgsPool.Acquire(Self);
var buffer = Tcp.BufferPool.Rent();
ReceiveArgs.SetBuffer(buffer.Array, buffer.Offset, buffer.Count);

#endif

SendArgs = Tcp.SocketEventArgsPool.Acquire(Self);
}

private void ReleaseSocketAsyncEventArgs()
{
#if NETSTANDARD2_1
ReceivePipe.Writer.Complete();
ReceivePipe.Reader.Complete();
#else
if (ReceiveArgs != null)
{
var buffer = new ByteBuffer(ReceiveArgs.Buffer, ReceiveArgs.Offset, ReceiveArgs.Count);
Expand All @@ -554,7 +664,8 @@ private void ReleaseSocketAsyncEventArgs()
Tcp.BufferPool.Release(buffer);
ReceiveArgs = null;
}

#endif

if (SendArgs != null)
{
Tcp.SocketEventArgsPool.Release(SendArgs);
Expand Down Expand Up @@ -594,10 +705,20 @@ private void ReceiveAsync()
{
if (!HasStatus(ConnectionStatus.Receiving))
{
SetStatus(ConnectionStatus.Receiving);

#if NETSTANDARD2_1
// TODO: Set some buffer size here
var memory = ReceivePipe.Writer.GetMemory(/*512*/);
Socket.ReceiveAsync(memory, SocketFlags.None)
.AsTask()
.PipeTo(Self, Self,
success: bytesReceived => new SocketReceivePipeUpdated(bytesReceived),
failure: ex => new SocketReceivePipeUpdated(ex));
#else
if (!Socket.ReceiveAsync(ReceiveArgs))
Self.Tell(SocketReceived.Instance);

SetStatus(ConnectionStatus.Receiving);
#endif
}
}

Expand Down Expand Up @@ -708,9 +829,15 @@ private struct ReadResult
public static readonly ReadResult AllRead = new ReadResult(ReadResultType.AllRead, SocketError.Success);

public readonly ReadResultType Type;
public readonly SocketError Error;
public readonly Exception Error;

public ReadResult(ReadResultType type, SocketError error)
{
Type = type;
Error = new SocketException((int)error);
}

public ReadResult(ReadResultType type, Exception error)
{
Type = type;
Error = error;
Expand Down