Skip to content

Commit 4a4a33c

Browse files
author
Greg Bielleman
committed
Use Reactive Extensions
1 parent 346c1e3 commit 4a4a33c

File tree

4 files changed

+67
-19
lines changed

4 files changed

+67
-19
lines changed

src/ServiceControl/CustomChecks/CustomCheckNotifications.cs

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
{
33
using System;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
6+
using System.Reactive.Linq;
57
using NServiceBus;
8+
using NServiceBus.Logging;
69
using Raven.Abstractions.Data;
710
using Raven.Client;
811
using INeedInitialization = NServiceBus.INeedInitialization;
@@ -12,7 +15,9 @@ public class CustomCheckNotifications : INeedInitialization, IWantToRunWhenBusSt
1215
IDocumentStore store;
1316
IBus bus;
1417
int lastCount;
15-
18+
IDisposable subscription;
19+
ILog logging = LogManager.GetLogger(typeof(CustomCheckNotifications));
20+
1621
public CustomCheckNotifications()
1722
{
1823
// Need this because INeedInitialization does not use DI instead use Activator.CreateInstance
@@ -36,14 +41,25 @@ public void OnNext(IndexChangeNotification value)
3641

3742
void UpdateCount()
3843
{
39-
using (var session = store.OpenSession())
44+
try
45+
{
46+
using (var session = store.OpenSession())
47+
{
48+
var failedCustomCheckCount = session.Query<CustomCheck, CustomChecksIndex>().Count(p => p.Status == Status.Fail);
49+
if (lastCount == failedCustomCheckCount)
50+
return;
51+
lastCount = failedCustomCheckCount;
52+
bus.Publish(new CustomChecksUpdated
53+
{
54+
Failed = lastCount
55+
});
56+
}
57+
}
58+
catch (Exception ex)
4059
{
41-
var failedCustomCheckCount = session.Query<CustomCheck, CustomChecksIndex>().Count(p => p.Status == Status.Fail);
42-
if (lastCount == failedCustomCheckCount)
43-
return;
44-
lastCount = failedCustomCheckCount;
45-
bus.Publish(new CustomChecksUpdated { Failed = lastCount });
60+
logging.WarnFormat("Failed to emit CustomCheckUpdated - {0}", ex);
4661
}
62+
4763
}
4864

4965
public void OnError(Exception error)
@@ -58,12 +74,12 @@ public void OnCompleted()
5874

5975
public void Start()
6076
{
61-
store.Changes().ForIndex("CustomChecksIndex").Subscribe(this);
77+
subscription = store.Changes().ForIndex("CustomChecksIndex").SubscribeOn(Scheduler.Default).Subscribe(this);
6278
}
6379

6480
public void Stop()
6581
{
66-
//Ignore
82+
subscription.Dispose();
6783
}
6884
}
6985
}

src/ServiceControl/MessageFailures/FailedMessageViewIndexNotifications.cs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
{
33
using System;
44
using System.Linq;
5+
using System.Reactive.Concurrency;
6+
using System.Reactive.Linq;
57
using NServiceBus;
8+
using NServiceBus.Logging;
69
using Raven.Abstractions.Data;
710
using Raven.Client;
811
using ServiceControl.Contracts.MessageFailures;
@@ -14,7 +17,9 @@ public class FailedMessageViewIndexNotifications : INeedInitialization, IWantToR
1417
IBus bus;
1518
IDocumentStore store;
1619
int lastCount;
17-
20+
IDisposable subscription;
21+
ILog logging = LogManager.GetLogger(typeof(FailedMessageViewIndexNotifications));
22+
1823
public FailedMessageViewIndexNotifications()
1924
{
2025
// Need this because INeedInitialization does not use DI instead use Activator.CreateInstance
@@ -33,7 +38,7 @@ public void OnNext(IndexChangeNotification value)
3338

3439
public void OnError(Exception error)
3540
{
36-
//Ignore
41+
//Ignore
3742
}
3843

3944
public void OnCompleted()
@@ -43,13 +48,23 @@ public void OnCompleted()
4348

4449
void UpdatedCount()
4550
{
46-
using (var session = store.OpenSession())
51+
try
4752
{
48-
var failedMessageCount = session.Query<FailedMessage, FailedMessageViewIndex>().Count(p => p.Status == FailedMessageStatus.Unresolved);
49-
if (lastCount == failedMessageCount)
50-
return;
51-
lastCount = failedMessageCount;
52-
bus.Publish(new MessageFailuresUpdated { Total = failedMessageCount });
53+
using (var session = store.OpenSession())
54+
{
55+
var failedMessageCount = session.Query<FailedMessage, FailedMessageViewIndex>().Count(p => p.Status == FailedMessageStatus.Unresolved);
56+
if (lastCount == failedMessageCount)
57+
return;
58+
lastCount = failedMessageCount;
59+
bus.Publish(new MessageFailuresUpdated
60+
{
61+
Total = failedMessageCount
62+
});
63+
}
64+
}
65+
catch(Exception ex)
66+
{
67+
logging.WarnFormat("Failed to emit MessageFailuresUpdated - {0}", ex);
5368
}
5469
}
5570

@@ -60,12 +75,12 @@ public void Init()
6075

6176
public void Start()
6277
{
63-
store.Changes().ForIndex("FailedMessageViewIndex").Subscribe(this);
78+
subscription = store.Changes().ForIndex("FailedMessageViewIndex").SubscribeOn(Scheduler.Default).Subscribe(this);
6479
}
6580

6681
public void Stop()
6782
{
68-
//Ignore
83+
subscription.Dispose();
6984
}
7085
}
7186
}

src/ServiceControl/ServiceControl.csproj

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,18 @@
147147
<Reference Include="System.Data" />
148148
<Reference Include="System.Data.Services.Client" />
149149
<Reference Include="System.Messaging" />
150+
<Reference Include="System.Reactive.Core">
151+
<HintPath>..\packages\Rx-Core.2.2.5\lib\net45\System.Reactive.Core.dll</HintPath>
152+
</Reference>
153+
<Reference Include="System.Reactive.Interfaces">
154+
<HintPath>..\packages\Rx-Interfaces.2.2.5\lib\net45\System.Reactive.Interfaces.dll</HintPath>
155+
</Reference>
156+
<Reference Include="System.Reactive.Linq">
157+
<HintPath>..\packages\Rx-Linq.2.2.5\lib\net45\System.Reactive.Linq.dll</HintPath>
158+
</Reference>
159+
<Reference Include="System.Reactive.PlatformServices">
160+
<HintPath>..\packages\Rx-PlatformServices.2.2.5\lib\net45\System.Reactive.PlatformServices.dll</HintPath>
161+
</Reference>
150162
<Reference Include="System.Security" />
151163
<Reference Include="System.ServiceProcess" />
152164
<Reference Include="System.Spatial, Version=5.6.3.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35, processorArchitecture=MSIL">

src/ServiceControl/packages.config

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
<package id="RavenDB.Client" version="2.5.2939" targetFramework="net45" />
2727
<package id="RavenDB.Database" version="2.5.2939" targetFramework="net45" />
2828
<package id="RavenDB.Embedded" version="2.5.2939" targetFramework="net45" />
29+
<package id="Rx-Core" version="2.2.5" targetFramework="net45" />
30+
<package id="Rx-Interfaces" version="2.2.5" targetFramework="net45" />
31+
<package id="Rx-Linq" version="2.2.5" targetFramework="net45" />
32+
<package id="Rx-Main" version="2.2.5" targetFramework="net45" />
33+
<package id="Rx-PlatformServices" version="2.2.5" targetFramework="net45" />
2934
<package id="ServiceControl.Contracts" version="1.0.0" targetFramework="net45" />
3035
<package id="SharpZipLib" version="0.86.0" targetFramework="net45" />
3136
<package id="System.Spatial" version="5.6.3" targetFramework="net45" />

0 commit comments

Comments
 (0)