Skip to content

Commit 3d500e8

Browse files
authored
Fixes + unit tests for streaming PubSub implementation (#1415)
* Added null check - the proto suggests this shouldn't ever be null, but there's an issue reporting as much, so this fixes that Signed-off-by: Whit Waldo <[email protected]> * Removed the Task.WhenAll making the operation non-blocking Signed-off-by: Whit Waldo <[email protected]> * Added unit test to validate that the subscription is no longer blocking Signed-off-by: Whit Waldo <[email protected]> * Removed unused line from previous test, added another test Signed-off-by: Whit Waldo <[email protected]> * Added another test Signed-off-by: Whit Waldo <[email protected]> * More unit tests Signed-off-by: Whit Waldo <[email protected]> * Added more unit tests Signed-off-by: Whit Waldo <[email protected]> * Updated to make DaprPublishSubscribeClientBuilder configurable via a registered IConfiguration Signed-off-by: Whit Waldo <[email protected]> * Added missing copyright statements Signed-off-by: Whit Waldo <[email protected]> * Added missing package reference Signed-off-by: Whit Waldo <[email protected]> * Fixed bad reference (missed in merge) Signed-off-by: Whit Waldo <[email protected]> * Fixed failing unit test Signed-off-by: Whit Waldo <[email protected]> * Tweak to only pass along EventMessage payloads to developers as it's expected that the initial response will be null if EventMessage is populated Signed-off-by: Whit Waldo <[email protected]> * Was missing assignment of the Data property in the TopicMessage. Shout out to both @tommorvolloriddle and @Aimless321 for catching this! Signed-off-by: Whit Waldo <[email protected]> * Fix - return would be bad. Continue is the right move. Signed-off-by: Whit Waldo <[email protected]> * Added a simple test Signed-off-by: Whit Waldo <[email protected]> * Fixed unit tests Signed-off-by: Whit Waldo <[email protected]> * Merged in tweaks from #1422 Signed-off-by: Whit Waldo <[email protected]> --------- Signed-off-by: Whit Waldo <[email protected]>
1 parent 3a930c2 commit 3d500e8

File tree

7 files changed

+358
-17
lines changed

7 files changed

+358
-17
lines changed

src/Dapr.Messaging/AssemblyInfo.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using System.Runtime.CompilerServices;
15+
16+
[assembly: InternalsVisibleTo("Dapr.Messaging.Test, PublicKey=0024000004800000940000000602000000240000525341310004000001000100b1f597635c44597fcecb493e2b1327033b29b1a98ac956a1a538664b68f87d45fbaada0438a15a6265e62864947cc067d8da3a7d93c5eb2fcbb850e396c8684dba74ea477d82a1bbb18932c0efb30b64ff1677f85ae833818707ac8b49ad8062ca01d2c89d8ab1843ae73e8ba9649cd28666b539444dcdee3639f95e2a099bb2")]
17+
18+

src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClien
4747
/// </summary>
4848
public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient, string? daprApiToken)
4949
{
50-
Client = client;
50+
this.Client = client;
5151
this.HttpClient = httpClient;
5252
this.DaprApiToken = daprApiToken;
5353
}
@@ -63,7 +63,7 @@ public DaprPublishSubscribeGrpcClient(P.DaprClient client, HttpClient httpClient
6363
/// <returns></returns>
6464
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
6565
{
66-
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, Client);
66+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, this.Client);
6767
await receiver.SubscribeAsync(cancellationToken);
6868
return receiver;
6969
}

src/Dapr.Messaging/PublishSubscribe/PublishSubscribeReceiver.cs

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ internal sealed class PublishSubscribeReceiver : IAsyncDisposable
7777
/// </summary>
7878
private bool isDisposed;
7979

80+
// Internal property for testing purposes
81+
internal Task TopicMessagesChannelCompletion => topicMessagesChannel.Reader.Completion;
82+
// Internal property for testing purposes
83+
internal Task AcknowledgementsChannelCompletion => acknowledgementsChannel.Reader.Completion;
84+
8085
/// <summary>
8186
/// Constructs a new instance of a <see cref="PublishSubscribeReceiver"/> instance.
8287
/// </summary>
@@ -115,20 +120,40 @@ internal async Task SubscribeAsync(CancellationToken cancellationToken = default
115120

116121
var stream = await GetStreamAsync(cancellationToken);
117122

118-
//Retrieve the messages from the sidecar and write to the messages channel
119-
var fetchMessagesTask = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken);
123+
//Retrieve the messages from the sidecar and write to the messages channel - start without awaiting so this isn't blocking
124+
_ = FetchDataFromSidecarAsync(stream, topicMessagesChannel.Writer, cancellationToken)
125+
.ContinueWith(HandleTaskCompletion, null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted,
126+
TaskScheduler.Default);
120127

121128
//Process the messages as they're written to either channel
122-
var acknowledgementProcessorTask = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken);
123-
var topicMessageProcessorTask = ProcessTopicChannelMessagesAsync(cancellationToken);
129+
_ = ProcessAcknowledgementChannelMessagesAsync(stream, cancellationToken).ContinueWith(HandleTaskCompletion,
130+
null, cancellationToken, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
131+
_ = ProcessTopicChannelMessagesAsync(cancellationToken).ContinueWith(HandleTaskCompletion, null,
132+
cancellationToken,
133+
TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default);
134+
}
124135

125-
try
126-
{
127-
await Task.WhenAll(fetchMessagesTask, acknowledgementProcessorTask, topicMessageProcessorTask);
128-
}
129-
catch (OperationCanceledException)
136+
/// <summary>
137+
/// Exposed for testing purposes only.
138+
/// </summary>
139+
/// <param name="message">The test message to write.</param>
140+
internal async Task WriteMessageToChannelAsync(TopicMessage message)
141+
{
142+
await topicMessagesChannel.Writer.WriteAsync(message);
143+
}
144+
145+
//Exposed for testing purposes only
146+
internal async Task WriteAcknowledgementToChannelAsync(TopicAcknowledgement acknowledgement)
147+
{
148+
await acknowledgementsChannel.Writer.WriteAsync(acknowledgement);
149+
}
150+
151+
//Exposed for testing purposes only
152+
internal static void HandleTaskCompletion(Task task, object? state)
153+
{
154+
if (task.Exception != null)
130155
{
131-
// Will be cleaned up during DisposeAsync
156+
throw task.Exception;
132157
}
133158
}
134159

@@ -251,13 +276,21 @@ await stream.RequestStream.WriteAsync(
251276
//Each time a message is received from the stream, push it into the topic messages channel
252277
await foreach (var response in stream.ResponseStream.ReadAllAsync(cancellationToken))
253278
{
279+
//https://github.com/dapr/dotnet-sdk/issues/1412 reports that this is sometimes null
280+
//Skip the initial response - we only want to pass along TopicMessage payloads to developers
281+
if (response?.EventMessage is null)
282+
{
283+
continue;
284+
}
285+
254286
var message =
255287
new TopicMessage(response.EventMessage.Id, response.EventMessage.Source, response.EventMessage.Type,
256288
response.EventMessage.SpecVersion, response.EventMessage.DataContentType,
257289
response.EventMessage.Topic, response.EventMessage.PubsubName)
258290
{
259291
Path = response.EventMessage.Path,
260-
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value)
292+
Extensions = response.EventMessage.Extensions.Fields.ToDictionary(f => f.Key, kvp => kvp.Value),
293+
Data = response.EventMessage.Data.ToByteArray()
261294
};
262295

263296
try
@@ -308,6 +341,6 @@ public async ValueTask DisposeAsync()
308341
/// </summary>
309342
/// <param name="MessageId">The identifier of the message.</param>
310343
/// <param name="Action">The action to take on the message in the acknowledgement request.</param>
311-
private sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
344+
internal sealed record TopicAcknowledgement(string MessageId, TopicEventResponse.Types.TopicEventResponseStatus Action);
312345
}
313346

test/Dapr.Messaging.Test/Dapr.Messaging.Test.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<PackageReference Include="Grpc.Net.Client" />
2626
<PackageReference Include="protobuf-net.Grpc.AspNetCore" />
2727
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
28+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
2829
</ItemGroup>
2930

3031
<ItemGroup>

test/Dapr.Messaging.Test/Extensions/PublishSubscribeServiceCollectionExtensionsTests.cs

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,90 @@
1-
using Dapr.Messaging.PublishSubscribe;
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using Dapr.Messaging.PublishSubscribe;
215
using Dapr.Messaging.PublishSubscribe.Extensions;
16+
using Microsoft.Extensions.Configuration;
317
using Microsoft.Extensions.DependencyInjection;
18+
using Moq;
419

520
namespace Dapr.Messaging.Test.Extensions;
621

722
public sealed class PublishSubscribeServiceCollectionExtensionsTests
823
{
924
[Fact]
10-
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
25+
public void AddDaprMessagingClient_FromIConfiguration()
1126
{
27+
const string apiToken = "abc123";
28+
var configuration = new ConfigurationBuilder()
29+
.AddInMemoryCollection(new Dictionary<string, string?>
30+
{
31+
{"DAPR_API_TOKEN", apiToken }
32+
})
33+
.Build();
34+
1235
var services = new ServiceCollection();
1336

37+
services.AddSingleton<IConfiguration>(configuration);
38+
39+
services.AddDaprPubSubClient();
40+
41+
var app = services.BuildServiceProvider();
42+
43+
var pubSubClient = app.GetRequiredService<DaprPublishSubscribeClient>() as DaprPublishSubscribeGrpcClient;
44+
45+
Assert.NotNull(pubSubClient!);
46+
Assert.Equal(apiToken, pubSubClient.DaprApiToken);
47+
}
48+
49+
[Fact]
50+
public void AddDaprPubSubClient_RegistersIHttpClientFactory()
51+
{
52+
var services = new ServiceCollection();
1453
services.AddDaprPubSubClient();
1554

1655
var serviceProvider = services.BuildServiceProvider();
56+
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
57+
Assert.NotNull(daprClient);
58+
}
59+
60+
[Fact]
61+
public void AddDaprPubSubClient_CallsConfigureAction()
62+
{
63+
var services = new ServiceCollection();
64+
65+
var configureCalled = false;
66+
67+
services.AddDaprPubSubClient(Configure);
68+
69+
var serviceProvider = services.BuildServiceProvider();
70+
var daprClient = serviceProvider.GetService<DaprPublishSubscribeClient>();
71+
Assert.NotNull(daprClient);
72+
Assert.True(configureCalled);
73+
return;
74+
75+
void Configure(IServiceProvider sp, DaprPublishSubscribeClientBuilder builder)
76+
{
77+
configureCalled = true;
78+
}
79+
}
1780

81+
[Fact]
82+
public void AddDaprPubSubClient_RegistersServicesCorrectly()
83+
{
84+
var services = new ServiceCollection();
85+
services.AddDaprPubSubClient();
86+
var serviceProvider = services.BuildServiceProvider();
87+
1888
var httpClientFactory = serviceProvider.GetService<IHttpClientFactory>();
1989
Assert.NotNull(httpClientFactory);
2090

test/Dapr.Messaging.Test/PublishSubscribe/MessageHandlingPolicyTest.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
1-
using Dapr.Messaging.PublishSubscribe;
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using Dapr.Messaging.PublishSubscribe;
215

316
namespace Dapr.Messaging.Test.PublishSubscribe
417
{

0 commit comments

Comments
 (0)