From aa5a5349cfedc86b0df6ad88c8ebaad02e6e9f54 Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Wed, 30 Apr 2025 16:06:35 +0100 Subject: [PATCH 1/4] Fix AsyncRx GroupByUntil double OnCompletedAsync bug --- .../Linq/Operators/GroupByUntil.cs | 49 +++++++++++++------ 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index 5221045a9..995f0ac5d 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -723,18 +723,7 @@ async ValueTask Expire() { if (key == null) { - var oldNullGroup = default(IAsyncSubject); - - lock (nullGate) - { - oldNullGroup = nullGroup; - nullGroup = null; - } - - if (oldNullGroup != null) - { - await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); - } + await CompleteAndRemoveNullGroupIfPresentAsync(); } else { @@ -778,12 +767,25 @@ async ValueTask Expire() { if (nullGroup != null) { - await nullGroup.OnCompletedAsync().ConfigureAwait(false); + await CompleteAndRemoveNullGroupIfPresentAsync(); } - foreach (var group in groups.Values) + foreach (var key in groups.Keys) { - await group.OnCompletedAsync().ConfigureAwait(false); + // The ConcurrentDictionary's Keys property is a snapshot, so + // although this TryRemove should always succeed for the first + // key in the dictionary (as long as our upstream observable is + // obeying the rules, and not making multiple concurrent calls + // to our observer) each await in this loop offers an opportunity + // for one of the group duration observables to complete, which + // will cause the Expire method above to run, meaning that an + // entry that was present when we retrieved Keys at the start of + // this loop might already have been completed and removed by the + // time this loop reaches it. + if (groups.TryRemove(key, out var group)) + { + await group.OnCompletedAsync().ConfigureAwait(false); + } } using (await gate.LockAsync().ConfigureAwait(false)) @@ -794,6 +796,23 @@ async ValueTask Expire() ), refCount ); + + async ValueTask CompleteAndRemoveNullGroupIfPresentAsync() + { + var oldNullGroup = default(IAsyncSubject); + + lock (nullGate) + { + oldNullGroup = nullGroup; + nullGroup = null; + } + + if (oldNullGroup != null) + { + await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); + } + + } } } } From b879fbf5956a96ce448d520ebe73b63ce8591ab6 Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Wed, 30 Apr 2025 16:25:44 +0100 Subject: [PATCH 2/4] Update asyncrx pipeline .NET SDK versions --- azure-pipelines.asyncrx.yml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/azure-pipelines.asyncrx.yml b/azure-pipelines.asyncrx.yml index 5ff9cbb58..437a55270 100644 --- a/azure-pipelines.asyncrx.yml +++ b/azure-pipelines.asyncrx.yml @@ -33,12 +33,27 @@ stages: DOTNET_SKIP_FIRST_TIME_EXPERIENCE: true steps: + - task: UseDotNet@2 + displayName: Use .NET 8.0.x SDK + inputs: + version: 8.0.x + performMultiLevelLookup: true + + # We need .NET 7.0 and 6.0 to be able to run all tests. + # For .NET 7.0, the runtime package is sufficient because we don't need to build anything. + # That doesn't work for 6.0, because we need the desktop framework, and the only way to + # get that into a build agent seems to be to install the SDK. - task: UseDotNet@2 displayName: Use .NET Core 7.0.x SDK inputs: version: 7.0.x performMultiLevelLookup: true + - task: UseDotNet@2 + displayName: Use .NET 6.0 SDK + inputs: + version: '6.0.x' + - task: DotNetCoreCLI@2 inputs: command: custom From b108795154947c285023e6047159782003ef41a9 Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Thu, 1 May 2025 10:44:59 +0100 Subject: [PATCH 3/4] Fix double remove in OnErrorAsync --- .../Linq/Operators/GroupByUntil.cs | 78 ++++++++++--------- 1 file changed, 43 insertions(+), 35 deletions(-) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index 995f0ac5d..cc57675c2 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -616,22 +616,8 @@ public partial class AsyncObserver async ValueTask OnErrorAsync(Exception ex) { - var nullGroupLocal = default(IAsyncSubject); - - lock (nullGate) - { - nullGroupLocal = nullGroup; - } - - if (nullGroupLocal != null) - { - await nullGroupLocal.OnErrorAsync(ex).ConfigureAwait(false); - } - - foreach (var group in groups.Values) - { - await group.OnErrorAsync(ex).ConfigureAwait(false); - } + await ErrorAndRemoveNullGroupIfPresentAsync(ex); + await ErrorAndRemoveAllGroupsIfPresentAsync(ex); using (await gate.LockAsync().ConfigureAwait(false)) { @@ -770,23 +756,7 @@ async ValueTask Expire() await CompleteAndRemoveNullGroupIfPresentAsync(); } - foreach (var key in groups.Keys) - { - // The ConcurrentDictionary's Keys property is a snapshot, so - // although this TryRemove should always succeed for the first - // key in the dictionary (as long as our upstream observable is - // obeying the rules, and not making multiple concurrent calls - // to our observer) each await in this loop offers an opportunity - // for one of the group duration observables to complete, which - // will cause the Expire method above to run, meaning that an - // entry that was present when we retrieved Keys at the start of - // this loop might already have been completed and removed by the - // time this loop reaches it. - if (groups.TryRemove(key, out var group)) - { - await group.OnCompletedAsync().ConfigureAwait(false); - } - } + await CompleteAndRemoveAllGroupsIfPresentAsync(); using (await gate.LockAsync().ConfigureAwait(false)) { @@ -797,7 +767,9 @@ async ValueTask Expire() refCount ); - async ValueTask CompleteAndRemoveNullGroupIfPresentAsync() + ValueTask CompleteAndRemoveNullGroupIfPresentAsync() => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(null); + ValueTask ErrorAndRemoveNullGroupIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveNullGroupIfPresentAsync(x); + async ValueTask CompleteOrErrorAndRemoveNullGroupIfPresentAsync(Exception x) { var oldNullGroup = default(IAsyncSubject); @@ -809,9 +781,45 @@ async ValueTask CompleteAndRemoveNullGroupIfPresentAsync() if (oldNullGroup != null) { - await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); + if (x is null) + { + await oldNullGroup.OnCompletedAsync().ConfigureAwait(false); + } + else + { + await oldNullGroup.OnErrorAsync(x).ConfigureAwait(false); + } } + } + ValueTask CompleteAndRemoveAllGroupsIfPresentAsync() => CompleteOrErrorAndRemoveAllGroupsAsync(null); + ValueTask ErrorAndRemoveAllGroupsIfPresentAsync(Exception x) => CompleteOrErrorAndRemoveAllGroupsAsync(x); + async ValueTask CompleteOrErrorAndRemoveAllGroupsAsync(Exception x) + { + foreach (var key in groups.Keys) + { + // The ConcurrentDictionary's Keys property is a snapshot, so + // although this TryRemove should always succeed for the first + // key in the dictionary (as long as our upstream observable is + // obeying the rules, and not making multiple concurrent calls + // to our observer) each await in this loop offers an opportunity + // for one of the group duration observables to complete, which + // will cause the Expire method above to run, meaning that an + // entry that was present when we retrieved Keys at the start of + // this loop might already have been completed and removed by the + // time this loop reaches it. + if (groups.TryRemove(key, out var group)) + { + if (x is null) + { + await group.OnCompletedAsync().ConfigureAwait(false); + } + else + { + await group.OnErrorAsync(x).ConfigureAwait(false); + } + } + } } } } From ed1f538746f27d0ec9e8d704f6cc8c624d3a6c8e Mon Sep 17 00:00:00 2001 From: Ian Griffiths Date: Thu, 1 May 2025 13:25:37 +0100 Subject: [PATCH 4/4] Ensure observer only gets either completion or error, and only once --- .../Linq/Operators/GroupByUntil.cs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs index cc57675c2..284fb348e 100644 --- a/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs +++ b/AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs @@ -613,6 +613,7 @@ public partial class AsyncObserver var nullGate = new object(); var nullGroup = default(IAsyncSubject); + bool observerComplete = false; async ValueTask OnErrorAsync(Exception ex) { @@ -621,7 +622,11 @@ async ValueTask OnErrorAsync(Exception ex) using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnErrorAsync(ex).ConfigureAwait(false); + if (!observerComplete) + { + observerComplete = true; + await observer.OnErrorAsync(ex).ConfigureAwait(false); + } } } @@ -700,7 +705,10 @@ async ValueTask OnErrorAsync(Exception ex) using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnNextAsync(g).ConfigureAwait(false); + if (!observerComplete) + { + await observer.OnNextAsync(g).ConfigureAwait(false); + } } var durationSubscription = new SingleAssignmentAsyncDisposable(); @@ -760,7 +768,11 @@ async ValueTask Expire() using (await gate.LockAsync().ConfigureAwait(false)) { - await observer.OnCompletedAsync().ConfigureAwait(false); + if (!observerComplete) + { + observerComplete = true; + await observer.OnCompletedAsync().ConfigureAwait(false); + } } } ),