Skip to content

Commit 74b1b32

Browse files
GeorgeTsiokosclaude
andcommitted
Fix SubscribeAsync silently failing when Dapr sidecar is unavailable (#1663)
- Throw DaprException wrapping RpcException on initial connection failure - Add SubscriptionErrorHandler callback for runtime errors in background tasks - Reset hasInitialized on any failure (initial or mid-stream) to allow retry - Guard ErrorHandler invocation with try/catch to prevent unobserved exceptions - Use CancellationToken.None for ContinueWith to ensure error handlers fire - Include topic/pubsub names in error messages for multi-subscription debugging Resolves #1663 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 896bff1 commit 74b1b32

File tree

4 files changed

+256
-13
lines changed

4 files changed

+256
-13
lines changed

src/Dapr.Messaging/PublishSubscribe/DaprSubscriptionOptions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,11 @@ public sealed record DaprSubscriptionOptions(MessageHandlingPolicy MessageHandli
4040
/// been signaled.
4141
/// </summary>
4242
public TimeSpan MaximumCleanupTimeout { get; init; } = TimeSpan.FromSeconds(30);
43+
44+
/// <summary>
45+
/// An optional callback invoked when errors occur during an active subscription.
46+
/// If not set, runtime errors from background tasks will be silently swallowed.
47+
/// </summary>
48+
public SubscriptionErrorHandler? ErrorHandler { get; init; }
4349
}
4450

src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// ------------------------------------------------------------------------
1313

1414
using System.Threading.Channels;
15+
using Dapr;
1516
using Dapr.AppCallback.Autogen.Grpc.v1;
1617
using Grpc.Core;
1718
using P = Dapr.Client.Autogen.Grpc.v1;
@@ -118,18 +119,36 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default
118119
return;
119120
}
120121

121-
var stream = await GetStreamAsync(cancellationToken);
122+
AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1> stream;
123+
try
124+
{
125+
stream = await GetStreamAsync(cancellationToken);
126+
}
127+
catch (RpcException ex)
128+
{
129+
// Reset so the caller can retry after the sidecar becomes available
130+
Interlocked.Exchange(ref hasInitialized, 0);
131+
throw new DaprException(
132+
$"Unable to subscribe to topic '{topicName}' on pubsub '{pubSubName}'. The Dapr sidecar may be unavailable.",
133+
ex);
134+
}
135+
catch (Exception)
136+
{
137+
// Reset so the caller can retry regardless of the exception type
138+
Interlocked.Exchange(ref hasInitialized, 0);
139+
throw;
140+
}
122141

123142
//Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
124143
_ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
125-
.ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
144+
.ContinueWith(HandleTaskCompletion, null, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted,
126145
TaskScheduler.Default);
127146

128147
//Process the messages as they're written to either channel
129148
_ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
130-
null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
149+
null, CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
131150
_ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
132-
cancellationToken,
151+
CancellationToken.None,
133152
TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
134153
}
135154

@@ -149,11 +168,28 @@ internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement ackn
149168
}
150169

151170
//Exposed for testing purposes only
152-
internal static void HandleTaskCompletion(Task task, object? state)
171+
internal void HandleTaskCompletion(Task task, object? state)
153172
{
154-
if (task.Exception != null)
173+
if (task.Exception is null)
174+
{
175+
return;
176+
}
177+
178+
// Allow the caller to retry after a background task failure
179+
Interlocked.Exchange(ref hasInitialized, 0);
180+
181+
var innerException = task.Exception.InnerException ?? task.Exception;
182+
var daprException = new DaprException(
183+
$"An error occurred during an active subscription to topic '{topicName}' on pubsub '{pubSubName}'.",
184+
innerException);
185+
186+
try
187+
{
188+
options.ErrorHandler?.Invoke(daprException);
189+
}
190+
catch (Exception)
155191
{
156-
throw task.Exception;
192+
// Prevent a faulty error handler from becoming an unobserved task exception
157193
}
158194
}
159195

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Messaging.PublishSubscribe;
15+
16+
/// <summary>
17+
/// A delegate that handles errors occurring during an active subscription.
18+
/// </summary>
19+
/// <param name="exception">The <see cref="DaprException"/> wrapping the original error.</param>
20+
public delegate void SubscriptionErrorHandler(DaprException exception);

test/Dapr.Messaging.Test/PublishSubscribe/PublishSubscribeReceiverTests.cs

Lines changed: 187 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// ------------------------------------------------------------------------
1313

1414
using System.Threading.Channels;
15+
using Dapr;
1516
using Dapr.AppCallback.Autogen.Grpc.v1;
1617
using Dapr.Messaging.PublishSubscribe;
1718
using Grpc.Core;
@@ -193,14 +194,194 @@ public async Task DisposeAsync_ShouldCompleteChannels()
193194
}
194195

195196
[Fact]
196-
public void HandleTaskCompletion_ShouldThrowException_WhenTaskHasException()
197+
public void HandleTaskCompletion_ShouldInvokeErrorHandler_WhenTaskHasException()
197198
{
199+
const string pubSubName = "testPubSub";
200+
const string topicName = "testTopic";
201+
DaprException? receivedException = null;
202+
var options =
203+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
204+
{
205+
ErrorHandler = ex => receivedException = ex
206+
};
207+
208+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
209+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
210+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
211+
212+
var task = Task.FromException(new InvalidOperationException("Test exception"));
213+
214+
receiver.HandleTaskCompletion(task, null);
215+
216+
Assert.NotNull(receivedException);
217+
Assert.IsType<InvalidOperationException>(receivedException.InnerException);
218+
Assert.Equal("Test exception", receivedException.InnerException.Message);
219+
Assert.Contains("testTopic", receivedException.Message);
220+
Assert.Contains("testPubSub", receivedException.Message);
221+
}
222+
223+
[Fact]
224+
public void HandleTaskCompletion_ShouldNotThrow_WhenNoErrorHandler()
225+
{
226+
const string pubSubName = "testPubSub";
227+
const string topicName = "testTopic";
228+
var options =
229+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success));
230+
231+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
232+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
233+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
234+
235+
var task = Task.FromException(new InvalidOperationException("Test exception"));
236+
237+
var exception = Record.Exception(() => receiver.HandleTaskCompletion(task, null));
238+
239+
Assert.Null(exception);
240+
}
241+
242+
[Fact]
243+
public void HandleTaskCompletion_ShouldNotThrow_WhenErrorHandlerThrows()
244+
{
245+
const string pubSubName = "testPubSub";
246+
const string topicName = "testTopic";
247+
var options =
248+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
249+
{
250+
ErrorHandler = _ => throw new InvalidOperationException("Handler failed")
251+
};
252+
253+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
254+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
255+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
256+
198257
var task = Task.FromException(new InvalidOperationException("Test exception"));
199258

200-
var exception = Assert.Throws<AggregateException>(() =>
201-
PublishSubscribeReceiver.HandleTaskCompletion(task, null));
202-
203-
Assert.IsType<InvalidOperationException>(exception.InnerException);
204-
Assert.Equal("Test exception", exception.InnerException.Message);
259+
var exception = Record.Exception(() => receiver.HandleTaskCompletion(task, null));
260+
261+
Assert.Null(exception);
262+
}
263+
264+
[Fact]
265+
public void HandleTaskCompletion_ShouldNotInvokeErrorHandler_WhenTaskSucceeded()
266+
{
267+
const string pubSubName = "testPubSub";
268+
const string topicName = "testTopic";
269+
var handlerInvoked = false;
270+
var options =
271+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success))
272+
{
273+
ErrorHandler = _ => handlerInvoked = true
274+
};
275+
276+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
277+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
278+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
279+
280+
receiver.HandleTaskCompletion(Task.CompletedTask, null);
281+
282+
Assert.False(handlerInvoked);
283+
}
284+
285+
[Fact]
286+
public async Task SubscribeAsync_ShouldThrowDaprException_WhenSidecarUnavailable()
287+
{
288+
const string pubSubName = "testPubSub";
289+
const string topicName = "testTopic";
290+
var options =
291+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success));
292+
293+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
294+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
295+
296+
// Setup the mock to throw RpcException (simulating unavailable sidecar)
297+
mockDaprClient.Setup(client =>
298+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()))
299+
.Throws(new RpcException(new Status(StatusCode.Unavailable, "Connect Failed")));
300+
301+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
302+
303+
var exception = await Assert.ThrowsAsync<DaprException>(() => receiver.SubscribeAsync());
304+
305+
Assert.Contains("testTopic", exception.Message);
306+
Assert.Contains("testPubSub", exception.Message);
307+
Assert.IsType<RpcException>(exception.InnerException);
308+
}
309+
310+
[Fact]
311+
public async Task SubscribeAsync_ShouldAllowRetry_AfterSidecarFailure()
312+
{
313+
const string pubSubName = "testPubSub";
314+
const string topicName = "testTopic";
315+
var options =
316+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success));
317+
318+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
319+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
320+
321+
// First call throws RpcException
322+
mockDaprClient.Setup(client =>
323+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()))
324+
.Throws(new RpcException(new Status(StatusCode.Unavailable, "Connect Failed")));
325+
326+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
327+
328+
await Assert.ThrowsAsync<DaprException>(() => receiver.SubscribeAsync());
329+
330+
// Now setup the mock to succeed on retry
331+
var mockRequestStream = new Mock<IClientStreamWriter<P.SubscribeTopicEventsRequestAlpha1>>();
332+
var mockResponseStream = new Mock<IAsyncStreamReader<P.SubscribeTopicEventsResponseAlpha1>>();
333+
var mockCall =
334+
new AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1>(
335+
mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()),
336+
() => new Status(), () => new Metadata(), () => { });
337+
338+
mockDaprClient.Setup(client =>
339+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()))
340+
.Returns(mockCall);
341+
342+
// Second call should succeed (hasInitialized was reset)
343+
var retryException = await Record.ExceptionAsync(() => receiver.SubscribeAsync());
344+
Assert.Null(retryException);
345+
346+
// Verify the client was called twice
347+
mockDaprClient.Verify(client =>
348+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()), Times.Exactly(2));
349+
}
350+
351+
[Fact]
352+
public async Task SubscribeAsync_ShouldResetHasInitialized_WhenNonRpcExceptionThrown()
353+
{
354+
const string pubSubName = "testPubSub";
355+
const string topicName = "testTopic";
356+
var options =
357+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(5), TopicResponseAction.Success));
358+
359+
var messageHandler = new TopicMessageHandler((message, token) => Task.FromResult(TopicResponseAction.Success));
360+
var mockDaprClient = new Mock<P.Dapr.DaprClient>();
361+
362+
// First call throws a non-RPC exception
363+
mockDaprClient.Setup(client =>
364+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()))
365+
.Throws(new ObjectDisposedException("client"));
366+
367+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, mockDaprClient.Object);
368+
369+
await Assert.ThrowsAsync<ObjectDisposedException>(() => receiver.SubscribeAsync());
370+
371+
// Now setup the mock to succeed on retry
372+
var mockRequestStream = new Mock<IClientStreamWriter<P.SubscribeTopicEventsRequestAlpha1>>();
373+
var mockResponseStream = new Mock<IAsyncStreamReader<P.SubscribeTopicEventsResponseAlpha1>>();
374+
var mockCall =
375+
new AsyncDuplexStreamingCall<P.SubscribeTopicEventsRequestAlpha1, P.SubscribeTopicEventsResponseAlpha1>(
376+
mockRequestStream.Object, mockResponseStream.Object, Task.FromResult(new Metadata()),
377+
() => new Status(), () => new Metadata(), () => { });
378+
379+
mockDaprClient.Setup(client =>
380+
client.SubscribeTopicEventsAlpha1(null, null, It.IsAny<CancellationToken>()))
381+
.Returns(mockCall);
382+
383+
// Second call should succeed (hasInitialized was reset)
384+
var retryException = await Record.ExceptionAsync(() => receiver.SubscribeAsync());
385+
Assert.Null(retryException);
205386
}
206387
}

0 commit comments

Comments
 (0)