11using System ;
22using System . Collections . Generic ;
3- using System . Diagnostics ;
3+ using System . Linq ;
44using System . Reactive ;
55using System . Reactive . Concurrency ;
6+ using System . Reactive . Disposables ;
67using System . Reactive . Linq ;
7- using System . Threading ;
88using Microsoft . Reactive . Testing ;
99
1010namespace GraphQL . Client . Tests . Common . FluentAssertions . Reactive
@@ -16,7 +16,7 @@ namespace GraphQL.Client.Tests.Common.FluentAssertions.Reactive
1616 public class FluentTestObserver < TPayload > : IObserver < TPayload > , IDisposable
1717 {
1818 private readonly IDisposable _subscription ;
19- private readonly EventLoopScheduler _observeScheduler = new EventLoopScheduler ( ) ;
19+ private readonly IScheduler _observeScheduler ;
2020 private readonly RollingReplaySubject < Recorded < Notification < TPayload > > > _rollingReplaySubject = new RollingReplaySubject < Recorded < Notification < TPayload > > > ( ) ;
2121
2222 /// <summary>
@@ -33,13 +33,29 @@ public class FluentTestObserver<TPayload> : IObserver<TPayload>, IDisposable
3333 /// The recorded <see cref="Notification{T}"/>s
3434 /// </summary>
3535 public IEnumerable < Recorded < Notification < TPayload > > > RecordedNotifications =>
36- _rollingReplaySubject . Snapshot ( ) ;
36+ _rollingReplaySubject . GetSnapshot ( ) ;
3737
3838 /// <summary>
3939 /// The recorded messages
4040 /// </summary>
4141 public IEnumerable < TPayload > RecordedMessages =>
4242 RecordedNotifications . GetMessages ( ) ;
43+
44+ /// <summary>
45+ /// The exception
46+ /// </summary>
47+ public Exception Error =>
48+ RecordedNotifications
49+ . Where ( r => r . Value . Kind == NotificationKind . OnError )
50+ . Select ( r => r . Value . Exception )
51+ . FirstOrDefault ( ) ;
52+
53+ /// <summary>
54+ /// The recorded messages
55+ /// </summary>
56+ public bool Completed =>
57+ RecordedNotifications
58+ . Any ( r => r . Value . Kind == NotificationKind . OnCompleted ) ;
4359
4460 /// <summary>
4561 /// Creates a new <see cref="FluentTestObserver{TPayload}"/> which subscribes to the supplied <see cref="IObservable{T}"/>
@@ -48,10 +64,30 @@ public class FluentTestObserver<TPayload> : IObserver<TPayload>, IDisposable
4864 public FluentTestObserver ( IObservable < TPayload > subject )
4965 {
5066 Subject = subject ;
51- _observeScheduler . Schedule ( ( ) =>
52- Debug . WriteLine ( $ "Observe scheduler thread id: { Thread . CurrentThread . ManagedThreadId } ") ) ;
67+ _observeScheduler = new EventLoopScheduler ( ) ;
68+ _subscription = new CompositeDisposable ( ) ; subject . ObserveOn ( _observeScheduler ) . Subscribe ( this ) ;
69+ }
70+
71+ /// <summary>
72+ /// Creates a new <see cref="FluentTestObserver{TPayload}"/> which subscribes to the supplied <see cref="IObservable{T}"/>
73+ /// </summary>
74+ /// <param name="subject">the <see cref="IObservable{T}"/> under test</param>
75+ public FluentTestObserver ( IObservable < TPayload > subject , IScheduler scheduler )
76+ {
77+ Subject = subject ;
78+ _observeScheduler = scheduler ;
79+ _subscription = subject . ObserveOn ( scheduler ) . Subscribe ( this ) ;
80+ }
5381
54- _subscription = subject . ObserveOn ( _observeScheduler ) . Subscribe ( this ) ;
82+ /// <summary>
83+ /// Creates a new <see cref="FluentTestObserver{TPayload}"/> which subscribes to the supplied <see cref="IObservable{T}"/>
84+ /// </summary>
85+ /// <param name="subject">the <see cref="IObservable{T}"/> under test</param>
86+ public FluentTestObserver ( IObservable < TPayload > subject , TestScheduler testScheduler )
87+ {
88+ Subject = subject ;
89+ _observeScheduler = testScheduler ;
90+ _subscription = subject . ObserveOn ( Scheduler . CurrentThread ) . Subscribe ( this ) ;
5591 }
5692
5793 /// <summary>
@@ -60,8 +96,11 @@ public FluentTestObserver(IObservable<TPayload> subject)
6096 public void Clear ( ) => _rollingReplaySubject . Clear ( ) ;
6197
6298 /// <inheritdoc />
63- public void OnNext ( TPayload value ) =>
64- _rollingReplaySubject . OnNext ( new Recorded < Notification < TPayload > > ( _observeScheduler . Now . UtcTicks , Notification . CreateOnNext ( value ) ) ) ;
99+ public void OnNext ( TPayload value )
100+ {
101+ _rollingReplaySubject . OnNext (
102+ new Recorded < Notification < TPayload > > ( _observeScheduler . Now . UtcTicks , Notification . CreateOnNext ( value ) ) ) ;
103+ }
65104
66105 /// <inheritdoc />
67106 public void OnError ( Exception exception ) =>
@@ -75,14 +114,13 @@ public void OnCompleted() =>
75114 public void Dispose ( )
76115 {
77116 _subscription ? . Dispose ( ) ;
78- _observeScheduler . Dispose ( ) ;
79117 _rollingReplaySubject ? . Dispose ( ) ;
80118 }
81119
82120 /// <summary>
83- /// Returns an <see cref="ObservableAssertions {TPayload}"/> object that can be used to assert the observed <see cref="IObservable{T}"/>
121+ /// Returns an <see cref="ReactiveAssertions {TPayload}"/> object that can be used to assert the observed <see cref="IObservable{T}"/>
84122 /// </summary>
85123 /// <returns></returns>
86- public ObservableAssertions < TPayload > Should ( ) => new ObservableAssertions < TPayload > ( this ) ;
124+ public ReactiveAssertions < TPayload > Should ( ) => new ReactiveAssertions < TPayload > ( this ) ;
87125 }
88126}
0 commit comments