Skip to content

Commit 393ea28

Browse files
authored
fix race condition re CTS cleanup (#144)
1 parent 32fb0e2 commit 393ea28

File tree

1 file changed

+41
-18
lines changed

1 file changed

+41
-18
lines changed

src/protobuf-net.Grpc/CallContext.Streaming.cs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public ValueTask FullDuplexAsync<T>(
106106
IAsyncEnumerable<T> source,
107107
Func<T, CallContext, ValueTask> consumer)
108108
{
109-
using var allDone = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, default);
109+
var allDone = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, default);
110110
try
111111
{
112112
var context = new CallContext(this, allDone.Token);
@@ -119,37 +119,60 @@ public ValueTask FullDuplexAsync<T>(
119119
}, context.CancellationToken);
120120
var produced = producer(context);
121121
if (produced.IsCompletedSuccessfully) return new ValueTask(consumed);
122-
return BothAsync(produced, consumed);
122+
return BothAsync(produced, consumed, SwapOut(ref allDone));
123123
}
124124
finally
125125
{
126126
// stop the producer, in any exit scenario
127-
allDone.Cancel();
127+
CancelAndDisposeTaskSource(allDone);
128128
}
129129
}
130130

131-
private static async ValueTask BothAsync(ValueTask produced, Task consumed)
131+
static T? SwapOut<T>(ref T? value) where T : class
132132
{
133-
try
133+
var tmp = value;
134+
value = default;
135+
return tmp;
136+
}
137+
138+
static void CancelAndDisposeTaskSource(CancellationTokenSource? cts)
139+
{
140+
if (cts is object)
134141
{
135-
await produced.ConfigureAwait(false);
142+
cts.Cancel();
143+
cts.Dispose();
136144
}
137-
catch (Exception producerEx)
145+
}
146+
147+
private static async ValueTask BothAsync(ValueTask produced, Task consumed, CancellationTokenSource? allDone)
148+
{
149+
try
138150
{
139151
try
140152
{
141-
await consumed.ConfigureAwait(false); // make sure we try and await both
153+
await produced.ConfigureAwait(false);
142154
}
143-
catch (Exception consumerEx)
155+
catch (Exception producerEx)
144156
{
145-
// so they *both* failed; talk about embarrassing!
146-
throw new AggregateException(producerEx, consumerEx);
157+
try
158+
{
159+
await consumed.ConfigureAwait(false); // make sure we try and await both
160+
}
161+
catch (Exception consumerEx)
162+
{
163+
// so they *both* failed; talk about embarrassing!
164+
throw new AggregateException(producerEx, consumerEx);
165+
}
166+
throw; // re-throw the exception from the producer
147167
}
148-
throw; // re-throw the exception from the producer
168+
// producer completed cleanly; we can just await the
169+
// consumer - if it throws, it throws
170+
await consumed.ConfigureAwait(false);
171+
}
172+
finally
173+
{
174+
CancelAndDisposeTaskSource(allDone);
149175
}
150-
// producer completed cleanly; we can just await the
151-
// consumer - if it throws, it throws
152-
await consumed.ConfigureAwait(false);
153176
}
154177

155178
/// <summary>
@@ -160,19 +183,19 @@ public ValueTask FullDuplexAsync<T>(
160183
IAsyncEnumerable<T> source,
161184
Func<IAsyncEnumerable<T>, CallContext, ValueTask> consumer)
162185
{
163-
using var allDone = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, default);
186+
var allDone = CancellationTokenSource.CreateLinkedTokenSource(CancellationToken, default);
164187
try
165188
{
166189
var context = new CallContext(this, allDone.Token);
167190
var consumed = Task.Run(() => consumer(source, context).AsTask(), context.CancellationToken); // note this shares a capture scope
168191
var produced = producer(context);
169192
if (produced.IsCompletedSuccessfully) return new ValueTask(consumed);
170-
return BothAsync(produced, consumed);
193+
return BothAsync(produced, consumed, SwapOut(ref allDone));
171194
}
172195
finally
173196
{
174197
// stop the producer, in any exit scenario
175-
allDone.Cancel();
198+
CancelAndDisposeTaskSource(allDone);
176199
}
177200
}
178201

0 commit comments

Comments
 (0)