11using System . Reactive . Subjects ;
22using Microsoft . Extensions . Logging ;
3+ using Microsoft . Extensions . Logging . Abstractions ;
34using NetDaemon . HassModel . Internal ;
45using NetDaemon . HassModel . Tests . TestHelpers ;
56
67namespace NetDaemon . HassModel . Tests . Internal ;
78
8- public class QueuedObservabeTest
9+ public class QueuedObservableTest
910{
1011 [ Fact ]
11- public async Task EventsSouldbeforwarded ( )
12+ public async Task OnNextShouldBeForwarded ( )
1213 {
1314 var source = new Subject < int > ( ) ;
1415
15- var queue = new QueuedObservable < int > ( Mock . Of < ILogger < IHaContext > > ( ) ) ;
16- queue . Initialize ( source ) ;
16+ var queue = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
1717
1818 var subscriber = new Mock < IObserver < int > > ( ) ;
1919 queue . Subscribe ( subscriber . Object ) ;
@@ -27,20 +27,48 @@ public async Task EventsSouldbeforwarded()
2727 subscriber . Verify ( s => s . OnNext ( 3 ) , Times . Once ) ;
2828 }
2929
30+ [ Fact ]
31+ public async Task OnErrorShouldBeForwarded ( )
32+ {
33+ var source = new Subject < int > ( ) ;
34+ var queue = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
35+ var subscriber = new Mock < IObserver < int > > ( ) ;
36+ queue . Subscribe ( subscriber . Object ) ;
37+
38+ var exception = new Exception ( "TestException" ) ;
39+ source . OnError ( exception ) ;
40+
41+ await queue . DisposeAsync ( ) ;
42+ subscriber . Verify ( s => s . OnError ( exception ) ) ;
43+ }
44+
45+ [ Fact ]
46+ public async Task ? OnCompletedShouldBeForwarded ( )
47+ {
48+ var source = new Subject < int > ( ) ;
49+ var queue = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
50+ var subscriber = new Mock < IObserver < int > > ( ) ;
51+
52+ queue . Subscribe ( subscriber . Object ) ;
53+
54+ source . OnCompleted ( ) ;
55+
56+ await queue . DisposeAsync ( ) ;
57+ subscriber . Verify ( s => s . OnCompleted ( ) ) ;
58+ }
59+
3060 [ Fact ]
3161 public async Task SubscribersShouldNotBlockEachOther ( )
3262 {
3363 var source = new Subject < int > ( ) ;
3464
35- var queue1 = new QueuedObservable < int > ( Mock . Of < ILogger < IHaContext > > ( ) ) ;
36- queue1 . Initialize ( source ) ;
65+ var queue1 = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
3766
3867 var subscriber = new Mock < IObserver < int > > ( ) ;
3968 queue1 . Subscribe ( subscriber . Object ) ;
4069
4170 // The second subscriber will block the first event
42- var queue2 = new QueuedObservable < int > ( Mock . Of < ILogger < IHaContext > > ( ) ) ;
43- queue2 . Initialize ( source ) ;
71+ var queue2 = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
4472 var blockSubscribers = new ManualResetEvent ( false ) ;
4573 queue2 . Subscribe ( _ => blockSubscribers . WaitOne ( ) ) ;
4674
@@ -62,12 +90,9 @@ public async Task SubscribersShouldNotBlockEachOther()
6290 public async Task WhenScopeIsDisposedSubscribersAreDetached ( )
6391 {
6492 var testSubject = new Subject < string > ( ) ;
65- var loggerMock = new Mock < ILogger < IHaContext > > ( ) ;
6693 // Create 2 ScopedObservables for the same subject
67- var scoped1 = new QueuedObservable < string > ( loggerMock . Object ) ;
68- scoped1 . Initialize ( testSubject ) ;
69- var scoped2 = new QueuedObservable < string > ( loggerMock . Object ) ;
70- scoped2 . Initialize ( testSubject ) ;
94+ var scoped1 = new QueuedObservable < string > ( testSubject , NullLogger . Instance ) ;
95+ var scoped2 = new QueuedObservable < string > ( testSubject , NullLogger . Instance ) ;
7196
7297 // First scope has 2 subscribers, second has 1
7398 var scope1AObserverMock = new Mock < IObserver < string > > ( ) ;
@@ -78,11 +103,11 @@ public async Task WhenScopeIsDisposedSubscribersAreDetached()
78103 var scope2ObserverMock = new Mock < IObserver < string > > ( ) ;
79104 scoped2 . Subscribe ( scope2ObserverMock . Object ) ;
80105
81- var waitTasks = new Task [ ]
106+ var waitTasks = new [ ]
82107 {
83- scope1AObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) ) ,
84- scope1BObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) ) ,
85- scope2ObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) )
108+ scope1AObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) ) ,
109+ scope1BObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) ) ,
110+ scope2ObserverMock . WaitForInvocationAndVerify ( o => o . OnNext ( "Event1" ) )
86111 } ;
87112
88113 // Now start firing events
@@ -109,8 +134,7 @@ public async Task WhenScopeIsDisposedSubscribersAreDetached()
109134 public async Task TestQueuedObservableShouldHaveFinishedTasksOnDispose ( )
110135 {
111136 var source = new Subject < int > ( ) ;
112- var queue = new QueuedObservable < int > ( Mock . Of < ILogger < IHaContext > > ( ) ) ;
113- queue . Initialize ( source ) ;
137+ var queue = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
114138 var subscriber = new Mock < IObserver < int > > ( ) ;
115139 queue . Subscribe ( subscriber . Object ) ;
116140
@@ -129,8 +153,7 @@ public async Task TestQueuedObservableShouldLogOnException()
129153 {
130154 var source = new Subject < int > ( ) ;
131155 var loggerMock = new Mock < ILogger < IHaContext > > ( ) ;
132- var queue = new QueuedObservable < int > ( loggerMock . Object ) ;
133- queue . Initialize ( source ) ;
156+ var queue = new QueuedObservable < int > ( source , loggerMock . Object ) ;
134157 var subscriber = new Mock < IObserver < int > > ( ) ;
135158 subscriber . Setup ( n => n . OnNext ( 1 ) ) . Throws < InvalidOperationException > ( ) ;
136159 queue . Subscribe ( subscriber . Object ) ;
@@ -152,9 +175,7 @@ public async Task TestQueuedObservableShouldLogOnException()
152175 public async Task TestQueuedObservableShouldStillHaveSubscribersOnException ( )
153176 {
154177 var source = new Subject < int > ( ) ;
155- var loggerMock = new Mock < ILogger < IHaContext > > ( ) ;
156- var queue = new QueuedObservable < int > ( loggerMock . Object ) ;
157- queue . Initialize ( source ) ;
178+ var queue = new QueuedObservable < int > ( source , NullLogger . Instance ) ;
158179 var subscriber = new Mock < IObserver < int > > ( ) ;
159180 subscriber . Setup ( n => n . OnNext ( 1 ) ) . Throws < InvalidOperationException > ( ) ;
160181 queue . Subscribe ( subscriber . Object ) ;
0 commit comments