Update Akka.IO to use System.IO.Pipelines for reading from socket #4017

Open
IgorFedchenko wants to merge 3 commits into akkadotnet:dev from IgorFedchenko:akka-io-to-pipeplnes

@IgorFedchenko
Contributor

This is going to be a major part of the work (if not all of it) on issue #3328.

Currently, all socket communication is implemented in the TcpConnection actor class (with some helpers involved) and uses SocketAsyncEventArgs to handle connections. In this PR I am going to replace that logic with the Pipe class from System.IO.Pipelines, which simplifies reading from a network stream.

@IgorFedchenko
Contributor Author

As a first step, I had to update Akka to the net461 target, because the System.IO.Pipelines NuGet package requires .NET 4.6+.

@IgorFedchenko
Contributor Author

IgorFedchenko commented Nov 1, 2019

Sending byte arrays (which come from the commander actor's messages) over the network does not require the Pipelines API, because we always get a ByteString of a known size and can just put it on the socket.

What is more, looking at the usages of the protected SocketAsyncEventArgs SendArgs; field, it is clear that it is not used at all. It is acquired when the connection starts and released back to the pool once the connection is finished. Its only usage is being passed to the PendingBufferWrite helper class, which never touches it.

With that said, sending data currently blocks the actor. When a Write command is received, the data ends up being sent to the socket in a blocking while loop (reading the transferred byte count until all data is sent). Instead, we could use socket.SendAsync(sendEventArgs) to perform the operation asynchronously. In that case the actor could receive more messages while the send is in progress, and would wait for it to finish only if another Write command arrives.
Or we could even enqueue incoming Write commands and dequeue them once the previous SendAsync has finished (in the callback). To avoid too much buffering, we can make sends blocking once the buffer grows (i.e. on getting another Write command, wait until the current SendAsync finishes before putting the new one into the queue). This is what the Pipelines library does with its Pipe.
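
To make the queueing idea concrete, here is a minimal sketch, not the TcpConnection code. It chains each send onto the previous one so that ordering is preserved, and makes the caller wait once too many writes are outstanding. SendAsync, MaxPending and EnqueueWriteAsync are hypothetical names introduced for illustration; a real actor would handle completion via a message to itself rather than via awaits.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for an asynchronous socket send; it only records the payload.
var wire = new List<string>();
async Task SendAsync(string payload)
{
    await Task.Yield();            // pretend the socket needs some time
    wire.Add(payload);
}

const int MaxPending = 2;          // assumed buffering limit, not an Akka.IO setting
int pending = 0;                   // writes accepted but not yet on the wire
Task tail = Task.CompletedTask;    // completes when the most recently queued send is done

// Runs one send strictly after the previous one, which preserves ordering.
async Task SendAfterAsync(Task previous, string payload)
{
    await previous;
    await SendAsync(payload);
    Interlocked.Decrement(ref pending);
}

// Accepts a write without blocking, unless too many writes are already buffered.
async Task EnqueueWriteAsync(string payload)
{
    if (Volatile.Read(ref pending) >= MaxPending)
        await tail;                // backpressure: wait for the outstanding sends
    Interlocked.Increment(ref pending);
    tail = SendAfterAsync(tail, payload);
}

foreach (var message in new[] { "a", "b", "c", "d" })
    await EnqueueWriteAsync(message);
await tail;

Console.WriteLine(string.Join(",", wire)); // prints "a,b,c,d"
```

Because every send awaits its predecessor, writes cannot overtake each other even though the caller never blocks on an individual send.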

At the very least, this might be an issue when an actor is actively sending and receiving fairly large messages. I do not think this affects the #4015 Akka.IO benchmark, because the send-receive cycle there is sequential and no commands should be waiting for the blocking Send to finish.

@Aaronontheweb This optimization is not directly related to this PR and can be done in a separate PR. Should I open a separate issue for it? It might be a sizeable piece of work, because other things happen after the blocking Send, and they would have to be moved carefully into the callback without breaking the actor's internal state management.

@Aaronontheweb
Member

Aaronontheweb commented Nov 1, 2019 via email

@IgorFedchenko
Contributor Author

IgorFedchenko commented Nov 2, 2019

Now I have some understanding of how receiving data works in the TcpConnection actor.

The way receiving is implemented is fine: once connected, the actor starts an async read with a completion callback, and in the callback it sends a SocketReceived message to itself. When it receives that message, it does an Array.Copy from the socket result buffer to a new byte[], which is wrapped in a ByteString and sent to the subscriber.

There is not much profit in using the System.IO.Pipelines.Pipe class here. The point of a pipe is that we can ignore the fact that data arrives in chunks, ignore all the buffering, and work with incoming data as a continuous sequence of bytes.
But here we do not need that: we receive chunks of data (for example two chunks, ['h', 'e', 'l'] and ['l', 'o'], for the word hello) and just forward them as is, in 2 ByteString instances, to the subscriber.
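
As a small illustration of the "continuous sequence" point, the sketch below writes the two chunks from the hello example into a Pipe and reads them back as one ReadOnlySequence<byte>. It assumes the System.IO.Pipelines package is referenced and does not involve Akka.IO or a real socket.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;

var pipe = new Pipe();

// Two separate writes, standing in for two separate socket reads.
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("hel"));
await pipe.Writer.WriteAsync(Encoding.ASCII.GetBytes("lo"));

// The reader sees everything written so far as one logical sequence.
ReadResult result = await pipe.Reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
long length = buffer.Length;
string text = Encoding.ASCII.GetString(buffer.ToArray());

// Tell the pipe that the whole buffer has been consumed.
pipe.Reader.AdvanceTo(buffer.End);

Console.WriteLine($"{length} bytes: {text}"); // prints "5 bytes: hello"
```

Note that buffer.ToArray() copies, so this shows only the chunk-agnostic view, not an allocation-free path.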

So there is not much room for optimization.
BUT: I think it may be a problem that we perform a copy each time data is received. It may be possible to update ByteString to hold a ReadOnlySequence<byte> instead of an ArraySegment<byte>, and use the pipeline API to make the socket read populate that ReadOnlySequence<byte> directly. Something like this:

// The reader gets data from pipe.Writer, which is filled from the socket with minimal allocations
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
// Put the obtained data into a ByteString without allocations
// (assumes a ByteString overload that accepts ReadOnlySequence<byte>)
var byteStringToSend = ByteString.CopyFrom(buffer); // Or maybe a .Slice() call will be needed?
// Mark the whole buffer as consumed (AdvanceTo(buffer.Start, buffer.End) would consume nothing)
reader.AdvanceTo(buffer.End);

This way we will not perform additional allocations, which may improve throughput.
This is just an idea that I need to try. @Aaronontheweb What do you think? I am not very familiar with the ByteString implementation yet, so does this sound doable?

@IgorFedchenko
Contributor Author

IgorFedchenko commented Nov 2, 2019

One more thing to keep in mind: reading from the socket directly into this pipe (socket.ReceiveAsync(pipeMemory, SocketFlags.None)) requires netcoreapp2.1 (or netstandard2.1) as the target; this API is not supported by .NET Framework or .NET Standard 2.0.
So I am going to change NetStandardLibVersion in common.props from netstandard2.0 to netstandard2.1.

Update: just realized that to keep .NET Framework support, I will need to put some compiler directives to switch implementation between SocketAsyncEventArgs and System.IO.Pipelines.Pipe usage... Is it still a good idea?

@Horusiath
Contributor

Horusiath commented Nov 4, 2019

@IgorFedchenko my initial idea was to make ByteString castable to ReadOnlySequence<byte> (technically it works in a very similar manner).

Regarding SendAsync (assuming you mean the modern async variant available in .NET Standard 2.0): I'm not sure how much use it is to us if the actor itself doesn't support an async method state machine. The reason the actor posts a message to itself on socket completion is exactly that, in Akka, an actor is itself a state machine (if you unwrapped an async method call, the code generated by the C# compiler wouldn't be conceptually that different from how the actor itself operates).

Also there was an old SendAsync code, which was non-blocking, but not async/await compatible, but @Aaronontheweb removed it 1-2 years ago, as it caused some bugs somewhere close to Akka.NET v1.2. Keep that in mind.

@IgorFedchenko
Contributor Author

@Horusiath About SendAsync: indeed, once I started working on that PR, I found that part of it was already implemented (the SocketSent message is already defined and handled, etc.). I was just about to ask whether it had been there and was removed for some reason... Thanks for your note, now I am sure there is something tricky :)

For the modern async option I was going to use PipeTo and send the SocketSent message from there. But I guess the reason it was removed before will still apply, so I will try to investigate this.

@IgorFedchenko
Contributor Author

IgorFedchenko commented Nov 4, 2019

Updated TcpConnection to use the Pipe class from the Pipelines library when targeting netstandard2.1. For now I just updated common.props to use the new standard version instead of the previous one, but we can add netstandard2.0 back later in this PR. Just making a proof of concept work for now.

So far, instead of just receiving data via SocketAsyncEventArgs (which is still there for old targets), I have added Pipe usage: data is read into pipe.Writer's buffer, then flushed to the reader, and then the reader reads the data from the pipe and sends a message to the consumer.

The first optimization we might get here is that we can start reading new data from the socket right after the previous data has been read into the writer's buffer, even before it is flushed to the reader (and of course before the data is converted to a ByteString and sent to the consumer).
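
The writer/reader arrangement described above can be sketched roughly as two loops sharing one Pipe. This is an illustration under assumptions, not the code in this PR: ReceiveAsync is a hypothetical stand-in for socket.ReceiveAsync(memory, SocketFlags.None), the received list stands in for the consumer, and the System.IO.Pipelines package is assumed to be referenced.

```csharp
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

// Hypothetical stand-in for socket.ReceiveAsync(memory, SocketFlags.None):
// hands out two chunks, then reports 0 bytes to signal a closed connection.
var chunks = new Queue<byte[]>(new[]
{
    Encoding.ASCII.GetBytes("hel"),
    Encoding.ASCII.GetBytes("lo"),
});
ValueTask<int> ReceiveAsync(Memory<byte> destination)
{
    if (chunks.Count == 0) return new ValueTask<int>(0);
    byte[] chunk = chunks.Dequeue();
    chunk.AsSpan().CopyTo(destination.Span);
    return new ValueTask<int>(chunk.Length);
}

// Writer side: receive straight into memory owned by the pipe, then flush.
async Task FillPipeAsync(PipeWriter writer)
{
    while (true)
    {
        Memory<byte> memory = writer.GetMemory(512);
        int bytesRead = await ReceiveAsync(memory);
        if (bytesRead == 0) break;                     // connection closed
        writer.Advance(bytesRead);                     // commit the received bytes
        FlushResult flush = await writer.FlushAsync(); // make them visible to the reader
        if (flush.IsCompleted) break;                  // reader has gone away
    }
    writer.Complete();
}

// Reader side: forward whatever is available to the consumer.
var received = new List<byte>();
async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        received.AddRange(buffer.ToArray());           // stand-in for "send to consumer"
        reader.AdvanceTo(buffer.End);                  // everything was consumed
        if (result.IsCompleted) break;
    }
    reader.Complete();
}

var pipe = new Pipe();
await Task.WhenAll(FillPipeAsync(pipe.Writer), ReadPipeAsync(pipe.Reader));

string text = Encoding.ASCII.GetString(received.ToArray());
Console.WriteLine(text); // prints "hello"
```

With a real socket the two loops would run concurrently, which is where the overlap between receiving and delivering comes from; the in-memory stand-in here completes synchronously, so it shows only the data flow.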

The second optimization is still a TODO: I am going to update ByteString to use ReadOnlySequence<byte> or ReadOnlySpan<byte> instead of ArraySegment<byte> internally, and pass the reader result there. I am still thinking about how best to use it, and whether any performance gain is possible (relative to just copying the ReadOnlySequence<byte> into its internal byte[] field).

@Aaronontheweb
Member

> Also there was an old SendAsync code, which was non-blocking, but not async/await compatible, but @Aaronontheweb removed it 1-2 years ago, as it caused some bugs somewhere close to Akka.NET v1.2. Keep that in mind.

The code I removed was broken - IIRC sending order wasn't preserved due to how it was implemented. Opted for synchronous send just to simplify things and get the fix out there. Re-visiting things now to make better use of I/O wait times seems like a good idea.

@Aaronontheweb
Member

> Update: just realized that to keep .NET Framework support, I will need to put some compiler directives to switch implementation between SocketAsyncEventArgs and System.IO.Pipelines.Pipe usage... Is it still a good idea?

I think it might make things too complicated in the short run. Why don't we stick with fixing the SocketAsyncEventArgs implementation for the time being and see how that goes?

Member

@Aaronontheweb Aaronontheweb left a comment

Need to change some global settings in order to stay on-target for 1.4 release.

  <NBenchVersion>1.2.2</NBenchVersion>
  <ProtobufVersion>3.10.0</ProtobufVersion>
- <NetCoreTestVersion>netcoreapp2.1</NetCoreTestVersion>
+ <NetCoreTestVersion>netcoreapp3.0</NetCoreTestVersion>
Member

Hold off on this if you can - requires a much bigger set of changes on the build system side to support it.

- <NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
- <NetFrameworkLibVersion>net452</NetFrameworkLibVersion>
+ <NetStandardLibVersion>netstandard2.1</NetStandardLibVersion>
+ <NetFrameworkLibVersion>net461</NetFrameworkLibVersion>
Member

Once we figure out what to do with the parts of Akka.NET that depend on System.Configuration, we will drop explicit .NET Framework support and just stick to standard.

  <NetFrameworkTestVersion>net461</NetFrameworkTestVersion>
- <NetStandardLibVersion>netstandard2.0</NetStandardLibVersion>
- <NetFrameworkLibVersion>net452</NetFrameworkLibVersion>
+ <NetStandardLibVersion>netstandard2.1</NetStandardLibVersion>
Member

Like I mentioned in PR comments, stick to .NET Standard 2.0 for the time being - we can add the pipelines stuff in later. Let's just get SocketAsyncEventArgs usage optimized / improved doing what we can for now.

@IgorFedchenko IgorFedchenko marked this pull request as ready for review November 4, 2019 19:31
@IgorFedchenko
Contributor Author

@Aaronontheweb I am suggesting to keep this PR as is, because sooner or later we might be able to add a netstandard2.1 target. At least, the work with Pipelines to read the data is implemented here and working (at least for simple communication like in the Akka.IO benchmark).

So, it is still some progress for #3328, but we will just hold this until we are able to target netstandard2.1. What do you think?

And I will make performance fixes (like for #4018) in other PRs.

@Aaronontheweb
Member

> I am suggesting to keep this PR as is, because sooner or later we might be able to add a netstandard2.1 target. At least, the work with Pipelines to read the data is implemented here and working (at least for simple communication like in the Akka.IO benchmark).

Sounds good.

@Aaronontheweb Aaronontheweb added this to the 1.5.0 milestone Feb 12, 2020
@imkow

imkow commented Feb 18, 2020

After updating to .NET Core 3.1, this could be merged sometime.

@Aaronontheweb Aaronontheweb modified the milestones: 1.5.0, 1.5.1 Mar 2, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.1, 1.5.2 Mar 15, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.2, 1.5.3 Apr 6, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.3, 1.5.4 Apr 20, 2023
@Aaronontheweb Aaronontheweb added this to the 1.5.5 milestone Apr 25, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.5, 1.5.6, 1.5.7 May 4, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.7, 1.5.8 May 17, 2023
@Aaronontheweb Aaronontheweb modified the milestones: 1.5.8, 1.5.9, 1.6.0 Jun 15, 2023