Skip to content

Commit ea09402

Browse files
authored
Async SASL (#618)
* Async SASL * Fix netmf build on vs2015
1 parent efc6efb commit ea09402

File tree

10 files changed

+358
-35
lines changed

10 files changed

+358
-35
lines changed

csproj/Amqp.NetFX40.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@
147147
<Link>Transactions\ResourceManager.cs</Link>
148148
</Compile>
149149
<Compile Include="..\src\Net\BufferManager.cs" />
150+
<Compile Include="..\src\Sasl\AsyncSaslProfile.cs">
151+
<Link>Sasl\AsyncSaslProfile.cs</Link>
152+
</Compile>
150153
<Compile Include="..\src\Types\IStringDecoder.cs">
151154
<Link>Types\IStringDecoder.cs</Link>
152155
</Compile>

csproj/Amqp.NetFX45.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@
141141
<Link>Transactions\ResourceManager.cs</Link>
142142
</Compile>
143143
<Compile Include="..\src\Net\BufferManager.cs" />
144+
<Compile Include="..\src\Sasl\AsyncSaslProfile.cs">
145+
<Link>Sasl\AsyncSaslProfile.cs</Link>
146+
</Compile>
144147
<Compile Include="..\src\Types\IStringDecoder.cs">
145148
<Link>Types\IStringDecoder.cs</Link>
146149
</Compile>

csproj/Amqp.Uwp.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,9 @@
226226
<Link>Properties\Version.cs</Link>
227227
</Compile>
228228
<Compile Include="..\src\ReceiverLink.cs" />
229+
<Compile Include="..\src\Sasl\AsyncSaslProfile.cs">
230+
<Link>Sasl\AsyncSaslProfile.cs</Link>
231+
</Compile>
229232
<Compile Include="..\src\Sasl\SaslChallenge.cs">
230233
<Link>Sasl\SaslChallenge.cs</Link>
231234
</Compile>

src/Amqp.Net.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<DefineConstants>$(DefineConstants);NETFX</DefineConstants>
1010
<IntermediateOutputPath>..\obj\$(Configuration)\$(MSBuildProjectName)\</IntermediateOutputPath>
1111
<OutputPath>..\bin\$(Configuration)\$(MSBuildProjectName)\</OutputPath>
12+
<DocumentationFile>..\bin\$(Configuration)\$(MSBuildProjectName)\$(AssemblyName).XML</DocumentationFile>
1213
</PropertyGroup>
1314

1415
<ItemGroup>

src/Net/AsyncPump.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ public void Start(Connection connection, Action<Exception> onException = null)
3838
Task task = this.StartAsync(connection, onException);
3939
}
4040

41-
public async Task PumpAsync(uint maxFrameSize, Func<ProtocolHeader, bool> onHeader, Func<ByteBuffer, bool> onBuffer)
41+
public async Task PumpAsync(uint maxFrameSize,
42+
Func<ProtocolHeader, bool> onHeader,
43+
Func<ByteBuffer, bool> onBuffer,
44+
Func<ByteBuffer, Task<bool>> onBufferAsync = null)
4245
{
4346
byte[] header = new byte[FixedWidth.ULong];
4447

@@ -73,7 +76,10 @@ public async Task PumpAsync(uint maxFrameSize, Func<ProtocolHeader, bool> onHead
7376
buffer.Append(frameSize);
7477
Trace.WriteBuffer("RECV {0}", buffer.Buffer, buffer.Offset, buffer.Length);
7578

76-
if (!onBuffer(buffer))
79+
var pending = onBufferAsync != null ?
80+
await onBufferAsync(buffer).ConfigureAwait(false) :
81+
onBuffer(buffer);
82+
if (!pending)
7783
{
7884
break;
7985
}

src/Net/TaskExtensions.cs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,9 +511,30 @@ await pump.PumpAsync(
511511
saslProfile.OnHeader(myHeader, header);
512512
return true;
513513
},
514-
buffer =>
514+
null,
515+
async buffer =>
515516
{
516-
return saslProfile.OnFrame(hostname, writer, buffer, out code);
517+
bool shouldContinue = saslProfile.OnFrame(hostname, buffer, out var response, out code);
518+
if (response != null)
519+
{
520+
var responseTask = response as AsyncSaslProfile.ResponseTask;
521+
if (responseTask != null)
522+
{
523+
response = await responseTask.Task;
524+
if (response != null && response.Descriptor.Code == Codec.SaslOutcome.Code)
525+
{
526+
code = ((SaslOutcome)response).Code;
527+
shouldContinue = false;
528+
}
529+
}
530+
531+
if (response != null)
532+
{
533+
SaslProfile.SendCommand(writer, response);
534+
}
535+
}
536+
537+
return shouldContinue;
517538
}).ConfigureAwait(false);
518539

519540
await writer.FlushAsync().ConfigureAwait(false);

src/Sasl/AsyncSaslProfile.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// ------------------------------------------------------------------------------------
2+
// Copyright (c) Microsoft Corporation
3+
// All rights reserved.
4+
//
5+
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
6+
// file except in compliance with the License. You may obtain a copy of the License at
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
10+
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
11+
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
12+
// NON-INFRINGEMENT.
13+
//
14+
// See the Apache Version 2.0 License for specific language governing permissions and
15+
// limitations under the License.
16+
// ------------------------------------------------------------------------------------
17+
18+
namespace Amqp.Sasl
19+
{
20+
using System;
21+
using System.Runtime.CompilerServices;
22+
using System.Threading.Tasks;
23+
using Amqp.Types;
24+
25+
/// <summary>
26+
/// A <seealso cref="SaslProfile"/> that processes commands asynchronously.
27+
/// </summary>
28+
public abstract class AsyncSaslProfile : SaslProfile
29+
{
30+
/// <summary>
31+
/// <inheritdoc cref="SaslProfile.SaslProfile(Symbol)"/>
32+
/// </summary>
33+
public AsyncSaslProfile(Symbol mechanism)
34+
: base(mechanism)
35+
{
36+
}
37+
38+
/// <summary>
39+
/// Processes the received command asynchronously and returns a response. If returns
40+
/// null, the SASL handshake completes.
41+
/// </summary>
42+
/// <param name="command">The SASL command received from the peer.</param>
43+
/// <returns>A Task that returns a SASL command as a response to the incoming command.</returns>
44+
protected abstract Task<DescribedList> OnCommandAsync(DescribedList command);
45+
46+
/// <summary>
47+
/// <inheritdoc cref="SaslProfile.OnCommand(DescribedList)"/>
48+
/// </summary>
49+
protected sealed override DescribedList OnCommand(DescribedList command)
50+
{
51+
var task = this.OnCommandAsync(command);
52+
return new ResponseTask(task);
53+
}
54+
55+
// Just a wrapper of task that also satisfies the OnCommand contract.
56+
internal sealed class ResponseTask : DescribedList
57+
{
58+
static readonly Descriptor descriptor = new Descriptor(ulong.MaxValue, string.Empty);
59+
readonly Task<DescribedList> task;
60+
61+
public ResponseTask(Task<DescribedList> task)
62+
: base(descriptor, 0)
63+
{
64+
this.task = task;
65+
}
66+
67+
public Task<DescribedList> Task => this.task;
68+
69+
internal override void ReadField(ByteBuffer buffer, int index, byte formatCode)
70+
{
71+
throw new NotImplementedException();
72+
}
73+
74+
internal override void WriteField(ByteBuffer buffer, int index)
75+
{
76+
throw new NotImplementedException();
77+
}
78+
}
79+
}
80+
}

src/Sasl/SaslProfile.cs

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,20 @@ internal ITransport Open(string hostname, ITransport transport)
7575
this.OnHeader(myHeader, theirHeader);
7676

7777
SaslCode code = SaslCode.SysTemp;
78-
while (true)
78+
bool shouldContinue = true;
79+
while (shouldContinue)
7980
{
8081
ByteBuffer buffer = Reader.ReadFrameBuffer(transport, new byte[4], MaxFrameSize);
8182
if (buffer == null)
8283
{
8384
throw new OperationCanceledException(Fx.Format(SRAmqp.TransportClosed, transport.GetType().Name));
8485
}
8586

86-
if (!this.OnFrame(hostname, transport, buffer, out code))
87+
DescribedList response = null;
88+
shouldContinue = this.OnFrame(hostname, buffer, out response, out code);
89+
if (response != null)
8790
{
88-
break;
91+
SendCommand(transport, response);
8992
}
9093
}
9194

@@ -113,7 +116,7 @@ internal ProtocolHeader Start(ITransport transport, DescribedList command)
113116

114117
if (command != null)
115118
{
116-
this.SendCommand(transport, command);
119+
SendCommand(transport, command);
117120
}
118121

119122
return myHeader;
@@ -129,14 +132,15 @@ internal void OnHeader(ProtocolHeader myHeader, ProtocolHeader theirHeader)
129132
}
130133
}
131134

132-
internal bool OnFrame(string hostname, ITransport transport, ByteBuffer buffer, out SaslCode code)
135+
internal bool OnFrame(string hostname, ByteBuffer buffer, out DescribedList response, out SaslCode code)
133136
{
134137
ushort channel;
135138
DescribedList command;
136139
Frame.Decode(buffer, out channel, out command);
137140
Trace.WriteLine(TraceLevel.Frame, "RECV {0}", command);
138141

139142
bool shouldContinue = true;
143+
response = null;
140144
if (command.Descriptor.Code == Codec.SaslOutcome.Code)
141145
{
142146
code = ((SaslOutcome)command).Code;
@@ -162,24 +166,16 @@ internal bool OnFrame(string hostname, ITransport transport, ByteBuffer buffer,
162166
throw new AmqpException(ErrorCode.NotImplemented, mechanisms.ToString());
163167
}
164168

165-
DescribedList init = this.GetStartCommand(hostname);
166-
if (init != null)
167-
{
168-
this.SendCommand(transport, init);
169-
}
169+
response = this.GetStartCommand(hostname);
170170
}
171171
else
172172
{
173173
code = SaslCode.Ok;
174-
DescribedList response = this.OnCommand(command);
175-
if (response != null)
174+
response = this.OnCommand(command);
175+
if (response != null && response.Descriptor.Code == Codec.SaslOutcome.Code)
176176
{
177-
this.SendCommand(transport, response);
178-
if (response.Descriptor.Code == Codec.SaslOutcome.Code)
179-
{
180-
code = ((SaslOutcome)response).Code;
181-
shouldContinue = false;
182-
}
177+
code = ((SaslOutcome)response).Code;
178+
shouldContinue = false;
183179
}
184180
}
185181

@@ -231,7 +227,7 @@ protected virtual bool Match(Symbol mechanism)
231227
/// <returns>A SASL command as a response to the incoming command.</returns>
232228
protected abstract DescribedList OnCommand(DescribedList command);
233229

234-
void SendCommand(ITransport transport, DescribedList command)
230+
internal static void SendCommand(ITransport transport, DescribedList command)
235231
{
236232
ByteBuffer buffer = new ByteBuffer(Frame.CmdBufferSize, true);
237233
Frame.Encode(buffer, FrameType.Sasl, 0, command);

0 commit comments

Comments
 (0)