diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index a988bd8c0..97372f869 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -314,11 +314,11 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodN /// The name of the method to invoke. /// A collection of key/value pairs to populate the query string from. /// An for use with SendInvokeMethodRequestAsync. - public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodName, IReadOnlyCollection> queryStringParameters) + public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodName, IReadOnlyCollection> queryStringParameters) { return CreateInvokeMethodRequest(HttpMethod.Post, appId, methodName, queryStringParameters); } - + /// /// Creates an that can be used to perform service invocation for the /// application identified by and invokes the method specified by @@ -357,7 +357,7 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri { return CreateInvokeMethodRequest(HttpMethod.Post, appId, methodName, new List>(), data); } - + /// /// Creates an that can be used to perform service invocation for the /// application identified by and invokes the method specified by @@ -371,8 +371,8 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri /// The data that will be JSON serialized and provided as the request body. /// A collection of key/value pairs to populate the query string from. /// An for use with SendInvokeMethodRequestAsync. - public abstract HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName, IReadOnlyCollection> queryStringParameters, TRequest data); - + public abstract HttpRequestMessage CreateInvokeMethodRequest(HttpMethod httpMethod, string appId, string methodName, IReadOnlyCollection> queryStringParameters, TRequest data); + /// /// Perform health-check of Dapr sidecar. Return 'true' if sidecar is healthy. Otherwise 'false'. /// CheckHealthAsync handle and will return 'false' if error will occur on transport level @@ -446,7 +446,7 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, stri /// A that can be used to cancel the operation. /// A that will return the value when the operation has completed. public abstract Task InvokeMethodWithResponseAsync(HttpRequestMessage request, CancellationToken cancellationToken = default); - + /// /// /// Creates an that can be used to perform Dapr service invocation using @@ -779,7 +779,7 @@ public abstract Task InvokeMethodGrpcAsync( /// A that can be used to cancel the operation. /// A that will return the list of deserialized values when the operation has completed. public abstract Task>> GetBulkStateAsync(string storeName, IReadOnlyList keys, int? parallelism, IReadOnlyDictionary? metadata = null, CancellationToken cancellationToken = default); - + /// /// Saves a list of to the Dapr state store. /// @@ -917,10 +917,10 @@ public abstract Task> GetByteStateAsync( /// A that can be used to cancel the operation. /// A that will return the value when the operation has completed. This wraps the read value and an ETag. public abstract Task<(ReadOnlyMemory, string etag)> GetByteStateAndETagAsync( - string storeName, - string key, - ConsistencyMode? consistencyMode = null, - IReadOnlyDictionary? metadata = null, + string storeName, + string key, + ConsistencyMode? consistencyMode = null, + IReadOnlyDictionary? metadata = null, CancellationToken cancellationToken = default); /// @@ -1356,7 +1356,7 @@ public abstract IAsyncEnumerable> DecryptAsync(string vault #endregion #region Distributed Lock - + /// /// Attempt to lock the given resourceId with response indicating success. /// @@ -1389,9 +1389,9 @@ public abstract Task Unlock( string resourceId, string lockOwner, CancellationToken cancellationToken = default); - + #endregion - + /// public void Dispose() { @@ -1423,4 +1423,22 @@ protected static ProductInfoHeaderValue UserAgent() return new ProductInfoHeaderValue("dapr-sdk-dotnet", $"v{assemblyVersion}"); } + + /// + /// Bulk publishes raw byte events to the specified topic. + /// + /// The name of the pubsub component to use. + /// The name of the topic the request should be published to. + /// The list of raw byte events to be published. + /// The content type of the given bytes, defaults to application/json. + /// A collection of metadata key-value pairs that will be provided to the pubsub. The valid metadata keys and values are determined by the type of binding used. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task> PublishBulkByteEventAsync( + string pubsubName, + string topicName, + IReadOnlyList> events, + string dataContentType = "application/json", + Dictionary? metadata = null, + CancellationToken cancellationToken = default); } diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index d5f61192d..8cf44ed44 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -263,6 +263,71 @@ events[counter] is CloudEvent } } +#nullable enable + public override async Task> PublishBulkByteEventAsync( + string pubsubName, + string topicName, + IReadOnlyList> events, + string dataContentType = "application/json", + Dictionary? metadata = null, + CancellationToken cancellationToken = default) + { + var request = new Autogenerated.BulkPublishRequest + { + PubsubName = pubsubName, + Topic = topicName, + }; + + if (metadata != null) + { + foreach (var kvp in metadata) + { + request.Metadata.Add(kvp.Key, kvp.Value); + } + } + + for (int i = 0; i < events.Count; i++) + { + var entry = new Autogenerated.BulkPublishRequestEntry + { + EntryId = i.ToString(), + Event = Google.Protobuf.ByteString.CopyFrom(events[i].ToArray()), + ContentType = dataContentType + }; + if (metadata != null) + { + foreach (var kvp in metadata) + { + entry.Metadata.Add(kvp.Key, kvp.Value); + } + } + request.Entries.Add(entry); + } + + var response = await client.BulkPublishEventAlpha1Async(request, cancellationToken: cancellationToken); + // Map response to BulkPublishResponse + var failedEntries = new List>(); + + foreach (var failed in response.FailedEntries) + { + // Try to find the original event data by EntryId + byte[] eventData = Array.Empty(); + if (int.TryParse(failed.EntryId, out int idx) && idx >= 0 && idx < events.Count) + { + eventData = events[idx].ToArray(); + } + var entry = new BulkPublishEntry( + failed.EntryId, + eventData, + dataContentType, + metadata); + failedEntries.Add(new BulkPublishResponseFailedEntry(entry, failed.Error)); + } + + return new BulkPublishResponse(failedEntries); + } +#nullable restore + #endregion #region InvokeBinding Apis @@ -574,7 +639,8 @@ public override async Task InvokeMethodGrpcAsync(string appId, string methodName var envelope = new Autogenerated.InvokeServiceRequest() { - Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, }, + Id = appId, + Message = new Autogenerated.InvokeRequest() { Method = methodName, }, }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -600,7 +666,9 @@ public override async Task InvokeMethodGrpcAsync(string appId, string Id = appId, Message = new Autogenerated.InvokeRequest() { - Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data), + Method = methodName, + ContentType = Constants.ContentTypeApplicationGrpc, + Data = Any.Pack(data), }, }; @@ -625,7 +693,8 @@ public override async Task InvokeMethodGrpcAsync(string ap var envelope = new Autogenerated.InvokeServiceRequest() { - Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, }, + Id = appId, + Message = new Autogenerated.InvokeRequest() { Method = methodName, }, }; var options = CreateCallOptions(headers: null, cancellationToken); @@ -652,7 +721,9 @@ public override async Task InvokeMethodGrpcAsync Id = appId, Message = new Autogenerated.InvokeRequest() { - Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data), + Method = methodName, + ContentType = Constants.ContentTypeApplicationGrpc, + Data = Any.Pack(data), }, }; @@ -726,7 +797,8 @@ public override async Task>> GetBulkStateAsy var envelope = new Autogenerated.GetBulkStateRequest() { - StoreName = storeName, Parallelism = parallelism ?? default + StoreName = storeName, + Parallelism = parallelism ?? default }; if (metadata != null) @@ -1287,7 +1359,8 @@ private async Task MakeExecuteStateTransactionCallAsync( { var stateOperation = new Autogenerated.TransactionalStateOperation { - OperationType = state.OperationType.ToString()?.ToLower(), Request = ToAutogeneratedStateItem(state) + OperationType = state.OperationType.ToString()?.ToLower(), + Request = ToAutogeneratedStateItem(state) }; envelope.Operations.Add(stateOperation); @@ -1690,14 +1763,14 @@ public override async Task> EncryptAsync(string vaultResour [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")] public override async IAsyncEnumerable> EncryptAsync(string vaultResourceName, Stream plaintextStream, - string keyName, EncryptionOptions encryptionOptions, + string keyName, EncryptionOptions encryptionOptions, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName)); ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream)); ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions)); - + EventHandler exceptionHandler = (_, ex) => throw ex; var shouldOmitDecryptionKeyName = @@ -1746,7 +1819,7 @@ await streamProcessor.ProcessStreamAsync(plaintextStream, duplexStream, encryptR [Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")] public override async IAsyncEnumerable> DecryptAsync(string vaultResourceName, Stream ciphertextStream, string keyName, - DecryptionOptions decryptionOptions, + DecryptionOptions decryptionOptions, [EnumeratorCancellation] CancellationToken cancellationToken = default) { ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName)); @@ -1758,7 +1831,7 @@ public override async IAsyncEnumerable> DecryptAsync(string var decryptRequestOptions = new Autogenerated.DecryptRequestOptions { - ComponentName = vaultResourceName, + ComponentName = vaultResourceName, KeyName = keyName }; @@ -1800,7 +1873,7 @@ public override async Task> DecryptAsync(string vaultResour using var memoryStream = ciphertextBytes.CreateMemoryStream(true); var decryptionResult = DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken); - + var bufferedResult = new ArrayBufferWriter(); await foreach (var item in decryptionResult) { @@ -2088,7 +2161,10 @@ public async override Task Lock( var request = new Autogenerated.TryLockRequest() { - StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, ExpiryInSeconds = expiryInSeconds + StoreName = storeName, + ResourceId = resourceId, + LockOwner = lockOwner, + ExpiryInSeconds = expiryInSeconds }; try @@ -2098,7 +2174,10 @@ public async override Task Lock( var response = await client.TryLockAlpha1Async(request, options); return new TryLockResponse() { - StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, Success = response.Success + StoreName = storeName, + ResourceId = resourceId, + LockOwner = lockOwner, + Success = response.Success }; } catch (RpcException ex) @@ -2122,7 +2201,9 @@ public async override Task Unlock( var request = new Autogenerated.UnlockRequest() { - StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner + StoreName = storeName, + ResourceId = resourceId, + LockOwner = lockOwner }; var options = CreateCallOptions(headers: null, cancellationToken); diff --git a/test/Dapr.Client.Test/PublishEventApiTest.cs b/test/Dapr.Client.Test/PublishEventApiTest.cs index 7130fa4de..9b37a997b 100644 --- a/test/Dapr.Client.Test/PublishEventApiTest.cs +++ b/test/Dapr.Client.Test/PublishEventApiTest.cs @@ -65,20 +65,21 @@ public async Task PublishEvent_ShouldRespectJsonStringEnumConverter() var clientBuilder = new DaprClientBuilder() .UseJsonSerializationOptions(new JsonSerializerOptions() { - Converters = {new JsonStringEnumConverter(null, false)} + Converters = { new JsonStringEnumConverter(null, false) } }) .UseHttpClientFactory(() => httpClient) .UseGrpcChannelOptions(new GrpcChannelOptions() { - HttpClient = httpClient, ThrowOperationCanceledOnCancellation = true + HttpClient = httpClient, + ThrowOperationCanceledOnCancellation = true }); var client = new TestClient(clientBuilder.Build(), handler); - + //Ensure that the JsonStringEnumConverter is registered client.InnerClient.JsonSerializerOptions.Converters.Count.ShouldBe(1); client.InnerClient.JsonSerializerOptions.Converters.First().GetType().Name.ShouldMatch(nameof(JsonStringEnumConverter)); - var publishData = new Widget {Size = "Large", Color = WidgetColor.Red}; + var publishData = new Widget { Size = "Large", Color = WidgetColor.Red }; var request = await client.CaptureGrpcRequestAsync(async daprClient => { await daprClient.PublishEventAsync(TestPubsubName, "test", publishData); @@ -294,6 +295,37 @@ public async Task PublishEventAsync_CanPublishWithRawData() envelope.Metadata.Count.ShouldBe(0); } + [Fact] + public async Task PublishBulkByteEventAsync_PublishesEventsAndReturnsResponse() + { + // Arrange + var client = new DaprClientBuilder().Build(); + var pubsubName = "pubsub"; + var topicName = "bulk-deposit"; + var events = new List> + { + new byte[] { 1, 2, 3 }, + new byte[] { 4, 5, 6 } + }; + var metadata = new Dictionary + { + { "test-key", "test-value" } + }; + + // Act + var response = await client.PublishBulkByteEventAsync( + pubsubName, + topicName, + events, + "application/json", + metadata, + CancellationToken.None); + + // Assert + Assert.NotNull(response); + Assert.True(response.FailedEntries.Count == 0, "No failed entries expected for successful publish."); + } + private class PublishData { public string PublishObjectParameter { get; set; } @@ -311,4 +343,4 @@ private enum WidgetColor Green, Yellow } -} \ No newline at end of file +}