Skip to content

Commit 2b689b7

Browse files
committed
NEW: feature for p2p messaging
1 parent ae02c79 commit 2b689b7

File tree

8 files changed

+201
-78
lines changed

8 files changed

+201
-78
lines changed

CliNetliteInstaller/CliNetliteInstaller.wixproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<ProductVersion>3.10</ProductVersion>
77
<ProjectGuid>f13fb7ef-d892-4229-a1eb-8b595d71d64e</ProjectGuid>
88
<SchemaVersion>2.0</SchemaVersion>
9-
<OutputName>cli-netlite-1.1.0.2</OutputName>
9+
<OutputName>cli-netlite-1.1.0.3</OutputName>
1010
<OutputType>Package</OutputType>
1111
</PropertyGroup>
1212
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">

CliNetliteInstaller/Product.wxs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<?define version="1.1.0.2"?>
2+
<?define version="1.1.0.3"?>
33
<?define UpgradeCode="a580c04f-caec-4e18-9134-c4b8f1c8f1be"?>
44
<Wix xmlns="http://schemas.microsoft.com/wix/2006/wi">
55
<Product Id="*"

ClientLib/ClientLib.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
<ItemGroup>
3838
<Reference Include="Amqp.Net, Version=1.2.0.0, Culture=neutral, PublicKeyToken=905a7b1e6458e0c3, processorArchitecture=MSIL">
3939
<HintPath>..\packages\AMQPNetLite.1.2.3\lib\net45\Amqp.Net.dll</HintPath>
40-
<Private>True</Private>
4140
</Reference>
4241
<Reference Include="NDesk.Options, Version=0.2.1.0, Culture=neutral, processorArchitecture=MSIL">
4342
<HintPath>..\packages\NDesk.Options.0.2.1\lib\NDesk.Options.dll</HintPath>

ClientLib/CoreClient.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
using System.Collections.Generic;
1919
using Amqp;
2020
using Amqp.Framing;
21-
21+
using Amqp.Listener;
22+
2223
namespace ClientLib
2324
{
2425
/// <summary>
@@ -33,6 +34,7 @@ public class CoreClient
3334
protected Address address;
3435
protected Session session;
3536
protected Connection connection;
37+
protected ContainerHost containerHost;
3638

3739
/// <summary>
3840
/// Constructor of core class
@@ -102,8 +104,10 @@ protected void SetAddress(string url)
102104
/// </summary>
103105
protected void CreateConnection(ConnectionOptions options)
104106
{
105-
Open open = new Open();
106-
open.ContainerId = Guid.NewGuid().ToString();
107+
Open open = new Open()
108+
{
109+
ContainerId = Guid.NewGuid().ToString()
110+
};
107111
if (options.FrameSize > -1)
108112
open.MaxFrameSize = (uint)options.FrameSize;
109113
if (options.Heartbeat > -1)
@@ -112,6 +116,16 @@ protected void CreateConnection(ConnectionOptions options)
112116
this.connection = new Connection(this.address, null, open, null);
113117
}
114118

119+
/// <summary>
120+
/// Method for create container listener
121+
/// </summary>
122+
/// <param name="options">receiver options</param>
123+
protected void CreateContainerHost(ReceiverOptions options)
124+
{
125+
Uri addressUri = new Uri("amqp://localhost:" + options.RecvListenerPort);
126+
this.containerHost = new ContainerHost(addressUri);
127+
}
128+
115129
/// <summary>
116130
/// Method for create session
117131
/// </summary>

ClientLib/OptionsParser.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,8 @@ public class ReceiverOptions : SenderReceiverOptions
412412
public bool RecvBrowse { get; private set; }
413413
public string MsgSelector { get; private set; }
414414
public bool ProccessReplyTo { get; private set; }
415+
public bool RecvListener { get; private set; }
416+
public int RecvListenerPort { get; private set; }
415417

416418
/// <summary>
417419
/// Constructor
@@ -420,6 +422,7 @@ public ReceiverOptions() : base()
420422
{
421423
//default values
422424
this.Action = String.Empty;
425+
this.RecvListenerPort = 5672;
423426

424427
//add options
425428
this.Add("action=", "action on acquired message [accept, release, reject]",
@@ -430,6 +433,10 @@ public ReceiverOptions() : base()
430433
(string recvSelector) => { this.MsgSelector = recvSelector; });
431434
this.Add("process-reply-to", "reply on reply_on address",
432435
(v) => { this.ProccessReplyTo = true; });
436+
this.Add("recv-listen=", "enable receiver as listener [true, false]",
437+
(bool recvListen) => { this.RecvListener = recvListen; });
438+
this.Add("recv-listen-port=", "port for p2p",
439+
(int port) => { this.RecvListenerPort = port; });
433440
}
434441
}
435442

ClientLib/ReceiverClient.cs

Lines changed: 161 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515
*/
1616

1717
using System;
18-
using System.Collections;
19-
using System.Collections.Generic;
20-
using System.Collections.ObjectModel;
2118
using System.Transactions;
2219
using Amqp;
2320
using Amqp.Framing;
2421
using Amqp.Types;
22+
using System.Threading.Tasks;
23+
using Amqp.Listener;
2524

2625
namespace ClientLib
2726
{
@@ -38,40 +37,55 @@ public class ReceiverClient : CoreClient
3837
/// <returns>build receiver link</returns>
3938
private ReceiverLink PeprareReceiverLink(ReceiverOptions options)
4039
{
41-
Source recvSource = new Source();
42-
recvSource.Address = options.Address;
43-
40+
Source recvSource = new Source()
41+
{
42+
Address = options.Address
43+
};
4444
if (options.RecvBrowse)
4545
recvSource.DistributionMode = new Symbol("copy");
4646

4747
//source
4848
if (!string.IsNullOrEmpty(options.MsgSelector))
4949
{
50-
Map filters = new Map();
51-
filters.Add(new Symbol("filter"),
52-
new DescribedValue(
50+
Map filters = new Map
51+
{
52+
{
53+
new Symbol("filter"),
54+
new DescribedValue(
5355
new Symbol("apache.org:selector-filter:float"),
54-
options.MsgSelector));
55-
filters.Add(new Symbol("filter1"),
56-
new DescribedValue(
56+
options.MsgSelector)
57+
},
58+
{
59+
new Symbol("filter1"),
60+
new DescribedValue(
5761
new Symbol("apache.org:selector-filter:string"),
58-
options.MsgSelector));
59-
filters.Add(new Symbol("filter2"),
60-
new DescribedValue(
62+
options.MsgSelector)
63+
},
64+
{
65+
new Symbol("filter2"),
66+
new DescribedValue(
6167
new Symbol("apache.org:selector-filter:int"),
62-
options.MsgSelector));
63-
filters.Add(new Symbol("filter3"),
64-
new DescribedValue(
68+
options.MsgSelector)
69+
},
70+
{
71+
new Symbol("filter3"),
72+
new DescribedValue(
6573
new Symbol("apache.org:selector-filter:boolean"),
66-
options.MsgSelector));
67-
filters.Add(new Symbol("filter4"),
68-
new DescribedValue(
74+
options.MsgSelector)
75+
},
76+
{
77+
new Symbol("filter4"),
78+
new DescribedValue(
6979
new Symbol("apache.org:selector-filter:list"),
70-
options.MsgSelector));
71-
filters.Add(new Symbol("filter5"),
72-
new DescribedValue(
80+
options.MsgSelector)
81+
},
82+
{
83+
new Symbol("filter5"),
84+
new DescribedValue(
7385
new Symbol("apache.org:selector-filter:map"),
74-
options.MsgSelector));
86+
options.MsgSelector)
87+
}
88+
};
7589
recvSource.FilterSet = filters;
7690
}
7791
Attach attach = new Attach()
@@ -86,6 +100,83 @@ private ReceiverLink PeprareReceiverLink(ReceiverOptions options)
86100
}
87101
#endregion
88102

103+
#region Listener methods
104+
/// <summary>
105+
/// Method for init container listener
106+
/// </summary>
107+
/// <param name="options">receiver options</param>
108+
private void InitListener(ReceiverOptions options)
109+
{
110+
this.CreateContainerHost(options);
111+
this.containerHost.Open();
112+
this.containerHost.RegisterMessageProcessor(options.Address, new MessageProcessor(options, this.containerHost));
113+
System.Threading.Thread.Sleep(options.Timeout);
114+
}
115+
116+
/// <summary>
117+
/// Private class for handling requests on listener
118+
/// </summary>
119+
class MessageProcessor : IMessageProcessor
120+
{
121+
int received;
122+
int count;
123+
ReceiverOptions options;
124+
ContainerHost host;
125+
126+
/// <summary>
127+
/// Constructor of MessageProcessor
128+
/// </summary>
129+
/// <param name="options">receiver options</param>
130+
/// <param name="host">container host listener</param>
131+
public MessageProcessor(ReceiverOptions options, ContainerHost host)
132+
{
133+
this.received = 0;
134+
this.options = options;
135+
this.count = options.MsgCount;
136+
this.host = host;
137+
}
138+
139+
public int Credit { get { return options.Count; } }
140+
141+
/// <summary>
142+
/// init of message processor
143+
/// </summary>
144+
/// <param name="messageContext">context of messsage</param>
145+
public void Process(MessageContext messageContext)
146+
{
147+
var task = this.ReplyAsync(messageContext);
148+
}
149+
150+
/// <summary>
151+
/// Async tassk for handling requst
152+
/// </summary>
153+
/// <param name="messageContext">context of message</param>
154+
/// <returns>async task</returns>
155+
async Task ReplyAsync(MessageContext messageContext)
156+
{
157+
while (this.received < count)
158+
{
159+
try
160+
{
161+
Message message = messageContext.Message;
162+
Formatter.LogMessage(message, options);
163+
this.received++;
164+
messageContext.Complete();
165+
}
166+
catch (Exception exception)
167+
{
168+
Console.Error.WriteLine("ERROR: {{'cause': '{0}'}}", exception.Message);
169+
break;
170+
}
171+
172+
await Task.Delay(500);
173+
}
174+
host.Close();
175+
}
176+
}
177+
178+
#endregion
179+
89180
#region Receive methods
90181
/// <summary>
91182
/// Method for browse or selector receive
@@ -208,61 +299,68 @@ public void Run(string[] args)
208299
{
209300
this.ParseArguments(args, options);
210301

211-
//init timestamping
212-
this.ptsdata = Utils.TsInit(options.LogStats);
302+
if (options.RecvListener)
303+
{
304+
this.InitListener(options);
305+
}
306+
else
307+
{
308+
//init timestamping
309+
this.ptsdata = Utils.TsInit(options.LogStats);
213310

214-
Utils.TsSnapStore(this.ptsdata, 'B', options.LogStats);
311+
Utils.TsSnapStore(this.ptsdata, 'B', options.LogStats);
215312

216-
this.SetAddress(options.Url);
217-
this.CreateConnection(options);
313+
this.SetAddress(options.Url);
314+
this.CreateConnection(options);
218315

219-
Utils.TsSnapStore(this.ptsdata, 'C', options.LogStats);
316+
Utils.TsSnapStore(this.ptsdata, 'C', options.LogStats);
220317

221-
this.CreateSession();
318+
this.CreateSession();
222319

223-
Utils.TsSnapStore(this.ptsdata, 'D', options.LogStats);
320+
Utils.TsSnapStore(this.ptsdata, 'D', options.LogStats);
224321

225-
ReceiverLink receiver = this.PeprareReceiverLink(options);
322+
ReceiverLink receiver = this.PeprareReceiverLink(options);
226323

227-
Message message = new Message();
324+
Message message = new Message();
228325

229-
this.ts = Utils.GetTime();
326+
this.ts = Utils.GetTime();
230327

231-
Utils.TsSnapStore(this.ptsdata, 'E', options.LogStats);
232-
int nReceived = 0;
328+
Utils.TsSnapStore(this.ptsdata, 'E', options.LogStats);
329+
int nReceived = 0;
233330

234-
if(options.Capacity > -1)
235-
receiver.SetCredit(options.Capacity);
331+
if (options.Capacity > -1)
332+
receiver.SetCredit(options.Capacity);
236333

237-
bool tx_batch_flag = String.IsNullOrEmpty(options.TxLoopendAction) ? (options.TxSize > 0) : true;
334+
bool tx_batch_flag = String.IsNullOrEmpty(options.TxLoopendAction) ? (options.TxSize > 0) : true;
238335

239-
//receiving of messages
240-
if (options.RecvBrowse || !String.IsNullOrEmpty(options.MsgSelector))
241-
this.ReceiveAll(receiver, options);
242-
else
243-
{
244-
if (tx_batch_flag)
245-
this.TransactionReceive(receiver, options);
336+
//receiving of messages
337+
if (options.RecvBrowse || !String.IsNullOrEmpty(options.MsgSelector))
338+
this.ReceiveAll(receiver, options);
246339
else
247-
this.Receive(receiver, options);
248-
}
340+
{
341+
if (tx_batch_flag)
342+
this.TransactionReceive(receiver, options);
343+
else
344+
this.Receive(receiver, options);
345+
}
249346

250-
if (options.CloseSleep > 0)
251-
{
252-
System.Threading.Thread.Sleep(options.CloseSleep);
253-
}
347+
if (options.CloseSleep > 0)
348+
{
349+
System.Threading.Thread.Sleep(options.CloseSleep);
350+
}
254351

255-
//close connection and link
256-
this.CloseLink(receiver);
257-
this.CloseConnection();
352+
//close connection and link
353+
this.CloseLink(receiver);
354+
this.CloseConnection();
258355

259-
Utils.TsSnapStore(this.ptsdata, 'G', options.LogStats);
356+
Utils.TsSnapStore(this.ptsdata, 'G', options.LogStats);
260357

261-
//report timestamping
262-
if (this.ptsdata.Count > 0)
263-
{
264-
Console.WriteLine("STATS " + Utils.TsReport(this.ptsdata,
265-
nReceived, message.Body.ToString().Length * sizeof(Char), 0));
358+
//report timestamping
359+
if (this.ptsdata.Count > 0)
360+
{
361+
Console.WriteLine("STATS " + Utils.TsReport(this.ptsdata,
362+
nReceived, message.Body.ToString().Length * sizeof(Char), 0));
363+
}
266364
}
267365
this.exitCode = ReturnCode.ERROR_SUCCESS;
268366
}

0 commit comments

Comments
 (0)