Skip to content
This repository was archived by the owner on Jan 23, 2023. It is now read-only.

Commit ce294a3

Browse files
committed
Merge pull request #2878 from Maxwe11/pipes-completion-source
Consolidate *CompletionSources logic in System.IO.Pipes
2 parents db6af39 + 6e448d6 commit ce294a3

File tree

5 files changed

+110
-236
lines changed

5 files changed

+110
-236
lines changed

src/System.IO.Pipes/src/System.IO.Pipes.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,9 @@
131131
<Compile Include="System\IO\Pipes\ConnectionCompletionSource.cs" />
132132
<Compile Include="System\IO\Pipes\NamedPipeClientStream.Windows.cs" />
133133
<Compile Include="System\IO\Pipes\NamedPipeServerStream.Windows.cs" />
134-
<Compile Include="System\IO\Pipes\PipeIOCompletionSource.cs" />
134+
<Compile Include="System\IO\Pipes\PipeCompletionSource.cs" />
135135
<Compile Include="System\IO\Pipes\PipeStream.Windows.cs" />
136+
<Compile Include="System\IO\Pipes\ReadWriteCompletionSource.cs" />
136137
</ItemGroup>
137138
<ItemGroup Condition=" '$(TargetsUnix)' == 'true' ">
138139
<Compile Include="Microsoft\Win32\SafeHandles\SafePipeHandle.Unix.cs" />

src/System.IO.Pipes/src/System/IO/Pipes/ConnectionCompletionSource.cs

Lines changed: 7 additions & 170 deletions
Original file line numberDiff line numberDiff line change
@@ -1,204 +1,41 @@
11
// Copyright (c) Microsoft. All rights reserved.
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

4-
using System.Diagnostics;
5-
using System.Runtime.InteropServices;
6-
using System.Security;
74
using System.Threading;
8-
using System.Threading.Tasks;
95

106
namespace System.IO.Pipes
117
{
12-
internal unsafe sealed class ConnectionCompletionSource : TaskCompletionSource<VoidResult>
8+
internal sealed class ConnectionCompletionSource : PipeCompletionSource<VoidResult>
139
{
14-
private const int NoResult = 0;
15-
private const int ResultSuccess = 1;
16-
private const int ResultError = 2;
17-
private const int RegisteringCancellation = 4;
18-
private const int CompletedCallback = 8;
19-
2010
private readonly NamedPipeServerStream _serverStream;
21-
private readonly ThreadPoolBoundHandle _threadPoolBinding;
22-
23-
private CancellationTokenRegistration _cancellationRegistration;
24-
private CancellationToken _cancellationToken;
25-
private int _errorCode;
26-
private NativeOverlapped* _overlapped;
27-
private int _state;
28-
29-
#if DEBUG
30-
private bool _cancellationHasBeenRegistered;
31-
#endif
3211

3312
// Using RunContinuationsAsynchronously for compat reasons (old API used ThreadPool.QueueUserWorkItem for continuations)
3413
internal ConnectionCompletionSource(NamedPipeServerStream server, CancellationToken cancellationToken)
35-
: base(TaskCreationOptions.RunContinuationsAsynchronously)
14+
: base(server._threadPoolBinding, cancellationToken, pinData: null)
3615
{
37-
Debug.Assert(server != null, "server is null");
38-
Debug.Assert(server._threadPoolBinding != null, "server._threadPoolBinding is null");
39-
40-
_threadPoolBinding = server._threadPoolBinding;
4116
_serverStream = server;
42-
_cancellationToken = cancellationToken;
43-
44-
_overlapped = _threadPoolBinding.AllocateNativeOverlapped((errorCode, numBytes, pOverlapped) =>
45-
{
46-
var completionSource = (ConnectionCompletionSource)ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
47-
Debug.Assert(completionSource.Overlapped == pOverlapped);
48-
49-
completionSource.AsyncCallback(errorCode, numBytes);
50-
}, this, null);
51-
52-
_state = NoResult;
53-
}
54-
55-
internal NativeOverlapped* Overlapped
56-
{
57-
[SecurityCritical]get { return _overlapped; }
58-
}
59-
60-
internal void RegisterForCancellation()
61-
{
62-
#if DEBUG
63-
Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
64-
_cancellationHasBeenRegistered = true;
65-
#endif
66-
67-
// Quick check to make sure that the cancellation token supports cancellation, and that the IO hasn't completed
68-
if (_cancellationToken.CanBeCanceled && Overlapped != null)
69-
{
70-
// Register the cancellation only if the IO hasn't completed
71-
int state = Interlocked.CompareExchange(ref _state, RegisteringCancellation, NoResult);
72-
if (state == NoResult)
73-
{
74-
// Register the cancellation
75-
_cancellationRegistration = _cancellationToken.Register(thisRef => ((ConnectionCompletionSource)thisRef).Cancel(), this);
76-
77-
// Grab the state for case if IO completed while we were setting the registration.
78-
state = Interlocked.Exchange(ref _state, NoResult);
79-
}
80-
else if (state != CompletedCallback)
81-
{
82-
// IO already completed and we have grabbed result state.
83-
// Set NoResult to prevent invocation of CompleteCallback(result state) from AsyncCallback(...)
84-
state = Interlocked.Exchange(ref _state, NoResult);
85-
}
86-
87-
// If we have the result state of completed IO call CompleteCallback(result).
88-
// Otherwise IO not completed.
89-
if (state == ResultSuccess || state == ResultError)
90-
{
91-
CompleteCallback(state);
92-
}
93-
}
94-
}
95-
96-
internal void ReleaseResources()
97-
{
98-
_cancellationRegistration.Dispose();
99-
100-
// NOTE: The cancellation must *NOT* be running at this point, or it may observe freed memory
101-
// (this is why we disposed the registration above)
102-
if (Overlapped != null)
103-
{
104-
_threadPoolBinding.FreeNativeOverlapped(Overlapped);
105-
_overlapped = null;
106-
}
10717
}
10818

109-
internal void SetCompletedSynchronously()
19+
internal override void SetCompletedSynchronously()
11020
{
11121
_serverStream.State = PipeState.Connected;
11222
TrySetResult(default(VoidResult));
11323
}
11424

115-
private void AsyncCallback(uint errorCode, uint numBytes)
25+
protected override void AsyncCallback(uint errorCode, uint numBytes)
11626
{
11727
// Special case for when the client has already connected to us.
11828
if (errorCode == Interop.mincore.Errors.ERROR_PIPE_CONNECTED)
11929
{
12030
errorCode = 0;
12131
}
12232

123-
_errorCode = (int)errorCode;
124-
125-
int resultState = errorCode == 0 ? ResultSuccess : ResultError;
126-
127-
// Store the result so that other threads can observe it
128-
// and if no other thread is registering cancellation, continue.
129-
// Otherwise CompleteCallback(resultState) will be invoked by RegisterForCancellation().
130-
if (Interlocked.Exchange(ref _state, resultState) == NoResult)
131-
{
132-
// Now try to prevent invocation of CompleteCallback(resultState) from RegisterForCancellation().
133-
// Otherwise, thread responsible for registering cancellation stole the result and it will invoke CompleteCallback(resultState).
134-
if (Interlocked.Exchange(ref _state, CompletedCallback) != NoResult)
135-
{
136-
CompleteCallback(resultState);
137-
}
138-
}
33+
base.AsyncCallback(errorCode, numBytes);
13934
}
14035

141-
/// <summary>
142-
/// Cancellation is not guaranteed to succeed.
143-
/// We ignore all errors here because operation could
144-
/// succeed just before it was called or someone already
145-
/// cancelled this operation without using token which should
146-
/// be manually detected - when operation finishes we should
147-
/// compare error code to ERROR_OPERATION_ABORTED and if cancellation
148-
/// token was not used to cancel we will throw.
149-
/// </summary>
150-
private void Cancel()
36+
protected override void HandleError(int errorCode)
15137
{
152-
// Storing to locals to avoid data races
153-
SafeHandle handle = _threadPoolBinding.Handle;
154-
NativeOverlapped* overlapped = Overlapped;
155-
156-
Debug.Assert(overlapped != null && !Task.IsCompleted, "IO should not have completed yet");
157-
158-
// If the handle is still valid, attempt to cancel the IO
159-
if (!handle.IsInvalid)
160-
{
161-
if (!Interop.mincore.CancelIoEx(handle, overlapped))
162-
{
163-
// This case should not have any consequences although
164-
// it will be easier to debug if there exists any special case
165-
// we are not aware of.
166-
int errorCode = Marshal.GetLastWin32Error();
167-
Debug.WriteLine("CancelIoEx finished with error code {0}.", errorCode);
168-
}
169-
}
170-
}
171-
172-
private void CompleteCallback(int resultState)
173-
{
174-
Debug.Assert(resultState == ResultSuccess || resultState == ResultError, "Unexpected result state " + resultState);
175-
176-
ReleaseResources();
177-
178-
if (resultState == ResultError)
179-
{
180-
if (_errorCode == Interop.mincore.Errors.ERROR_OPERATION_ABORTED)
181-
{
182-
if (_cancellationToken.CanBeCanceled && !_cancellationToken.IsCancellationRequested)
183-
{
184-
// If this is unexpected abortion
185-
TrySetException(__Error.GetOperationAborted());
186-
}
187-
else
188-
{
189-
// otherwise set canceled
190-
TrySetCanceled(_cancellationToken);
191-
}
192-
}
193-
else
194-
{
195-
TrySetException(Win32Marshal.GetExceptionForWin32Error(_errorCode));
196-
}
197-
}
198-
else
199-
{
200-
SetCompletedSynchronously();
201-
}
38+
TrySetException(Win32Marshal.GetExceptionForWin32Error(errorCode));
20239
}
20340
}
20441

0 commit comments

Comments
 (0)