Skip to content

Commit e55c5ab

Browse files
authored
Hosepipe. Publish messages to a queue instead of an exchange (#1122)
* Publish message to queue instead of exchange * Fix tests They could not run in parallel because of interception of Console.Out * Fix tests * Review comments
1 parent d914dc0 commit e55c5ab

12 files changed

+121
-152
lines changed

Source/EasyNetQ.Hosepipe.Tests/EasyNetQ.Hosepipe.Tests.csproj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
<ItemGroup>
3737
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
3838
</ItemGroup>
39+
<ItemGroup>
40+
<None Update="xunit.runner.json">
41+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
42+
</None>
43+
</ItemGroup>
3944
<PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp3.0' ">
4045
<DefineConstants>$(DefineConstants);NET_STANDARD</DefineConstants>
4146
</PropertyGroup>

Source/EasyNetQ.Hosepipe.Tests/ErrorRetryTests.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ namespace EasyNetQ.Hosepipe.Tests
99
{
1010
public class ErrorRetryTests
1111
{
12-
private ErrorRetry errorRetry;
13-
private IConventions conventions;
12+
private readonly ErrorRetry errorRetry;
13+
private readonly IConventions conventions;
1414

1515
public ErrorRetryTests()
1616
{
@@ -43,6 +43,7 @@ public void Should_republish_to_default_exchange()
4343
{
4444
Exchange = "", // default exchange
4545
RoutingKey = "hosepipe.test",
46+
Queue = "queue",
4647
Message = "Hosepipe test message",
4748
BasicProperties = new MessageProperties()
4849
};
@@ -53,10 +54,9 @@ public void Should_republish_to_default_exchange()
5354
Password = "guest",
5455
MessagesOutputDirectory = @"C:\temp\MessageOutput"
5556
};
56-
5757
errorRetry.RepublishError(error, parameters);
5858
}
5959
}
6060
}
6161

62-
// ReSharper restore InconsistentNaming
62+
// ReSharper restore InconsistentNaming

Source/EasyNetQ.Hosepipe.Tests/ProgramTests.cs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,6 @@
33
using System;
44
using System.Collections.Generic;
55
using System.IO;
6-
7-
using EasyNetQ.Consumer;
8-
96
using Xunit;
107

118
namespace EasyNetQ.Hosepipe.Tests
@@ -19,7 +16,6 @@ public class ProgramTests
1916
private MockQueueInsertion queueInsertion;
2017
private MockErrorRetry errorRetry;
2118
private Conventions conventions;
22-
private IErrorMessageSerializer defaultErrorMessageSerializer;
2319

2420
public ProgramTests()
2521
{
@@ -29,7 +25,6 @@ public ProgramTests()
2925
queueInsertion = new MockQueueInsertion();
3026
errorRetry = new MockErrorRetry();
3127
conventions = new Conventions(new LegacyTypeNameSerializer());
32-
defaultErrorMessageSerializer = new DefaultErrorMessageSerializer();
3328

3429
program = new Program(
3530
new ArgParser(),
@@ -38,11 +33,12 @@ public ProgramTests()
3833
messageReader,
3934
queueInsertion,
4035
errorRetry,
41-
conventions);
36+
conventions
37+
);
4238
}
4339

4440
private readonly string expectedDumpOutput =
45-
"2 Messages from queue 'EasyNetQ_Default_Error_Queue'\r\noutput to directory '" + Directory.GetCurrentDirectory() + "'\r\n";
41+
$"2 messages from queue 'EasyNetQ_Default_Error_Queue' were dumped to directory '{Directory.GetCurrentDirectory()}'{Environment.NewLine}";
4642

4743
[Fact]
4844
public void Should_output_messages_to_directory_with_dump()
@@ -59,14 +55,15 @@ public void Should_output_messages_to_directory_with_dump()
5955

6056
program.Start(args);
6157

62-
writer.GetStringBuilder().ToString().ShouldEqual(expectedDumpOutput);
58+
var actualOutput = writer.GetStringBuilder().ToString();
59+
actualOutput.ShouldEqual(expectedDumpOutput);
6360

6461
messageWriter.Parameters.QueueName.ShouldEqual("EasyNetQ_Default_Error_Queue");
6562
messageWriter.Parameters.HostName.ShouldEqual("localhost");
6663
}
6764

6865
private readonly string expectedInsertOutput =
69-
"2 Messages from directory '" + Directory.GetCurrentDirectory() + "'\r\ninserted into queue ''\r\n";
66+
$"{2} messages from directory '{Directory.GetCurrentDirectory()}' were inserted into queue ''{Environment.NewLine}";
7067

7168
[Fact]
7269
public void Should_insert_messages_with_insert()
@@ -82,13 +79,38 @@ public void Should_insert_messages_with_insert()
8279

8380
program.Start(args);
8481

85-
writer.GetStringBuilder().ToString().ShouldEqual(expectedInsertOutput);
82+
var actualInsertOutput = writer.GetStringBuilder().ToString();
83+
actualInsertOutput.ShouldEqual(expectedInsertOutput);
84+
85+
messageReader.Parameters.HostName.ShouldEqual("localhost");
86+
}
87+
88+
private readonly string expectedInsertOutputWithQueue =
89+
$"{2} messages from directory '{Directory.GetCurrentDirectory()}' were inserted into queue 'queue'{Environment.NewLine}";
90+
91+
[Fact]
92+
public void Should_insert_messages_with_insert_and_queue()
93+
{
94+
var args = new[]
95+
{
96+
"insert",
97+
"s:localhost",
98+
"q:queue"
99+
};
100+
101+
var writer = new StringWriter();
102+
Console.SetOut(writer);
103+
104+
program.Start(args);
105+
106+
var actualInsertOutput = writer.GetStringBuilder().ToString();
107+
actualInsertOutput.ShouldEqual(expectedInsertOutputWithQueue);
86108

87109
messageReader.Parameters.HostName.ShouldEqual("localhost");
88110
}
89111

90112
private readonly string expectedRetryOutput =
91-
"2 Error messages from directory '" + Directory.GetCurrentDirectory() + "' republished\r\n";
113+
$"2 error messages from directory '{Directory.GetCurrentDirectory()}' were republished{Environment.NewLine}";
92114

93115

94116
[Fact]
@@ -118,14 +140,13 @@ public class MockMessageWriter : IMessageWriter
118140
public void Write(IEnumerable<HosepipeMessage> messages, QueueParameters queueParameters)
119141
{
120142
Parameters = queueParameters;
121-
foreach (var message in messages)
143+
foreach (var _ in messages)
122144
{
123-
// Console.Out.WriteLine("message = {0}", message);
124145
}
125146
}
126147
}
127148

128-
public class MockQueueRetrieval : IQueueRetreival
149+
public class MockQueueRetrieval : IQueueRetrieval
129150
{
130151
public IEnumerable<HosepipeMessage> GetMessagesFromQueue(QueueParameters parameters)
131152
{
@@ -155,9 +176,8 @@ public class MockQueueInsertion : IQueueInsertion
155176
{
156177
public void PublishMessagesToQueue(IEnumerable<HosepipeMessage> messages, QueueParameters parameters)
157178
{
158-
foreach (var message in messages)
179+
foreach (var _ in messages)
159180
{
160-
// Console.Out.WriteLine("message = {0}", message);
161181
}
162182
}
163183
}
@@ -166,12 +186,11 @@ public class MockErrorRetry : IErrorRetry
166186
{
167187
public void RetryErrors(IEnumerable<HosepipeMessage> rawErrorMessages, QueueParameters parameters)
168188
{
169-
foreach (var rawErrorMessage in rawErrorMessages)
189+
foreach (var _ in rawErrorMessages)
170190
{
171-
//
172191
}
173192
}
174193
}
175194
}
176195

177-
// ReSharper restore InconsistentNaming
196+
// ReSharper restore InconsistentNaming

Source/EasyNetQ.Hosepipe.Tests/QueueRetrievalTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public void TryGetMessagesFromQueue()
1515
{
1616
const string queue = "EasyNetQ_Hosepipe_Tests_QueueRetrievalTests+TestMessage:EasyNetQ_Hosepipe_Tests_hosepipe";
1717

18-
var queueRetrieval = new QueueRetreival(new DefaultErrorMessageSerializer());
18+
var queueRetrieval = new QueueRetrieval(new DefaultErrorMessageSerializer());
1919
var parameters = new QueueParameters
2020
{
2121
QueueName = queue,
Lines changed: 1 addition & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,72 +1,23 @@
1-
using System;
2-
using System.Linq;
31
using Xunit;
4-
using System.Collections.Generic;
52

63
namespace EasyNetQ.Hosepipe.Tests
74
{
85
public static class TestExtensions
96
{
10-
public static T ShouldNotBeNull<T>(this T obj) where T : class
11-
{
12-
Assert.NotNull(obj);
13-
return obj;
14-
}
15-
167
public static T ShouldEqual<T>(this T actual, object expected)
178
{
189
Assert.Equal(expected, actual);
1910
return actual;
2011
}
2112

22-
public static T ShouldBeThrownBy<T>(Action testDelegate) where T : Exception
23-
{
24-
return Assert.Throws<T>(testDelegate);
25-
}
26-
27-
public static void ShouldBe<T>(this object actual)
28-
{
29-
Assert.IsType<T>(actual);
30-
}
31-
3213
public static void ShouldBeNull(this object actual)
3314
{
3415
Assert.Null(actual);
3516
}
3617

37-
public static void ShouldBeTheSameAs(this object actual, object expected)
38-
{
39-
Assert.Same(expected, actual);
40-
}
41-
42-
public static T CastTo<T>(this object source)
43-
{
44-
return (T)source;
45-
}
46-
4718
public static void ShouldBeTrue(this bool source)
4819
{
4920
Assert.True(source);
5021
}
51-
52-
public static void ShouldBeTrue(this bool source, string message)
53-
{
54-
Assert.True(source, message);
55-
}
56-
57-
public static void ShouldBeFalse(this bool source)
58-
{
59-
Assert.False(source);
60-
}
61-
62-
public static void ShouldBeFalse(this bool source, string message)
63-
{
64-
Assert.False(source, message);
65-
}
66-
67-
public static void ShouldBeEmpty<T>(this IEnumerable<T> collection)
68-
{
69-
Assert.Empty(collection);
70-
}
7122
}
72-
}
23+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"parallelizeTestCollections": false
3+
}
Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
using System;
21
using System.Collections.Generic;
3-
using System.Text;
4-
52
using EasyNetQ.Consumer;
63
using EasyNetQ.SystemMessages;
7-
using RabbitMQ.Client.Exceptions;
84

95
namespace EasyNetQ.Hosepipe
106
{
@@ -29,32 +25,16 @@ public void RetryErrors(IEnumerable<HosepipeMessage> rawErrorMessages, QueuePara
2925
}
3026
}
3127

32-
3328
public void RepublishError(Error error, QueueParameters parameters)
3429
{
3530
using (var connection = HosepipeConnection.FromParameters(parameters))
3631
using (var model = connection.CreateModel())
3732
{
38-
try
39-
{
40-
if (error.Exchange != string.Empty)
41-
{
42-
model.ExchangeDeclarePassive(error.Exchange);
43-
}
44-
45-
var properties = model.CreateBasicProperties();
46-
error.BasicProperties.CopyTo(properties);
47-
48-
var body = errorMessageSerializer.Deserialize(error.Message);
49-
50-
model.BasicPublish(error.Exchange, error.RoutingKey, true, properties, body);
51-
}
52-
catch (OperationInterruptedException)
53-
{
54-
Console.WriteLine("The exchange, '{0}', described in the error message does not exist on '{1}', '{2}'",
55-
error.Exchange, parameters.HostName, parameters.VHost);
56-
}
57-
}
33+
var properties = model.CreateBasicProperties();
34+
error.BasicProperties.CopyTo(properties);
35+
var body = errorMessageSerializer.Deserialize(error.Message);
36+
model.BasicPublish("", error.Queue, true, properties, body);
37+
}
5838
}
5939
}
60-
}
40+
}

Source/EasyNetQ.Hosepipe/FileMessageWriter.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ public void Write(IEnumerable<HosepipeMessage> messages, QueueParameters paramet
2525
var bodyPath = Path.Combine(parameters.MessagesOutputDirectory, uniqueFileName + ".message.txt");
2626
var propertiesPath = Path.Combine(parameters.MessagesOutputDirectory, uniqueFileName + ".properties.txt");
2727
var infoPath = Path.Combine(parameters.MessagesOutputDirectory, uniqueFileName + ".info.txt");
28-
28+
2929
if(File.Exists(bodyPath))
3030
{
31-
Console.WriteLine("Overwriting existing messsage file: {0}", bodyPath);
31+
Console.WriteLine("Overwriting existing message file: {0}", bodyPath);
3232
}
33-
33+
3434
File.WriteAllText(bodyPath, message.Body);
3535
File.WriteAllText(propertiesPath, Newtonsoft.Json.JsonConvert.SerializeObject(message.Properties));
3636
File.WriteAllText(infoPath, Newtonsoft.Json.JsonConvert.SerializeObject(message.Info));
@@ -44,4 +44,4 @@ public static string SanitiseQueueName(string queueName)
4444
return InvalidCharRegex.Replace(queueName, "_");
4545
}
4646
}
47-
}
47+
}

Source/EasyNetQ.Hosepipe/IQueueInsertion.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
namespace EasyNetQ.Hosepipe
44
{
5-
public interface IQueueInsertion {
5+
public interface IQueueInsertion
6+
{
67
void PublishMessagesToQueue(IEnumerable<HosepipeMessage> messages, QueueParameters parameters);
78
}
8-
}
9+
}

0 commit comments

Comments
 (0)