Skip to content

Commit fa6f1cb

Browse files
committed
Merge branch 'hotfix-1.4.2'
2 parents e292cda + 05bd383 commit fa6f1cb

File tree

9 files changed

+109
-34
lines changed

9 files changed

+109
-34
lines changed

src/ServiceControl.UnitTests/Expiration/CustomExpirationBundleTests.cs

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
using NUnit.Framework;
1111
using Raven.Client;
1212
using ServiceBus.Management.Infrastructure.Settings;
13-
using ServiceControl.CompositeViews.Messages;
13+
using ServiceControl.Infrastructure.RavenDB.Expiration;
1414

1515
[TestFixture]
1616
public class CustomExpirationBundleTests : TestWithRavenDB
@@ -54,13 +54,14 @@ public void Processed_messages_are_being_expired()
5454
[Test]
5555
public void Many_processed_messages_are_being_expired()
5656
{
57-
new MessagesViewIndex().Execute(documentStore);
57+
new ExpiryProcessedMessageIndex().Execute(documentStore);
5858

5959
var processedMessage = new ProcessedMessage
6060
{
6161
Id = Guid.NewGuid().ToString(),
6262
ProcessedAt = DateTime.UtcNow.AddMinutes(-DateTime.UtcNow.Millisecond%30).AddDays(-(Settings.HoursToKeepMessagesBeforeExpiring*3)),
6363
};
64+
6465
var processedMessage2 = new ProcessedMessage
6566
{
6667
Id = "2",
@@ -69,7 +70,7 @@ public void Many_processed_messages_are_being_expired()
6970

7071
using (var session = documentStore.OpenSession())
7172
{
72-
for (var i = 0; i < 2049; i++)
73+
for (var i = 0; i < 100; i++)
7374
{
7475
processedMessage = new ProcessedMessage
7576
{
@@ -85,13 +86,10 @@ public void Many_processed_messages_are_being_expired()
8586
}
8687

8788
WaitForIndexing(documentStore);
88-
Thread.Sleep(Settings.ExpirationProcessTimerInSeconds * 1000 * 4);
89-
89+
Thread.Sleep(Settings.ExpirationProcessTimerInSeconds * 1000 * 10);
9090
using (var session = documentStore.OpenSession())
9191
{
92-
var results = session.Query<ProcessedMessage, MessagesViewIndex>()
93-
.Customize(x => x.WaitForNonStaleResults())
94-
.ToArray();
92+
var results = session.Query<ProcessedMessage, ExpiryProcessedMessageIndex>().Customize(x => x.WaitForNonStaleResults()).ToArray();
9593
Assert.AreEqual(1, results.Length);
9694

9795
var msg = session.Load<ProcessedMessage>(processedMessage.Id);
@@ -105,6 +103,8 @@ public void Many_processed_messages_are_being_expired()
105103
[Test]
106104
public void Only_processed_messages_are_being_expired()
107105
{
106+
new ExpiryProcessedMessageIndex().Execute(documentStore);
107+
108108
var processedMessage = new ProcessedMessage
109109
{
110110
Id = "1",
@@ -141,6 +141,8 @@ public void Only_processed_messages_are_being_expired()
141141
[Test]
142142
public void Recent_processed_messages_are_not_being_expired()
143143
{
144+
new ExpiryProcessedMessageIndex().Execute(documentStore);
145+
144146
var processedMessage = new ProcessedMessage
145147
{
146148
Id = "1",
@@ -205,12 +207,8 @@ public void SetUp()
205207
{
206208
documentStore = InMemoryStoreBuilder.GetInMemoryStore(withExpiration: true);
207209

208-
var customIndex = new MessagesViewIndex();
210+
var customIndex = new ExpiryProcessedMessageIndex();
209211
customIndex.Execute(documentStore);
210-
211-
var transformer = new MessagesViewTransformer();
212-
213-
transformer.Execute(documentStore);
214212
}
215213

216214
[TearDown]

src/ServiceControl.UnitTests/ExternalIntegrations/ExternalIntegrationsFailedMessagesIndexTests.cs renamed to src/ServiceControl.UnitTests/ExternalIntegrations/MessageFailedConverterTests.cs

Lines changed: 53 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
using FailureDetails = ServiceControl.Contracts.Operations.FailureDetails;
1212

1313
[TestFixture]
14-
public class ExternalIntegrationsFailedMessagesIndexTests
14+
public class MessageFailedConverterTests
1515
{
1616
[Test]
1717
public void Archive_status_maps_to_archive()
@@ -59,6 +59,30 @@ public void If_not_present_in_metadata_body_is_ignored()
5959
Assert.IsNull(result.MessageDetails.Body);
6060
}
6161

62+
[Test]
63+
public void If_message_type_is_missing_it_is_ignored()
64+
{
65+
var failedMessage = new FailedMessageBuilder(FailedMessageStatus.Unresolved)
66+
.SetMessageType(null)
67+
.AddProcessingAttempt(pa => { })
68+
.Build();
69+
70+
var result = failedMessage.ToEvent();
71+
Assert.IsNull(result.MessageType);
72+
}
73+
74+
[Test]
75+
public void If_content_type_is_missing_it_is_ignored()
76+
{
77+
var failedMessage = new FailedMessageBuilder(FailedMessageStatus.Unresolved)
78+
.SetContentType(null)
79+
.AddProcessingAttempt(pa => { })
80+
.Build();
81+
82+
var result = failedMessage.ToEvent();
83+
Assert.IsNull(result.MessageDetails.ContentType);
84+
}
85+
6286
[Test]
6387
public void Body_is_mapped_from_metadata_of_last_processing_attempt()
6488
{
@@ -73,13 +97,27 @@ public void Body_is_mapped_from_metadata_of_last_processing_attempt()
7397
private class FailedMessageBuilder
7498
{
7599
private readonly FailedMessageStatus messageStatus;
100+
string messageType = "SomeMessage";
101+
string contentType = "application/json";
76102
private List<Action<FailedMessage.ProcessingAttempt>> processingAttempts = new List<Action<FailedMessage.ProcessingAttempt>>();
77103

78104
public FailedMessageBuilder(FailedMessageStatus messageStatus)
79105
{
80106
this.messageStatus = messageStatus;
81107
}
82108

109+
public FailedMessageBuilder SetMessageType(string messageType)
110+
{
111+
this.messageType = messageType;
112+
return this;
113+
}
114+
115+
public FailedMessageBuilder SetContentType(string contentType)
116+
{
117+
this.contentType = contentType;
118+
return this;
119+
}
120+
83121
public FailedMessageBuilder AddProcessingAttempt(Action<FailedMessage.ProcessingAttempt> callback)
84122
{
85123
processingAttempts.Add(callback);
@@ -92,19 +130,26 @@ public FailedMessage Build()
92130
{
93131
ProcessingAttempts = processingAttempts.Select(x =>
94132
{
133+
var messageMetadata = new Dictionary<string, object>
134+
{
135+
{"SendingEndpoint",new Contracts.Operations.EndpointDetails()},
136+
{"ReceivingEndpoint",new Contracts.Operations.EndpointDetails()},
137+
};
138+
if (messageType != null)
139+
{
140+
messageMetadata["MessageType"] = messageType;
141+
}
142+
if (contentType != null)
143+
{
144+
messageMetadata["ContentType"] = contentType;
145+
}
95146
var attempt = new FailedMessage.ProcessingAttempt
96147
{
97148
FailureDetails = new FailureDetails
98149
{
99150
Exception = new ExceptionDetails()
100151
},
101-
MessageMetadata = new Dictionary<string, object>
102-
{
103-
{"SendingEndpoint",new Contracts.Operations.EndpointDetails()},
104-
{"ReceivingEndpoint",new Contracts.Operations.EndpointDetails()},
105-
{"MessageType","SomeMessage"},
106-
{"ContentType","application/json"},
107-
},
152+
MessageMetadata = messageMetadata,
108153
};
109154
x(attempt);
110155
return attempt;

src/ServiceControl.UnitTests/Infrastructure/RavenDB/Indexes/InMemoryStoreBuilder.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
using System.IO;
33
using Raven.Client.Embedded;
44
using ServiceBus.Management.Infrastructure.Settings;
5-
using ServiceControl.CompositeViews.Messages;
5+
using ServiceControl.Infrastructure.RavenDB.Expiration;
66

77
public class InMemoryStoreBuilder
88
{
@@ -25,15 +25,15 @@ public static EmbeddableDocumentStore GetInMemoryStore(bool withExpiration = fal
2525
if (withExpiration)
2626
{
2727
Settings.ExpirationProcessTimerInSeconds = 1; // so we don't have to wait too much in tests
28-
store.Configuration.Catalog.Catalogs.Add(new AssemblyCatalog(typeof(ServiceControl.Infrastructure.RavenDB.Expiration.ExpiredDocumentsCleaner).Assembly));
28+
store.Configuration.Catalog.Catalogs.Add(new AssemblyCatalog(typeof(ExpiredDocumentsCleaner).Assembly));
2929
store.Configuration.Settings.Add("Raven/ActiveBundles", "CustomDocumentExpiration"); // Enable the expiration bundle
3030
}
3131

3232
store.Initialize();
3333

3434
if (withExpiration)
3535
{
36-
new MessagesViewIndex().Execute(store); // this index is being queried by our expiration bundle
36+
new ExpiryProcessedMessageIndex().Execute(store); // this index is being queried by our expiration bundle
3737
}
3838

3939
return store;

src/ServiceControl.UnitTests/ServiceControl.UnitTests.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
<Reference Include="System.Xml.Linq" />
9090
</ItemGroup>
9191
<ItemGroup>
92-
<Compile Include="ExternalIntegrations\ExternalIntegrationsFailedMessagesIndexTests.cs" />
92+
<Compile Include="ExternalIntegrations\MessageFailedConverterTests.cs" />
9393
<Compile Include="CompositeViews\MessagesViewTests.cs" />
9494
<Compile Include="Expiration\CustomExpirationBundleTests.cs" />
9595
<Compile Include="Heartbeats\HeartbeatStatusProviderTests.cs" />

src/ServiceControl/ExternalIntegrations/MessageFailedConverter.cs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
namespace ServiceControl.ExternalIntegrations
22
{
3+
using System.Collections.Generic;
34
using System.Linq;
45
using ServiceControl.Contracts.Operations;
56
using ServiceControl.MessageFailures;
@@ -10,13 +11,12 @@ public static Contracts.MessageFailed ToEvent(this FailedMessage message)
1011
{
1112
var last = message.ProcessingAttempts.Last();
1213
var sendingEndpoint = (EndpointDetails)last.MessageMetadata["SendingEndpoint"];
13-
object tmp;
14-
last.MessageMetadata.TryGetValue("ReceivingEndpoint", out tmp);
15-
var receivingEndpoint = (EndpointDetails) tmp;
14+
var receivingEndpoint = GetNullableMetadataValue<EndpointDetails>(last.MessageMetadata, "ReceivingEndpoint");
15+
1616
return new Contracts.MessageFailed
1717
{
1818
FailedMessageId = message.UniqueMessageId,
19-
MessageType = (string)last.MessageMetadata["MessageType"],
19+
MessageType = GetNullableMetadataValue<string>(last.MessageMetadata,"MessageType"),
2020
NumberOfProcessingAttempts = message.ProcessingAttempts.Count,
2121
Status = message.Status == FailedMessageStatus.Archived
2222
? Contracts.MessageFailed.MessageStatus.ArchivedFailure
@@ -38,7 +38,7 @@ public static Contracts.MessageFailed ToEvent(this FailedMessage message)
3838
MessageDetails = new Contracts.MessageFailed.Message
3939
{
4040
Headers = last.Headers,
41-
ContentType = (string)last.MessageMetadata["ContentType"],
41+
ContentType = GetNullableMetadataValue<string>(last.MessageMetadata, "ContentType"),
4242
Body = GetBody(last),
4343
MessageId = last.MessageId,
4444
},
@@ -57,6 +57,14 @@ public static Contracts.MessageFailed ToEvent(this FailedMessage message)
5757
};
5858
}
5959

60+
static T GetNullableMetadataValue<T>(IReadOnlyDictionary<string, object> metadata, string key)
61+
where T : class
62+
{
63+
object value;
64+
metadata.TryGetValue(key, out value);
65+
return (T)value;
66+
}
67+
6068
static string GetBody(FailedMessage.ProcessingAttempt last)
6169
{
6270
object body;

src/ServiceControl/ExternalIntegrations/MessageFailedPublisher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ protected override DispatchContext CreateDispatchRequest(MessageFailed @event)
1919

2020
protected override IEnumerable<object> PublishEvents(IEnumerable<DispatchContext> contexts, IDocumentSession session)
2121
{
22+
2223
var documentIds = contexts.Select(x => x.FailedMessageId).Cast<ValueType>().ToArray();
23-
var failedMessageData = session.Load<FailedMessage>(documentIds);
24-
return failedMessageData.Select(x => x.ToEvent());
24+
var failedMessageData = session.Load<FailedMessage>(documentIds);
25+
return failedMessageData.Where(p => p != null).Select(x => x.ToEvent());
2526
}
2627

2728
public class DispatchContext

src/ServiceControl/Infrastructure/RavenDB/Expiration/ExpiredDocumentsCleaner.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
using System.Globalization;
88
using System.Linq;
99
using System.Threading;
10-
using CompositeViews.Messages;
1110
using Raven.Abstractions;
1211
using Raven.Abstractions.Commands;
1312
using Raven.Abstractions.Data;
@@ -31,7 +30,7 @@ public class ExpiredDocumentsCleaner : IStartupTask, IDisposable
3130
public void Execute(DocumentDatabase database)
3231
{
3332
Database = database;
34-
indexName = new MessagesViewIndex().IndexName;
33+
indexName = new ExpiryProcessedMessageIndex().IndexName;
3534

3635
deletionBatchSize = Settings.ExpirationProcessBatchSize;
3736
deleteFrequencyInSeconds = Settings.ExpirationProcessTimerInSeconds;
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace ServiceControl.Infrastructure.RavenDB.Expiration
2+
{
3+
using System.Linq;
4+
using Contracts.Operations;
5+
using MessageAuditing;
6+
using Raven.Client.Indexes;
7+
8+
public class ExpiryProcessedMessageIndex : AbstractIndexCreationTask<ProcessedMessage>
9+
{
10+
public ExpiryProcessedMessageIndex()
11+
{
12+
Map = (messages => from message in messages
13+
select new
14+
{
15+
MessageId = (string) message.MessageMetadata["MessageId"],
16+
Status = (bool)message.MessageMetadata["IsRetried"] ? MessageStatus.ResolvedSuccessfully : MessageStatus.Successful,
17+
ProcessedAt = message.ProcessedAt,
18+
});
19+
20+
DisableInMemoryIndexing = true;
21+
}
22+
}
23+
}

src/ServiceControl/ServiceControl.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@
271271
<SubType>Component</SubType>
272272
</Compile>
273273
<Compile Include="Hosting\Options.cs" />
274+
<Compile Include="Infrastructure\RavenDB\Expiration\ExpiryProcessedMessageIndex.cs" />
274275
<Compile Include="Infrastructure\Settings\RegistryReader.cs" />
275276
<Compile Include="Infrastructure\SubscribeToAllEvents.cs" />
276277
<Compile Include="MessageFailures\FailedMessageViewIndexNotifications.cs" />

0 commit comments

Comments
 (0)