Skip to content

Commit 29c7e72

Browse files
authored
Reconnect example (#7)
1 parent 7cac9d2 commit 29c7e72

File tree

8 files changed

+248
-4
lines changed

8 files changed

+248
-4
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,6 @@
33
.vs/
44
.vscode/
55
packages/
6+
dist
7+
.DS_STORE
8+
UpgradeLog.htm

installer/CliNetliteInstaller/CliNetliteInstaller.wixproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<ProductVersion>3.10</ProductVersion>
77
<ProjectGuid>f13fb7ef-d892-4229-a1eb-8b595d71d64e</ProjectGuid>
88
<SchemaVersion>2.0</SchemaVersion>
9-
<OutputName>cli-netlite-2.1.2</OutputName>
9+
<OutputName>cli-netlite-2.1.4</OutputName>
1010
<OutputType>Package</OutputType>
1111
</PropertyGroup>
1212
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">

installer/CliNetliteInstaller/Product.wxs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<?define version="2.1.2"?>
2+
<?define version="2.1.4"?>
33
<?define UpgradeCode="a580c04f-caec-4e18-9134-c4b8f1c8f1be"?>
44
<Wix xmlns="http://schemas.microsoft.com/wix/2006/wi">
55
<Product Id="*"

src/dotNet/ClientLib/ClientLib.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
<Compile Include="Options.cs" />
6868
<Compile Include="OptionsParser.cs" />
6969
<Compile Include="ReceiverClient.cs" />
70+
<Compile Include="ReconnectSender.cs" />
7071
<Compile Include="SenderClient.cs" />
7172
<Compile Include="Utils.cs" />
7273
<Compile Include="Properties\AssemblyInfo.cs" />
Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
// ------------------------------------------------------------------------------------
2+
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
3+
// file except in compliance with the License. You may obtain a copy of the License at
4+
// http://www.apache.org/licenses/LICENSE-2.0
5+
//
6+
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
7+
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
8+
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
9+
// NON-INFRINGEMENT.
10+
//
11+
// See the Apache Version 2.0 License for specific language governing permissions and
12+
// limitations under the License.
13+
// ------------------------------------------------------------------------------------
14+
15+
//
16+
// ReconnectSender
17+
// * Detects a failed AMQP connection and automatically reconnets it.
18+
// * Recovers from a peer failure by connecting to a list of AMQP brokers/peers.
19+
// * Builds or rebuilds the AMPQ object hierarchy in response to protocol events.
20+
// * Recovers from all failures by reconnecting the AMQP connection.
21+
//
22+
// Command line:
23+
// ReconnectSender [brokerUrl-csv-string [message-count]]
24+
//
25+
// Default:
26+
// ReconnectSender amqp://127.0.0.1:5672/q1,amqp://127.0.0.1:15672/q1 10
27+
//
28+
// Requires:
29+
// A broker or peer at the addresses given in arg 1.
30+
//
31+
using System;
32+
using System.Threading;
33+
using System.Collections.Generic;
34+
using System.Linq;
35+
using Amqp;
36+
using Amqp.Framing;
37+
38+
namespace ClientLib
39+
{
40+
public class Application
41+
{
42+
// AMQP connection selection
43+
List<Address> addresses;
44+
int aIndex = 0;
45+
46+
// Protocol objects
47+
Connection connection;
48+
Session session;
49+
SenderLink sender;
50+
51+
// Sender is ready to send messages
52+
ManualResetEvent sendable = new ManualResetEvent(false);
53+
54+
// Time in mS to wait for a connection to connect and become sendable
55+
// before failing over to the next host.
56+
const Int32 SENDABLE_WAIT_TIME = 10 * 1000;
57+
58+
// Application mission state
59+
ulong nToSend = 0;
60+
ulong nSent = 0;
61+
62+
/// <summary>
63+
/// Application constructor
64+
/// </summary>
65+
/// <param name="_addresses">Address objects that define the host, port, and target for messages.</param>
66+
/// <param name="_nToSend">Message count.</param>
67+
public Application(List<Address> _addresses, ulong _nToSend)
68+
{
69+
addresses = _addresses;
70+
nToSend = _nToSend;
71+
}
72+
73+
/// <summary>
74+
/// Connection closed event handler
75+
///
76+
/// This function provides information only. Calling Reconnect is redundant with
77+
/// calls from the Run loop.
78+
/// </summary>
79+
/// <param name="_">Connection that closed. There is only one connection so this is ignored.</param>
80+
/// <param name="error">Error object associated with connection close.</param>
81+
void connectionClosed(IAmqpObject _, Error error)
82+
{
83+
if (error == null)
84+
Trace.WriteLine(TraceLevel.Warning, "Connection closed with no error");
85+
else
86+
Trace.WriteLine(TraceLevel.Error, "Connection closed with error: {0}", error.ToString());
87+
}
88+
89+
/// <summary>
90+
/// Select the next host in the Address list and start it
91+
/// </summary>
92+
void Reconnect()
93+
{
94+
Trace.WriteLine(TraceLevel.Verbose, "Entering Reconnect()");
95+
96+
sendable.Reset();
97+
98+
if (nSent < nToSend)
99+
{
100+
if (++aIndex >= addresses.Count) aIndex = 0;
101+
OpenConnection();
102+
}
103+
}
104+
105+
106+
/// <summary>
107+
/// Start the current host in the address list
108+
/// </summary>
109+
async void OpenConnection()
110+
{
111+
try
112+
{
113+
Trace.WriteLine(TraceLevel.Verbose,
114+
"Attempting connection to {0}:{1}",
115+
addresses[aIndex].Host, addresses[aIndex].Port);
116+
117+
connection = await Connection.Factory.CreateAsync(addresses[aIndex], null, onOpened);
118+
119+
Trace.WriteLine(TraceLevel.Information,
120+
"Success: connecting to {0}:{1}",
121+
addresses[aIndex].Host, addresses[aIndex].Port);
122+
123+
connection.AddClosedCallback(connectionClosed);
124+
}
125+
catch (Exception e)
126+
{
127+
Trace.WriteLine(TraceLevel.Error,
128+
"Failure: exception connecting to '{0}:{1}': {2}",
129+
addresses[aIndex].Host, addresses[aIndex].Port, e.Message);
130+
}
131+
}
132+
133+
/// <summary>
134+
/// AMQP connection has opened
135+
/// </summary>
136+
/// <param name="_">Which connection (ignored).</param>
137+
/// <param name="__">Peer AMQP Open (ignored).</param>
138+
void onOpened(IConnection _, Open __)
139+
{
140+
Trace.WriteLine(TraceLevel.Verbose, "Event: OnOpened");
141+
142+
session = new Session(connection, new Begin() { }, onBegin);
143+
}
144+
145+
/// <summary>
146+
/// AMQP session has opened
147+
/// </summary>
148+
/// <param name="_">Which session (ignored).</param>
149+
/// <param name="__">Peer AMQP Begin (ignored).</param>
150+
void onBegin(ISession _, Begin __)
151+
{
152+
Trace.WriteLine(TraceLevel.Verbose, "Event: OnBegin");
153+
154+
string targetName = addresses[aIndex].Path.Substring(1); // no leading '/'
155+
Target target = new Target() { Address = targetName };
156+
sender = new SenderLink(session, "senderLink", target, onAttached);
157+
}
158+
159+
/// <summary>
160+
/// AMQP Link has attached. Signal that protocol stack is ready to send.
161+
/// </summary>
162+
/// <param name="_">Which link (ignored).</param>
163+
/// <param name="__">Peer AMQP Attach (ignored).</param>
164+
void onAttached(ILink _, Attach __)
165+
{
166+
Trace.WriteLine(TraceLevel.Verbose, "Event: OnAttached");
167+
168+
sendable.Set();
169+
}
170+
171+
/// <summary>
172+
/// Application mission code.
173+
/// Send N messages while automatically reconnecting to broker/peer as necessary.
174+
/// </summary>
175+
void Run()
176+
{
177+
OpenConnection();
178+
while (nSent < nToSend)
179+
{
180+
if (sendable.WaitOne(SENDABLE_WAIT_TIME))
181+
{
182+
try
183+
{
184+
Trace.WriteLine(TraceLevel.Information, "Sending message {0}", nSent);
185+
186+
Message message = new Message("message " + nSent.ToString());
187+
message.Properties = new Properties();
188+
message.Properties.SetMessageId((object)nSent);
189+
sender.Send(message);
190+
nSent += 1;
191+
192+
Trace.WriteLine(TraceLevel.Information, "Sent message {0}", nSent-1);
193+
}
194+
catch (Exception e)
195+
{
196+
Trace.WriteLine(TraceLevel.Error,
197+
"Exception sending message {0}: {1}", nSent, e.Message);
198+
Reconnect();
199+
}
200+
}
201+
else
202+
{
203+
Trace.WriteLine(TraceLevel.Warning, "Timeout waiting for connection");
204+
Reconnect();
205+
}
206+
}
207+
}
208+
209+
public static void Main(string[] args)
210+
{
211+
string addrs = args.Length >= 1 ? args[0] : "amqp://127.0.0.1:5672/q1,amqp://127.0.0.1:15672/q1";
212+
ulong count = args.Length >= 2 ? Convert.ToUInt64(args[1]) : 10;
213+
214+
Trace.TraceLevel = TraceLevel.Verbose;
215+
Trace.TraceListener = (l, f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, a));
216+
217+
List<Address> addresses = new List<Address>();
218+
foreach (var adr in addrs.Split(',').ToList()) addresses.Add(new Address(adr));
219+
220+
Application app = new Application(addresses, count);
221+
app.Run();
222+
}
223+
}
224+
}

src/dotNet/Sender/Sender.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616

1717
using ClientLib;
18+
using System;
19+
using System.Linq;
1820

1921
namespace Sender
2022
{
@@ -29,8 +31,17 @@ class Sender
2931
/// <param name="args">args from command line</param>
3032
static void Main(string[] args)
3133
{
32-
SenderClient client = new SenderClient();
33-
client.Run(args);
34+
var index = Array.FindIndex(args, x => x == "--reconnect-example");
35+
36+
if (index != -1)
37+
{
38+
Application.Main(args.Skip(1).ToArray());
39+
}
40+
else
41+
{
42+
SenderClient client = new SenderClient();
43+
client.Run(args);
44+
}
3445
}
3546

3647
}

src/dotNet/Sender/Sender.csproj.user

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@
1010
<FallbackCulture>en-US</FallbackCulture>
1111
<VerifyUploadedFiles>false</VerifyUploadedFiles>
1212
</PropertyGroup>
13+
<PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|AnyCPU'">
14+
<StartArguments>
15+
</StartArguments>
16+
</PropertyGroup>
1317
</Project>

src/dotNetCore/ClientLibNetCore/ClientLibNetCore.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<Compile Include="..\..\dotNet\ClientLib\Options.cs" Link="Options.cs" />
1313
<Compile Include="..\..\dotNet\ClientLib\OptionsParser.cs" Link="OptionsParser.cs" />
1414
<Compile Include="..\..\dotNet\ClientLib\ReceiverClient.cs" Link="ReceiverClient.cs" />
15+
<Compile Include="..\..\dotNet\ClientLib\ReconnectSender.cs" Link="ReconnectSender.cs" />
1516
<Compile Include="..\..\dotNet\ClientLib\SenderClient.cs" Link="SenderClient.cs" />
1617
<Compile Include="..\..\dotNet\ClientLib\Utils.cs" Link="Utils.cs" />
1718
</ItemGroup>

0 commit comments

Comments
 (0)