Skip to content

Commit 167f6fa

Browse files
authored
Prevent unawaited task (which could fault) from being dropped (#165)
* If the call to producer(...) throws, we may end up dropping a Task (the consumer) on the floor, which could itself fault (probably due to cancellation) 1. make sure that we observe the task in this scenario 2. cancel the composite CTS *before* we try this, so we've signalled everything * whitespace
1 parent b60536c commit 167f6fa

File tree

1 file changed

+49
-19
lines changed

1 file changed

+49
-19
lines changed

src/protobuf-net.Grpc/CallContext.Streaming.cs

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,31 +111,60 @@ public ValueTask FullDuplexAsync<T>(
111111
await consumer(value, context).ConfigureAwait(false);
112112
}
113113
}, context.CancellationToken);
114-
var produced = producer(context);
114+
ValueTask produced;
115+
try
116+
{
117+
produced = producer(context);
118+
}
119+
catch (Exception ex)
120+
{
121+
var knownCancellation = CancelAndDisposeTaskSource(ref allDone);
122+
return ObserveErrorAsync(consumed, ex, knownCancellation);
123+
}
115124
if (produced.IsCompletedSuccessfully) return new ValueTask(consumed);
116125
return BothAsync(produced, consumed, SwapOut(ref allDone));
117126
}
118127
finally
119128
{
120129
// stop the producer, in any exit scenario
121-
CancelAndDisposeTaskSource(allDone);
130+
CancelAndDisposeTaskSource(ref allDone);
122131
}
123132
}
124133

125-
static T? SwapOut<T>(ref T? value) where T : class
134+
private static T? SwapOut<T>(ref T? value) where T : class
126135
{
127136
var tmp = value;
128137
value = default;
129138
return tmp;
130139
}
131140

132-
static void CancelAndDisposeTaskSource(CancellationTokenSource? cts)
141+
private static CancellationToken CancelAndDisposeTaskSource(ref CancellationTokenSource? cts)
133142
{
143+
CancellationToken result = default;
134144
if (cts is object)
135145
{
136-
cts.Cancel();
137-
cts.Dispose();
146+
try { result = cts.Token; } catch { }
147+
try { cts.Cancel(); } catch { }
148+
try { cts.Dispose(); } catch { }
149+
}
150+
cts = null;
151+
return result;
152+
}
153+
154+
private static async ValueTask ObserveErrorAsync(Task consumed, Exception producerEx, CancellationToken knownCancellation)
155+
{
156+
try
157+
{
158+
await consumed.ConfigureAwait(false); // make sure we try and await both
138159
}
160+
catch (OperationCanceledException oce) when (oce.CancellationToken == knownCancellation)
161+
{ } // just throw the real exception
162+
catch (Exception consumerEx)
163+
{
164+
// so they *both* failed; talk about embarrassing!
165+
throw new AggregateException(producerEx, consumerEx);
166+
}
167+
throw producerEx;
139168
}
140169

141170
private static async ValueTask BothAsync(ValueTask produced, Task consumed, CancellationTokenSource? allDone)
@@ -148,24 +177,16 @@ private static async ValueTask BothAsync(ValueTask produced, Task consumed, Canc
148177
}
149178
catch (Exception producerEx)
150179
{
151-
try
152-
{
153-
await consumed.ConfigureAwait(false); // make sure we try and await both
154-
}
155-
catch (Exception consumerEx)
156-
{
157-
// so they *both* failed; talk about embarrassing!
158-
throw new AggregateException(producerEx, consumerEx);
159-
}
160-
throw; // re-throw the exception from the producer
180+
var knownCancellation = CancelAndDisposeTaskSource(ref allDone);
181+
await ObserveErrorAsync(consumed, producerEx, knownCancellation).ConfigureAwait(false);
161182
}
162183
// producer completed cleanly; we can just await the
163184
// consumer - if it throws, it throws
164185
await consumed.ConfigureAwait(false);
165186
}
166187
finally
167188
{
168-
CancelAndDisposeTaskSource(allDone);
189+
CancelAndDisposeTaskSource(ref allDone);
169190
}
170191
}
171192

@@ -182,14 +203,23 @@ public ValueTask FullDuplexAsync<T>(
182203
{
183204
var context = new CallContext(this, allDone.Token);
184205
var consumed = Task.Run(() => consumer(source, context).AsTask(), context.CancellationToken); // note this shares a capture scope
185-
var produced = producer(context);
206+
ValueTask produced;
207+
try
208+
{
209+
produced = producer(context);
210+
}
211+
catch (Exception ex)
212+
{
213+
var knownCancellation = CancelAndDisposeTaskSource(ref allDone);
214+
return ObserveErrorAsync(consumed, ex, knownCancellation);
215+
}
186216
if (produced.IsCompletedSuccessfully) return new ValueTask(consumed);
187217
return BothAsync(produced, consumed, SwapOut(ref allDone));
188218
}
189219
finally
190220
{
191221
// stop the producer, in any exit scenario
192-
CancelAndDisposeTaskSource(allDone);
222+
CancelAndDisposeTaskSource(ref allDone);
193223
}
194224
}
195225

0 commit comments

Comments
 (0)