Skip to content

Commit 6964033

Browse files
WhitWaldodivzi-p
authored andcommitted
Add .NET client for pub/sub support - streaming subscriptions (dapr#1381)
* Building out Dapr.Messaging and test project for streaming pubsub subscriptions Signed-off-by: Whit Waldo <[email protected]> * Added copyright notices Signed-off-by: Whit Waldo <[email protected]> * Minor stylistic updates Signed-off-by: Whit Waldo <[email protected]> * Added generic client builder to support publish/subscribe client builder Signed-off-by: Whit Waldo <[email protected]> * Tweaked XML comment Signed-off-by: Whit Waldo <[email protected]> * Added several unit tests for the generic client builder Signed-off-by: Whit Waldo <[email protected]> * Updated to include latest review changes: - Added lock so that while we guarantee the method is called only once, it should be thread-safe now - Marked PublishSubscribeReceiver as internal so its members aren't part of the public API - Updated TopicMessage to use IReadOnlyDictionary Signed-off-by: Whit Waldo <[email protected]> * Switched to interlock exchange instead of lock to slightly simplify code Signed-off-by: Whit Waldo <[email protected]> * Added sample project Signed-off-by: Whit Waldo <[email protected]> * Minor changes to unit test Signed-off-by: Whit Waldo <[email protected]> * Deleted protos folder Signed-off-by: Whit Waldo <[email protected]> * Using lowercase protos dir name Signed-off-by: Whit Waldo <[email protected]> * Added registration extension methods Signed-off-by: Whit Waldo <[email protected]> * Updated example to use DI registration Signed-off-by: Whit Waldo <[email protected]> * Added default cancellation token Signed-off-by: Whit Waldo <[email protected]> * Passing stream into method instead of creating it twice Signed-off-by: Whit Waldo <[email protected]> --------- Signed-off-by: Whit Waldo <[email protected]> Signed-off-by: Divya Perumal <[email protected]>
1 parent 180f622 commit 6964033

19 files changed

+1011
-49
lines changed

Directory.Packages.props

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,51 @@
11
<Project>
2-
<PropertyGroup>
3-
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
4-
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
5-
</PropertyGroup>
6-
<ItemGroup>
7-
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
8-
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
9-
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
10-
<PackageVersion Include="FluentAssertions" Version="5.9.0" />
11-
<PackageVersion Include="GitHubActionsTestLogger" Version="1.1.2" />
12-
<PackageVersion Include="Google.Api.CommonProtos" Version="2.2.0" />
13-
<PackageVersion Include="Google.Protobuf" Version="3.28.2" />
14-
<PackageVersion Include="Grpc.AspNetCore" Version="2.66.0" />
15-
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
16-
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
17-
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.66.0" />
18-
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
19-
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.35" />
20-
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="6.0.35" />
21-
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
22-
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.8.0" />
23-
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
24-
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing" Version="1.1.2" />
25-
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing.XUnit" Version="1.1.2" />
26-
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
27-
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.3.0" />
28-
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.3.0" />
29-
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
30-
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
31-
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
32-
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
33-
<PackageVersion Include="Microsoft.Extensions.Http" Version="6.0.0" />
34-
<PackageVersion Include="Microsoft.Extensions.Logging" Version="6.0.0" />
35-
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.0" />
36-
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
37-
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
38-
<PackageVersion Include="MinVer" Version="2.3.0" />
39-
<PackageVersion Include="Moq" Version="4.20.72" />
40-
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
41-
<PackageVersion Include="protobuf-net.Grpc.AspNetCore" Version="1.2.2" />
42-
<PackageVersion Include="Serilog.AspNetCore" Version="6.1.0" />
43-
<PackageVersion Include="Serilog.Sinks.Console" Version="4.1.0" />
44-
<PackageVersion Include="Serilog.Sinks.File" Version="5.0.0" />
45-
<PackageVersion Include="System.Formats.Asn1" Version="6.0.1" />
46-
<PackageVersion Include="System.Text.Json" Version="8.0.5" />
47-
<PackageVersion Include="xunit" Version="2.9.2" />
48-
<PackageVersion Include="xunit.extensibility.core" Version="2.9.2" />
49-
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
50-
</ItemGroup>
2+
<PropertyGroup>
3+
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
4+
<CentralPackageTransitivePinningEnabled>true</CentralPackageTransitivePinningEnabled>
5+
</PropertyGroup>
6+
<ItemGroup>
7+
<PackageVersion Include="BenchmarkDotNet" Version="0.14.0" />
8+
<PackageVersion Include="coverlet.collector" Version="6.0.2" />
9+
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
10+
<PackageVersion Include="FluentAssertions" Version="5.9.0" />
11+
<PackageVersion Include="GitHubActionsTestLogger" Version="1.1.2" />
12+
<PackageVersion Include="Google.Api.CommonProtos" Version="2.2.0" />
13+
<PackageVersion Include="Google.Protobuf" Version="3.28.2" />
14+
<PackageVersion Include="Grpc.AspNetCore" Version="2.66.0" />
15+
<PackageVersion Include="Grpc.Core.Testing" Version="2.46.6" />
16+
<PackageVersion Include="Grpc.Net.Client" Version="2.66.0" />
17+
<PackageVersion Include="Grpc.Net.ClientFactory" Version="2.66.0" />
18+
<PackageVersion Include="Grpc.Tools" Version="2.67.0" />
19+
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="6.0.35" />
20+
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="6.0.35" />
21+
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="3.3.4" />
22+
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.8.0" />
23+
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.8.0" />
24+
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing" Version="1.1.2" />
25+
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.SourceGenerators.Testing.XUnit" Version="1.1.2" />
26+
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp.Workspaces" Version="4.8.0" />
27+
<PackageVersion Include="Microsoft.DurableTask.Client.Grpc" Version="1.3.0" />
28+
<PackageVersion Include="Microsoft.DurableTask.Worker.Grpc" Version="1.3.0" />
29+
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
30+
<PackageVersion Include="Microsoft.Extensions.Configuration.Abstractions" Version="6.0.0" />
31+
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
32+
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="6.0.0" />
33+
<PackageVersion Include="Microsoft.Extensions.Logging" Version="6.0.0" />
34+
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.4" />
35+
<PackageVersion Include="Microsoft.Extensions.Http" Version="6.0.0" />
36+
<PackageVersion Include="Microsoft.NET.Test.Sdk" Version="16.8.3" />
37+
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
38+
<PackageVersion Include="MinVer" Version="2.3.0" />
39+
<PackageVersion Include="Moq" Version="4.20.72" />
40+
<PackageVersion Include="Newtonsoft.Json" Version="13.0.3" />
41+
<PackageVersion Include="protobuf-net.Grpc.AspNetCore" Version="1.2.2" />
42+
<PackageVersion Include="Serilog.AspNetCore" Version="6.1.0" />
43+
<PackageVersion Include="Serilog.Sinks.Console" Version="4.1.0" />
44+
<PackageVersion Include="Serilog.Sinks.File" Version="5.0.0" />
45+
<PackageVersion Include="System.Formats.Asn1" Version="6.0.1" />
46+
<PackageVersion Include="System.Text.Json" Version="6.0.10" />
47+
<PackageVersion Include="xunit" Version="2.9.2" />
48+
<PackageVersion Include="xunit.extensibility.core" Version="2.9.2" />
49+
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.2" />
50+
</ItemGroup>
5151
</Project>

all.sln

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common", "src\Dapr.Com
119119
EndProject
120120
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Common.Test", "test\Dapr.Common.Test\Dapr.Common.Test.csproj", "{CDB47863-BEBD-4841-A807-46D868962521}"
121121
EndProject
122+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging.Test", "test\Dapr.Messaging.Test\Dapr.Messaging.Test.csproj", "{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}"
123+
EndProject
124+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Messaging", "src\Dapr.Messaging\Dapr.Messaging.csproj", "{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}"
125+
EndProject
126+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "StreamingSubscriptionExample", "examples\Client\PublishSubscribe\StreamingSubscriptionExample\StreamingSubscriptionExample.csproj", "{290D1278-F613-4DF3-9DF5-F37E38CDC363}"
122127
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs", "src\Dapr.Jobs\Dapr.Jobs.csproj", "{C8BB6A85-A7EA-40C0-893D-F36F317829B3}"
123128
EndProject
124129
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Dapr.Jobs.Test", "test\Dapr.Jobs.Test\Dapr.Jobs.Test.csproj", "{BF9828E9-5597-4D42-AA6E-6E6C12214204}"
@@ -311,6 +316,18 @@ Global
311316
{CDB47863-BEBD-4841-A807-46D868962521}.Debug|Any CPU.Build.0 = Debug|Any CPU
312317
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.ActiveCfg = Release|Any CPU
313318
{CDB47863-BEBD-4841-A807-46D868962521}.Release|Any CPU.Build.0 = Release|Any CPU
319+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
320+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Debug|Any CPU.Build.0 = Debug|Any CPU
321+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.ActiveCfg = Release|Any CPU
322+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9}.Release|Any CPU.Build.0 = Release|Any CPU
323+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
324+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Debug|Any CPU.Build.0 = Debug|Any CPU
325+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.ActiveCfg = Release|Any CPU
326+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA}.Release|Any CPU.Build.0 = Release|Any CPU
327+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
328+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Debug|Any CPU.Build.0 = Debug|Any CPU
329+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.ActiveCfg = Release|Any CPU
330+
{290D1278-F613-4DF3-9DF5-F37E38CDC363}.Release|Any CPU.Build.0 = Release|Any CPU
314331
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
315332
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Debug|Any CPU.Build.0 = Debug|Any CPU
316333
{C8BB6A85-A7EA-40C0-893D-F36F317829B3}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -379,6 +396,9 @@ Global
379396
{DFBABB04-50E9-42F6-B470-310E1B545638} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
380397
{B445B19C-A925-4873-8CB7-8317898B6970} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
381398
{CDB47863-BEBD-4841-A807-46D868962521} = {DD020B34-460F-455F-8D17-CF4A949F100B}
399+
{4E04EB35-7FD2-4FDB-B09A-F75CE24053B9} = {DD020B34-460F-455F-8D17-CF4A949F100B}
400+
{0EAE36A1-B578-4F13-A113-7A477ECA1BDA} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
401+
{290D1278-F613-4DF3-9DF5-F37E38CDC363} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
382402
{C8BB6A85-A7EA-40C0-893D-F36F317829B3} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
383403
{BF9828E9-5597-4D42-AA6E-6E6C12214204} = {DD020B34-460F-455F-8D17-CF4A949F100B}
384404
{D9697361-232F-465D-A136-4561E0E88488} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
using System.Text;
2+
using Dapr.Messaging.PublishSubscribe;
3+
using Dapr.Messaging.PublishSubscribe.Extensions;
4+
5+
var builder = WebApplication.CreateBuilder(args);
6+
builder.Services.AddDaprPubSubClient();
7+
var app = builder.Build();
8+
9+
//Process each message returned from the subscription
10+
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
11+
{
12+
try
13+
{
14+
//Do something with the message
15+
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
16+
return Task.FromResult(TopicResponseAction.Success);
17+
}
18+
catch
19+
{
20+
return Task.FromResult(TopicResponseAction.Retry);
21+
}
22+
}
23+
24+
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
25+
26+
//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
27+
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
28+
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
29+
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
30+
HandleMessageAsync, cancellationTokenSource.Token);
31+
32+
await Task.Delay(TimeSpan.FromMinutes(1));
33+
34+
//When you're done with the subscription, simply dispose of it
35+
await subscription.DisposeAsync();
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<Project Sdk="Microsoft.NET.Sdk.Web">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>net6.0</TargetFramework>
6+
<ImplicitUsings>enable</ImplicitUsings>
7+
<Nullable>enable</Nullable>
8+
</PropertyGroup>
9+
10+
<ItemGroup>
11+
<ProjectReference Include="..\..\..\..\src\Dapr.Messaging\Dapr.Messaging.csproj" />
12+
</ItemGroup>
13+
14+
</Project>
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<Description>This package contains the reference assemblies for developing messaging services using Dapr.</Description>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<PackageId>Dapr.Messaging</PackageId>
8+
<Title>Dapr Messaging SDK</Title>
9+
<Description>Dapr Messaging SDK for building applications that utilize messaging components.</Description>
10+
<VersionSuffix>alpha</VersionSuffix>
11+
</PropertyGroup>
12+
13+
<ItemGroup>
14+
<PackageReference Include="Microsoft.Extensions.DependencyInjection" />
15+
</ItemGroup>
16+
17+
<ItemGroup>
18+
<ProjectReference Include="..\Dapr.Common\Dapr.Common.csproj" />
19+
<ProjectReference Include="..\Dapr.Protos\Dapr.Protos.csproj" />
20+
</ItemGroup>
21+
22+
</Project>
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
namespace Dapr.Messaging.PublishSubscribe;
15+
16+
/// <summary>
17+
/// The base implementation of a Dapr pub/sub client.
18+
/// </summary>
19+
public abstract class DaprPublishSubscribeClient
20+
{
21+
/// <summary>
22+
/// Dynamically subscribes to a Publish/Subscribe component and topic.
23+
/// </summary>
24+
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
25+
/// <param name="topicName">The name of the topic to subscribe to.</param>
26+
/// <param name="options">Configuration options.</param>
27+
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
28+
/// <param name="cancellationToken">Cancellation token.</param>
29+
/// <returns></returns>
30+
public abstract Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default);
31+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using Dapr.Common;
15+
using Microsoft.Extensions.Configuration;
16+
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
17+
18+
namespace Dapr.Messaging.PublishSubscribe;
19+
20+
/// <summary>
21+
/// Builds a <see cref="DaprPublishSubscribeClient"/>.
22+
/// </summary>
23+
public sealed class DaprPublishSubscribeClientBuilder : DaprGenericClientBuilder<DaprPublishSubscribeClient>
24+
{
25+
/// <summary>
26+
/// Used to initialize a new instance of the <see cref="DaprPublishSubscribeClientBuilder"/>.
27+
/// </summary>
28+
/// <param name="configuration">An optional instance of <see cref="IConfiguration"/>.</param>
29+
public DaprPublishSubscribeClientBuilder(IConfiguration? configuration = null) : base(configuration)
30+
{
31+
}
32+
33+
/// <summary>
34+
/// Builds the client instance from the properties of the builder.
35+
/// </summary>
36+
/// <returns>The Dapr client instance.</returns>
37+
/// <summary>
38+
/// Builds the client instance from the properties of the builder.
39+
/// </summary>
40+
public override DaprPublishSubscribeClient Build()
41+
{
42+
var daprClientDependencies = BuildDaprClientDependencies();
43+
var client = new Autogenerated.Dapr.DaprClient(daprClientDependencies.channel);
44+
45+
return new DaprPublishSubscribeGrpcClient(client);
46+
}
47+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// ------------------------------------------------------------------------
2+
// Copyright 2024 The Dapr Authors
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
// Unless required by applicable law or agreed to in writing, software
8+
// distributed under the License is distributed on an "AS IS" BASIS,
9+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
// See the License for the specific language governing permissions and
11+
// limitations under the License.
12+
// ------------------------------------------------------------------------
13+
14+
using P = Dapr.Client.Autogen.Grpc.v1.Dapr;
15+
16+
namespace Dapr.Messaging.PublishSubscribe;
17+
18+
/// <summary>
19+
/// A client for interacting with the Dapr endpoints.
20+
/// </summary>
21+
internal sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
22+
{
23+
private readonly P.DaprClient daprClient;
24+
25+
/// <summary>
26+
/// Creates a new instance of a <see cref="DaprPublishSubscribeGrpcClient"/>
27+
/// </summary>
28+
public DaprPublishSubscribeGrpcClient(P.DaprClient client)
29+
{
30+
daprClient = client;
31+
}
32+
33+
/// <summary>
34+
/// Dynamically subscribes to a Publish/Subscribe component and topic.
35+
/// </summary>
36+
/// <param name="pubSubName">The name of the Publish/Subscribe component.</param>
37+
/// <param name="topicName">The name of the topic to subscribe to.</param>
38+
/// <param name="options">Configuration options.</param>
39+
/// <param name="messageHandler">The delegate reflecting the action to take upon messages received by the subscription.</param>
40+
/// <param name="cancellationToken">Cancellation token.</param>
41+
/// <returns></returns>
42+
public override async Task<IAsyncDisposable> SubscribeAsync(string pubSubName, string topicName, DaprSubscriptionOptions options, TopicMessageHandler messageHandler, CancellationToken cancellationToken = default)
43+
{
44+
var receiver = new PublishSubscribeReceiver(pubSubName, topicName, options, messageHandler, daprClient);
45+
await receiver.SubscribeAsync(cancellationToken);
46+
return receiver;
47+
}
48+
}
49+

0 commit comments

Comments
 (0)