|
| 1 | +# Rules for when OnError notifications become thrown exceptions |
| 2 | + |
| 3 | +Rx uses .NET `Exception` objects in a slightly unusual way: they are typically not thrown. Instead they are passed as the argument to an observer's `OnError` method. There are some situations in which an error reported in this way will end up causing an exception to be thrown. For example, if you `await` an `IObservable<T>` that calls `OnError`, the `await` will throw: |
| 4 | + |
| 5 | +```cs |
| 6 | +IObservable<int> ts = Observable.Throw<int>(new Exception("Pow!")); |
| 7 | + |
| 8 | +await ts; // Exception thrown here |
| 9 | +``` |
| 10 | + |
| 11 | +This can cause problems. For example, as [#2187](https://github.com/dotnet/reactive/issues/2187) describes, if you `await` the observable shown in this example multiple times, the exception's `StackTrace` gets longer each time. |
| 12 | + |
| 13 | +Problems arise because the use of singleton exception objects is slightly tricky even with straightforward use of `throw`, but it becomes a good deal more subtle when you start to 'cross the streams' of normal .NET exception handling and Rx's use of `Exception` in `OnError`. |
| 14 | + |
| 15 | +Rx has never previously offered any guidance that would enable a developer to understand that the code shown above might have problems. The purpose of this ADR is to establish suitable rules. |
| 16 | + |
| 17 | +## Status |
| 18 | + |
| 19 | +Proposed |
| 20 | + |
| 21 | + |
| 22 | +## Authors |
| 23 | + |
| 24 | +@idg10 ([Ian Griffiths](https://endjin.com/who-we-are/our-people/ian-griffiths/)). |
| 25 | + |
| 26 | + |
| 27 | + |
| 28 | +## Context |
| 29 | + |
| 30 | +Exceptions may appear to be ordinary .NET objects, but they get special handling from the runtime. MSIL has a [`throw`](https://learn.microsoft.com/en-us/dotnet/api/system.reflection.emit.opcodes.throw?view=net-9.0) instruction for the purpose of raising exceptions, and there is C++ code inside the CLR that directly manipulates fields defined by the `Exception` class. Certain expectations around the use of exception types are baked deeply into the runtime. |
| 31 | + |
| 32 | +Rx does not generally use exceptions in the way the runtime expects. In particular it does not normally use the MSIL `throw` instruction to raise an exception. Instead, when an Rx `IObservable<T>` wants to report an error, it just passes an exception object as an argument to the `IObserver<T>.OnError` method. |
| 33 | + |
| 34 | +This causes no problems when an application remains entirely within Rx's world. But when we want to move into the more conventional .NET approach of throwing exceptions, it raises an interesting question: where should the exception appear to originate from? |
| 35 | + |
| 36 | +Consider this example: |
| 37 | + |
| 38 | +```cs |
| 39 | +IObservable<string> fileLines = Observable.Create<string>(async obs => |
| 40 | +{ |
| 41 | + using var reader = new StreamReader(@"c:\temp\test.txt"); |
| 42 | + |
| 43 | + while ((await reader.ReadLineAsync()) is string line) |
| 44 | + { |
| 45 | + obs.OnNext(line); |
| 46 | + } |
| 47 | +}); |
| 48 | + |
| 49 | +string firstNonEmptyLine = await fileLines |
| 50 | + .FirstAsync(line => line.Length > 0); |
| 51 | +Console.WriteLine(firstNonEmptyLine); |
| 52 | +``` |
| 53 | + |
| 54 | +If the attempt to open the file throws an exception, what do we expect to see? A developer familiar with how exceptions generally work with `async` in .NET might reasonably expect the exception to report two stack traces: one for the point at which the exception was originally thrown, and another for where it was rethrown from the `await`. And that's exactly what we see: |
| 55 | + |
| 56 | +``` |
| 57 | +Unhandled exception. System.IO.FileNotFoundException: Could not find file 'c:\temp\test.txt'. |
| 58 | +File name: 'c:\temp\test.txt' |
| 59 | + at Microsoft.Win32.SafeHandles.SafeFileHandle.CreateFile(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options) |
| 60 | + at Microsoft.Win32.SafeHandles.SafeFileHandle.Open(String fullPath, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode) |
| 61 | + at System.IO.Strategies.OSFileStreamStrategy..ctor(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode) |
| 62 | + at System.IO.Strategies.FileStreamHelpers.ChooseStrategyCore(String path, FileMode mode, FileAccess access, FileShare share, FileOptions options, Int64 preallocationSize, Nullable`1 unixCreateMode) |
| 63 | + at System.IO.StreamReader.ValidateArgsAndOpenPath(String path, Encoding encoding, Int32 bufferSize) |
| 64 | + at System.IO.StreamReader..ctor(String path) |
| 65 | + at Program.<>c.<<<Main>$>b__0_0>d.MoveNext() in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 5 |
| 66 | +--- End of stack trace from previous location --- |
| 67 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 68 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 69 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 70 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 13 |
| 71 | + at Program.<Main>(String[] args) |
| 72 | + ``` |
| 73 | + |
| 74 | +This is straightforward because the exception here is thrown in the conventional .NET manner. It happens to be caught by Rx—this overload of `Observable.Create` wraps the `Task` returned by the callback in an adapter that detects when the `Task` enters a faulted state, in which case it extracts the exception and passes it to the subscribing `IObserver<T>`. And then the awaiter that Rx provides when you `await` an observable rethrows this same exception. |
| 75 | + |
| 76 | +But what about the earlier example in which the exception originated from `Observable.Throw`? In that code, we construct an `Exception` but we never use the `throw` keyword with it, and nor do we invoke any API that might do that for us. What would you expect the call stack to show in that case? In practice we get this: |
| 77 | + |
| 78 | +``` |
| 79 | +Unhandled exception. System.Exception: Pow! |
| 80 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 81 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 82 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 83 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 20 |
| 84 | + at Program.<Main>(String[] args) |
| 85 | +``` |
| 86 | + |
| 87 | +This time, we've got just a single stack trace, effectively showing the `await`. This looks very similar to the 2nd trace from the previous example—the difference here is that we don't have an extra trace showing the original location from which the exception was first thrown. And you could argue that this makes sense: this particular exception wasn't thrown until it emerged from the `await`. |
| 88 | + |
| 89 | +So far so good. But look what happens if we use this same observable source a few times: |
| 90 | + |
| 91 | +```cs |
| 92 | +IObservable<int> ts = Observable.Throw<int>(new Exception("Pow!")); |
| 93 | + |
| 94 | +for (int i = 0; i < 3; ++i) |
| 95 | +{ |
| 96 | + Console.WriteLine(); |
| 97 | + Console.WriteLine(); |
| 98 | + |
| 99 | + try |
| 100 | + { |
| 101 | + await ts; // Exception thrown here |
| 102 | +
|
| 103 | + } |
| 104 | + catch (Exception x) |
| 105 | + { |
| 106 | + Console.WriteLine(x); |
| 107 | + } |
| 108 | +} |
| 109 | +``` |
| 110 | + |
| 111 | +Since we're doing the same thing three times, you might expect to see the same exception report three times. But that's not what happens: |
| 112 | + |
| 113 | +``` |
| 114 | +System.Exception: Pow! |
| 115 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 116 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 117 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 118 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 119 | +
|
| 120 | +
|
| 121 | +System.Exception: Pow! |
| 122 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 123 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 124 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 125 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 126 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 127 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 128 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 129 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 130 | +
|
| 131 | +
|
| 132 | +System.Exception: Pow! |
| 133 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 134 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 135 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 136 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 137 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 138 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 139 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 140 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 141 | + at System.Reactive.PlatformServices.ExceptionServicesImpl.Rethrow(Exception exception) |
| 142 | + at System.Reactive.ExceptionHelpers.Throw(Exception exception) |
| 143 | + at System.Reactive.Subjects.AsyncSubject`1.GetResult() |
| 144 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\RxThrowExamples\Program.cs:line 27 |
| 145 | +``` |
| 146 | + |
| 147 | +The stack trace gets longer each time! |
| 148 | + |
| 149 | +(This does not accurately reflect the actual runtime behaviour: the call stack does not in fact get deeper. It's just that the `StackTrace` string with which an `Exception` reports this information ends up containing multiple copies of the stack trace.) |
| 150 | + |
| 151 | +This makes no sense. |
| 152 | + |
| 153 | +It occurs as a direct result of the steps Rx takes to produce the stack trace we expect in the earlier example. It uses the .NET runtime library's `ExceptionDispatchInfo.Throw` method to rethrow the exception from the `await`. That method preserves the original context in which the exception was thrown, and appends the context from which it is rethrown: this is how we end up with the multiple stack traces that .NET developers are accustomed to with normal use of `async` and `await`. (In fact, Rx is using exactly the same rethrow mechanism that enables this behaviour in conventional `async` code.) |
| 154 | + |
| 155 | +This behaviour is not peculiar to Rx. It originates from `ExceptionDispatchInfo.Throw` and we can create a `Task`-based version of this behaviour without using Rx: |
| 156 | + |
| 157 | +``` |
| 158 | +Exception ox = new("Kaboom!"); |
| 159 | +
|
| 160 | +for (int i = 0; i < 3; ++i) |
| 161 | +{ |
| 162 | + Console.WriteLine(); |
| 163 | + Console.WriteLine(); |
| 164 | +
|
| 165 | + try |
| 166 | + { |
| 167 | + await Task.FromException(ox); // Exception thrown here |
| 168 | +
|
| 169 | + } |
| 170 | + catch (Exception x) |
| 171 | + { |
| 172 | + Console.WriteLine(x); |
| 173 | + } |
| 174 | +} |
| 175 | +``` |
| 176 | + |
| 177 | +The stack traces are shorter, but we see the same repeating behaviour: |
| 178 | + |
| 179 | +``` |
| 180 | +System.Exception: Kaboom! |
| 181 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 182 | +
|
| 183 | +
|
| 184 | +System.Exception: Kaboom! |
| 185 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 186 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 187 | +
|
| 188 | +
|
| 189 | +System.Exception: Kaboom! |
| 190 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 191 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 192 | + at Program.<Main>$(String[] args) in D:\source\RxThrowExamples\ThrowStackDupWithoutRx\Program.cs:line 10 |
| 193 | +``` |
| 194 | + |
| 195 | +(To be precise, this _doesn't_ happen if you create a single `Task` and `await` that multiple times. It's the combination of `ExceptionDispatchInfo.Capture` and `ExceptionDispatchInfo.Throw` that causes this accumulation, and this `Task` captures exception information at the point when we create it with `Task.FromException`.) |
| 196 | + |
| 197 | +This appending of exception data is by design: `ExceptionDispatchInfo.Throw` is intended to append the current context to whatever was captured in the `ExceptionDispatchInfo`. The .NET runtime assumes that if you want to represent a new exceptional event that you will execute a `throw`. Rx does not do this in `await` (or other mechanisms that can rethrow an exception delivered through `OnError` such as `ToEnumerable`) precisely because it preserves whatever context was present when it received the exception. It does not perform a `throw` (or do anything else to reset the exception context) because this would prevent the full context being preserved in examples such as the `FileNotFoundException` handling shown earlier. |
| 198 | + |
| 199 | +This behaviour makes sense in the context for which it was designed—capturing the context in which an exception was initially thrown and augmenting it with additional information if it is rethrown from a different context. But unless you are aware of that, it's not at all obvious that although there's nothing inherently wrong with using `Observable.Throw<int>()`, it is not compatible with having multiple subscribers that will each rethrow the exception. |
| 200 | + |
| 201 | + |
| 202 | +## Decision |
| 203 | + |
| 204 | +Rx.NET will explicitly adopt this position: if a developer using Rx chooses to use a mechanism that takes exceptions delivered by an `IObservable<T>` and throws them (e.g. if you `await` an `IObservable<T>`) then it is the developer's responsibility to ensure that either: |
| 205 | + |
| 206 | +* each exception object is used only once |
| 207 | + |
| 208 | +or |
| 209 | + |
| 210 | +* the exception's dispatch state is reset prior to being supplied to the observer that will be rethrowing it (e.g., by executing a `throw`) |
| 211 | + |
| 212 | +Since Rx defines operators that won't conform to the first option (notably `Observable.Throw`, but also `ReplaySubject` and the related `Observable.Replay`) Rx 6.1 introduces a new operator, `ResetExceptionDispatchState`. This passes all notifications through, but effectively performs a `throw` on any `Exception` before forwarding it. It can be used like this: |
| 213 | + |
| 214 | +```cs |
| 215 | +var ts = Observable.Throw<int>(new Exception("Aaargh!")).ResetExceptionDispatchState(); |
| 216 | +``` |
| 217 | + |
| 218 | +When an observer subscribes to this, the `Throw` immediately calls `OnError`, and the `ResetExceptionDispatchState` will throw (and immediately catch) that exception before passing it on to the subscriber. (You would _not_ use this in scenarios such as the `Create` example shown earlier, because in that case each exception is freshly thrown, and has useful contextual information so we don't want to reset that. This is for use specifically in cases where the exception would not otherwise be thrown.) |
| 219 | + |
| 220 | + |
| 221 | +## Consequences |
| 222 | + |
| 223 | +By adopting this position, we make it clear that examples such as the one in [#2187](https://github.com/dotnet/reactive/issues/2187) are not expected to work correctly. |
| 224 | + |
| 225 | +More generally, this clarifies that observable sources that repeatedly produce the same exception object (e.g. `Observable.Throw` or `Observable.Repeat`) are incompatible with multiple calls to `await`. |
| 226 | + |
| 227 | +The addition of the `ResetExceptionDispatchState` operator provides a clear, simple way to fix code that runs into this problem. |
0 commit comments