From 6aa98873b07c155b908a527b261c648a8297a93a Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Tue, 2 Jun 2026 23:28:54 +1000 Subject: [PATCH 1/4] feat(LinqMixins): add fused BlendUnique operator (merge + distinct-until-changed) Folds a concurrent merge of a fixed set of sources and distinct-until-changed into a single sink, avoiding the extra subscription hop and allocation of sources.Blend().Unique(). Forwards the first source error and completes once every source has completed; an optional comparer suppresses duplicates. --- .../SignalOperatorMixins.BlendUnique.cs | 213 ++++++++++++++++++ ...valTests.Primitives.DotNet9_0.verified.txt | 2 + .../BlendUniqueTests.cs | 118 ++++++++++ 3 files changed, 333 insertions(+) create mode 100644 src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs create mode 100644 src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs new file mode 100644 index 0000000..a1eaf2a --- /dev/null +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs @@ -0,0 +1,213 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Disposables; + +namespace ReactiveUI.Primitives; + +/// +/// Fused Blend + Unique operator: concurrently merges a fixed set of sources and forwards a value +/// only when it differs from the previously forwarded one. Folding the merge and distinct-until-changed into a +/// single sink avoids the extra subscription hop and allocation of sources.Blend().Unique(). +/// +public static partial class LinqMixins +{ + /// + /// Concurrently merges the supplied sources and forwards only values that differ from the previously + /// forwarded value, using the default equality comparer. Errors are forwarded from the first failing source; + /// completion is signalled once every source has completed. + /// + /// The element type. + /// The sources to merge. + /// An observable of the distinct merged values. + public static IObservable BlendUnique(params IObservable[] sources) => + BlendUnique(sources, comparer: null); + + /// + /// Concurrently merges the supplied sources and forwards only values that differ from the previously + /// forwarded value, using the supplied comparer (or the default when ). + /// + /// The element type. + /// The sources to merge. + /// The equality comparer used to suppress duplicates, or for the default. + /// An observable of the distinct merged values. + public static IObservable BlendUnique(IObservable[] sources, IEqualityComparer? comparer) + { + if (sources == null) + { + throw new ArgumentNullException(nameof(sources)); + } + + return new BlendUniqueSignal(sources, comparer ?? EqualityComparer.Default); + } + + /// A fused merge + distinct-until-changed observable over a fixed set of sources. + /// The element type. + private sealed class BlendUniqueSignal : IObservable + { + /// The sources to merge. + private readonly IObservable[] _sources; + + /// The equality comparer used to suppress duplicates. + private readonly IEqualityComparer _comparer; + + /// Initializes a new instance of the class. + /// The sources to merge. + /// The equality comparer used to suppress duplicates. + internal BlendUniqueSignal(IObservable[] sources, IEqualityComparer comparer) + { + _sources = sources; + _comparer = comparer; + } + + /// + public IDisposable Subscribe(IObserver observer) + { + if (observer == null) + { + throw new ArgumentNullException(nameof(observer)); + } + + var sink = new BlendUniqueSink(observer, _comparer); + sink.Run(_sources); + return sink; + } + } + + /// Forwards distinct merged values downstream and tears down every source on dispose. + /// The element type. + private sealed class BlendUniqueSink : IDisposable + { + /// Serializes value forwarding and guards the distinct/completion state. + private readonly Lock _gate = new(); + + /// The per-source subscriptions, torn down on dispose. + private readonly MultipleDisposable _pocket = new(); + + /// The downstream observer. + private readonly IObserver _downstream; + + /// The equality comparer used to suppress duplicates. + private readonly IEqualityComparer _comparer; + + /// The most recently forwarded value (valid only once is set). + private T _last = default!; + + /// Whether a value has been forwarded yet. + private bool _hasLast; + + /// The number of sources that have not yet completed. + private int _active; + + /// Whether a terminal notification has been emitted. + private bool _done; + + /// Initializes a new instance of the class. + /// The downstream observer. + /// The equality comparer used to suppress duplicates. + internal BlendUniqueSink(IObserver downstream, IEqualityComparer comparer) + { + _downstream = downstream; + _comparer = comparer; + } + + /// Subscribes to every merged source. + /// The sources to merge. + public void Run(IObservable[] sources) + { + lock (_gate) + { + _active = sources.Length; + } + + if (sources.Length == 0) + { + Complete(); + return; + } + + for (var i = 0; i < sources.Length; i++) + { + var source = sources[i]; + if (source == null) + { + ForwardError(new InvalidOperationException("BlendUnique source contained null.")); + return; + } + + _pocket.Add(source.Subscribe(new Element(this))); + } + } + + /// + public void Dispose() => _pocket.Dispose(); + + /// Forwards a value when it differs from the previously forwarded one. + /// The merged value. + private void Forward(T value) + { + lock (_gate) + { + if (_done) + { + return; + } + + if (_hasLast && _comparer.Equals(_last, value)) + { + return; + } + + _last = value; + _hasLast = true; + _downstream.OnNext(value); + } + } + + /// Forwards the first terminal error and suppresses later notifications. + /// The error. + private void ForwardError(Exception error) + { + lock (_gate) + { + if (_done) + { + return; + } + + _done = true; + _downstream.OnError(error); + } + } + + /// Decrements the active count and completes once every source has completed. + private void Complete() + { + lock (_gate) + { + if (_done || --_active > 0) + { + return; + } + + _done = true; + _downstream.OnCompleted(); + } + } + + /// Observes a single merged source. + /// The owning sink. + private sealed class Element(BlendUniqueSink parent) : IObserver + { + /// + public void OnNext(T value) => parent.Forward(value); + + /// + public void OnError(Exception error) => parent.ForwardError(error); + + /// + public void OnCompleted() => parent.Complete(); + } + } +} diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt index bed1b27..73f5ce0 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet9_0.verified.txt @@ -189,6 +189,8 @@ namespace ReactiveUI.Primitives public static System.IObservable AsObservable(this System.IObservable source) { } public static System.IObservable Bind(this System.IObservable source, System.Func> selector) { } public static System.IObservable Blend(this System.IObservable> sources) { } + public static System.IObservable BlendUnique(params System.IObservable[] sources) { } + public static System.IObservable BlendUnique(System.IObservable[] sources, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable> Buffer(this System.IObservable source, int count) { } public static System.IObservable> Buffer(this System.IObservable source, int count, int skip) { } public static System.IObservable Calm(this System.IObservable source, System.TimeSpan dueTime) { } diff --git a/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs new file mode 100644 index 0000000..b4288d1 --- /dev/null +++ b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs @@ -0,0 +1,118 @@ +// Copyright (c) 2019-2026 ReactiveUI Association Incorporated. All rights reserved. +// ReactiveUI Association Incorporated licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +using ReactiveUI.Primitives.Signals; + +namespace ReactiveUI.Primitives.Tests; + +/// +/// Tests for the fused operator +/// (merge + distinct-until-changed in a single sink). +/// +public class BlendUniqueTests +{ + /// The value one. + private const int One = 1; + + /// The value two. + private const int Two = 2; + + /// The value three. + private const int Three = 3; + + /// The expected single-occurrence count. + private const int Once = 1; + + /// First source: a value, an immediate duplicate, then a second value. + private static readonly int[] _firstDuplicatedThenSecond = [One, One, Two]; + + /// Second source: a value equal to the first source's last, then a value duplicated. + private static readonly int[] _secondThenThirdDuplicated = [Two, Three, Three]; + + /// Expected distinct-until-changed merge of the two sources. + private static readonly int[] _distinctMerged = [One, Two, Three]; + + /// A single-value source. + private static readonly int[] _single = [One]; + + /// Case-varied values for the comparer test. + private static readonly string[] _caseVariants = ["a", "A", "B"]; + + /// Expected case-insensitive distinct result. + private static readonly string[] _distinctCaseInsensitive = ["a", "B"]; + + /// + /// Verifies that the merged stream forwards only values that differ from the previously forwarded one + /// and completes once every source has completed. + /// + [Test] + public void MergesSourcesAndSuppressesConsecutiveDuplicates() + { + var values = new List(); + var completed = 0; + + // source0 emits 1,1,2 (-> 1,2) then source1 emits 2,3,3 (2 == last, dropped -> 3). + LinqMixins.BlendUnique( + Signal.FromEnumerable(_firstDuplicatedThenSecond), + Signal.FromEnumerable(_secondThenThirdDuplicated)) + .Subscribe(values.Add, ex => throw ex, () => completed++); + + Assert.Equal(_distinctMerged, values); + Assert.Equal(Once, completed); + } + + /// Verifies that an empty source set completes immediately with no values. + [Test] + public void EmptySourcesCompletesImmediately() + { + var values = new List(); + var completed = 0; + + LinqMixins.BlendUnique() + .Subscribe(values.Add, ex => throw ex, () => completed++); + + Assert.Equal(0, values.Count); + Assert.Equal(Once, completed); + } + + /// Verifies that a custom comparer is used to suppress duplicates. + [Test] + public void UsesSuppliedComparer() + { + var values = new List(); + + // Case-insensitive: "a","A" collapse; "B" forwarded. + LinqMixins.BlendUnique( + [Signal.FromEnumerable(_caseVariants)], + StringComparer.OrdinalIgnoreCase) + .Subscribe(values.Add); + + Assert.Equal(_distinctCaseInsensitive, values); + } + + /// Verifies that the first source error terminates the merged stream. + [Test] + public void ForwardsFirstSourceError() + { + var values = new List(); + Exception? error = null; + + LinqMixins.BlendUnique( + Signal.FromEnumerable(_single), + Signal.Fail(new InvalidOperationException("boom"))) + .Subscribe(values.Add, ex => error = ex, () => { }); + + Assert.Equal(_single, values); + Assert.NotNull(error); + Assert.True(error is InvalidOperationException); + } + + /// Verifies argument validation for the sources array and the observer. + [Test] + public void NullArgumentsThrow() + { + Assert.Throws(() => LinqMixins.BlendUnique(null!, comparer: null)); + Assert.Throws(() => LinqMixins.BlendUnique(Signal.FromEnumerable(_single)).Subscribe(null!)); + } +} From 40351725899f4b8974248cc00421782d787ea475 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 01:33:07 +1000 Subject: [PATCH 2/4] Address review: validate null source elements eagerly, fix empty-source completion, add net8/net10 API snapshots - BlendUnique now throws ArgumentNullException for a null source element at call time (matching the repo's params-factory convention) instead of erroring at subscription; drops the now-redundant in-sink null check. - Empty-source path completes directly instead of decrementing the active counter below zero. - Add a null-source-element test case. - Record the new BlendUnique API surface in the net8.0 and net10.0 approval snapshots (only net9.0 was updated). --- .../SignalOperatorMixins.BlendUnique.cs | 38 ++++++++++++------- ...alTests.Primitives.DotNet10_0.verified.txt | 2 + ...valTests.Primitives.DotNet8_0.verified.txt | 2 + .../BlendUniqueTests.cs | 3 +- 4 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs index a1eaf2a..db8b7c8 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs @@ -39,6 +39,14 @@ public static IObservable BlendUnique(IObservable[] sources, IEqualityC throw new ArgumentNullException(nameof(sources)); } + for (var i = 0; i < sources.Length; i++) + { + if (sources[i] == null) + { + throw new ArgumentNullException(nameof(sources)); + } + } + return new BlendUniqueSignal(sources, comparer ?? EqualityComparer.Default); } @@ -116,27 +124,31 @@ internal BlendUniqueSink(IObserver downstream, IEqualityComparer comparer) /// The sources to merge. public void Run(IObservable[] sources) { - lock (_gate) + // Sources and their elements are validated eagerly by the public entry point, so no null check here. + if (sources.Length == 0) { - _active = sources.Length; + lock (_gate) + { + if (_done) + { + return; + } + + _done = true; + } + + _downstream.OnCompleted(); + return; } - if (sources.Length == 0) + lock (_gate) { - Complete(); - return; + _active = sources.Length; } for (var i = 0; i < sources.Length; i++) { - var source = sources[i]; - if (source == null) - { - ForwardError(new InvalidOperationException("BlendUnique source contained null.")); - return; - } - - _pocket.Add(source.Subscribe(new Element(this))); + _pocket.Add(sources[i].Subscribe(new Element(this))); } } diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt index bed1b27..73f5ce0 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet10_0.verified.txt @@ -189,6 +189,8 @@ namespace ReactiveUI.Primitives public static System.IObservable AsObservable(this System.IObservable source) { } public static System.IObservable Bind(this System.IObservable source, System.Func> selector) { } public static System.IObservable Blend(this System.IObservable> sources) { } + public static System.IObservable BlendUnique(params System.IObservable[] sources) { } + public static System.IObservable BlendUnique(System.IObservable[] sources, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable> Buffer(this System.IObservable source, int count) { } public static System.IObservable> Buffer(this System.IObservable source, int count, int skip) { } public static System.IObservable Calm(this System.IObservable source, System.TimeSpan dueTime) { } diff --git a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt index bed1b27..73f5ce0 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt +++ b/src/tests/ReactiveUI.Primitives.Tests/ApiApprovalTests.Primitives.DotNet8_0.verified.txt @@ -189,6 +189,8 @@ namespace ReactiveUI.Primitives public static System.IObservable AsObservable(this System.IObservable source) { } public static System.IObservable Bind(this System.IObservable source, System.Func> selector) { } public static System.IObservable Blend(this System.IObservable> sources) { } + public static System.IObservable BlendUnique(params System.IObservable[] sources) { } + public static System.IObservable BlendUnique(System.IObservable[] sources, System.Collections.Generic.IEqualityComparer? comparer) { } public static System.IObservable> Buffer(this System.IObservable source, int count) { } public static System.IObservable> Buffer(this System.IObservable source, int count, int skip) { } public static System.IObservable Calm(this System.IObservable source, System.TimeSpan dueTime) { } diff --git a/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs index b4288d1..959f379 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs @@ -108,11 +108,12 @@ public void ForwardsFirstSourceError() Assert.True(error is InvalidOperationException); } - /// Verifies argument validation for the sources array and the observer. + /// Verifies argument validation for the sources array, a null source element, and the observer. [Test] public void NullArgumentsThrow() { Assert.Throws(() => LinqMixins.BlendUnique(null!, comparer: null)); + Assert.Throws(() => LinqMixins.BlendUnique(Signal.FromEnumerable(_single), null!)); Assert.Throws(() => LinqMixins.BlendUnique(Signal.FromEnumerable(_single)).Subscribe(null!)); } } From 21bef80c028e447dea820b79dbf9485b97a7b7ff Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 01:50:45 +1000 Subject: [PATCH 3/4] test: cover BlendUnique dispose + post-terminal suppression paths Adds DisposeUnsubscribesFromSources and SuppressesNotificationsAfterTerminalError (value/completion/error all suppressed once a source has errored, via a third source) and drops the unreachable done-check in the empty-source path, bringing BlendUnique to full line coverage. --- .../SignalOperatorMixins.BlendUnique.cs | 8 +--- .../BlendUniqueTests.cs | 40 +++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs index db8b7c8..d14edcd 100644 --- a/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs +++ b/src/ReactiveUI.Primitives/SignalOperatorMixins.BlendUnique.cs @@ -127,17 +127,13 @@ public void Run(IObservable[] sources) // Sources and their elements are validated eagerly by the public entry point, so no null check here. if (sources.Length == 0) { + // Runs once during subscription before any source can notify, so no _done check is needed. lock (_gate) { - if (_done) - { - return; - } - _done = true; + _downstream.OnCompleted(); } - _downstream.OnCompleted(); return; } diff --git a/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs index 959f379..35585a3 100644 --- a/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs +++ b/src/tests/ReactiveUI.Primitives.Tests/BlendUniqueTests.cs @@ -108,6 +108,46 @@ public void ForwardsFirstSourceError() Assert.True(error is InvalidOperationException); } + /// Verifies that disposing the subscription tears down every source subscription. + [Test] + public void DisposeUnsubscribesFromSources() + { + var source = new Signal(); + var values = new List(); + + var subscription = LinqMixins.BlendUnique(source).Subscribe(values.Add); + source.OnNext(One); + subscription.Dispose(); + source.OnNext(Two); // no longer subscribed -> ignored + + Assert.Equal(_single, values); + } + + /// Verifies that values, completion, and further errors are suppressed after the first terminal error. + [Test] + public void SuppressesNotificationsAfterTerminalError() + { + var first = new Signal(); + var second = new Signal(); + var third = new Signal(); + var values = new List(); + Exception? error = null; + var completed = 0; + + LinqMixins.BlendUnique(first, second, third) + .Subscribe(values.Add, ex => error = ex, () => completed++); + + first.OnNext(One); // forwarded + second.OnError(new InvalidOperationException("boom")); // terminal + first.OnNext(Two); // value suppressed (done) + first.OnCompleted(); // completion suppressed (done) + third.OnError(new InvalidOperationException("again")); // error suppressed (done) + + Assert.Equal(_single, values); + Assert.NotNull(error); + Assert.Equal(0, completed); + } + /// Verifies argument validation for the sources array, a null source element, and the observer. [Test] public void NullArgumentsThrow() From 4e326b32e4514eda9e4247284ce9a84830f0a651 Mon Sep 17 00:00:00 2001 From: Glenn Watson <5834289+glennawatson@users.noreply.github.com> Date: Wed, 3 Jun 2026 02:14:48 +1000 Subject: [PATCH 4/4] test: stabilize WaitFor* scheduler-overload tests on Windows CI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The scheduler-routed WaitFor* tests schedule the subscribe on a sequencer and then block the calling thread on done.Wait(timeout). Routing through TaskPoolSequencer means the scheduled subscribe needs a thread-pool thread while the test thread is blocked; under parallel test load on Windows CI's few cores the pool starves and the waits time out spuriously. Route these tests through ImmediateSequencer so the subscribe runs inline before the blocking wait — the non-null-scheduler code path is still exercised, deterministically and without a thread-pool dependency. --- ...bscriptionExtensionsTests.SchedulerOverloads.cs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tests/ReactiveUI.Primitives.Extensions.Tests/ObservableSubscriptionExtensionsTests.SchedulerOverloads.cs b/src/tests/ReactiveUI.Primitives.Extensions.Tests/ObservableSubscriptionExtensionsTests.SchedulerOverloads.cs index 8a3cc57..d245837 100644 --- a/src/tests/ReactiveUI.Primitives.Extensions.Tests/ObservableSubscriptionExtensionsTests.SchedulerOverloads.cs +++ b/src/tests/ReactiveUI.Primitives.Extensions.Tests/ObservableSubscriptionExtensionsTests.SchedulerOverloads.cs @@ -25,7 +25,7 @@ public partial class ObservableSubscriptionExtensionsTests public async Task WhenWaitForValueWithSchedulerOnly_ThenReturnsEmittedValue() { var result = Observable.Return(SchedulerSentinelValue) - .WaitForValue(TaskPoolSequencer.Default); + .WaitForValue(ImmediateSequencer.Instance); await Assert.That(result).IsEqualTo(SchedulerSentinelValue); } @@ -36,7 +36,7 @@ public async Task WhenWaitForValueWithSchedulerOnly_ThenReturnsEmittedValue() public async Task WhenWaitForValueWithSchedulerAndTimeout_ThenReturnsEmittedValue() { var result = Observable.Return(SchedulerSentinelValue) - .WaitForValue(TaskPoolSequencer.Default, SchedulerWaitTimeout); + .WaitForValue(ImmediateSequencer.Instance, SchedulerWaitTimeout); await Assert.That(result).IsEqualTo(SchedulerSentinelValue); } @@ -47,7 +47,7 @@ public async Task WhenWaitForValueWithSchedulerAndTimeout_ThenReturnsEmittedValu public async Task WhenWaitForValueWithSchedulerTimesOut_ThenTimeoutException() { Action call = () => Observable.Never() - .WaitForValue(TaskPoolSequencer.Default, TimeSpan.FromMilliseconds(50)); + .WaitForValue(ImmediateSequencer.Instance, TimeSpan.FromMilliseconds(50)); var ex = Assert.Throws(call); await Assert.That(ex).IsNotNull(); } @@ -58,7 +58,7 @@ public async Task WhenWaitForValueWithSchedulerTimesOut_ThenTimeoutException() public async Task WhenWaitForCompletionWithSchedulerOnly_ThenReturnsAfterTerminal() { Observable.Return(RxVoid.Default) - .WaitForCompletion(TaskPoolSequencer.Default); + .WaitForCompletion(ImmediateSequencer.Instance); // Sentinel follow-up to give TUnit a real assertion. var sentinel = Observable.Return(SchedulerSentinelValue).SubscribeGetValue(); @@ -71,7 +71,7 @@ public async Task WhenWaitForCompletionWithSchedulerOnly_ThenReturnsAfterTermina public async Task WhenWaitForCompletionWithSchedulerAndTimeout_ThenReturnsAfterTerminal() { Observable.Return(RxVoid.Default) - .WaitForCompletion(TaskPoolSequencer.Default, SchedulerWaitTimeout); + .WaitForCompletion(ImmediateSequencer.Instance, SchedulerWaitTimeout); var sentinel = Observable.Return(SchedulerSentinelValue).SubscribeGetValue(); await Assert.That(sentinel).IsEqualTo(SchedulerSentinelValue); @@ -83,7 +83,7 @@ public async Task WhenWaitForCompletionWithSchedulerAndTimeout_ThenReturnsAfterT public async Task WhenWaitForErrorWithSchedulerOnly_ThenReturnsNullOnNormalCompletion() { var error = Observable.Return(SchedulerSentinelValue) - .WaitForError(TaskPoolSequencer.Default); + .WaitForError(ImmediateSequencer.Instance); await Assert.That(error).IsNull(); } @@ -95,7 +95,7 @@ public async Task WhenWaitForErrorWithSchedulerAndTimeout_ThenCapturesError() { var expected = new InvalidOperationException("scheduler-captured"); var error = Observable.Throw(expected) - .WaitForError(TaskPoolSequencer.Default, SchedulerWaitTimeout); + .WaitForError(ImmediateSequencer.Instance, SchedulerWaitTimeout); await Assert.That(error).IsEqualTo(expected); }