Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,11 @@ public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodN
/// <param name="methodName">The name of the method to invoke.</param>
/// <param name="queryStringParameters">A collection of key/value pairs to populate the query string from.</param>
/// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodName, IReadOnlyCollection<KeyValuePair<string,string>> queryStringParameters)
public HttpRequestMessage CreateInvokeMethodRequest(string appId, string methodName, IReadOnlyCollection<KeyValuePair<string, string>> queryStringParameters)
{
return CreateInvokeMethodRequest(HttpMethod.Post, appId, methodName, queryStringParameters);
}

/// <summary>
/// Creates an <see cref="HttpRequestMessage" /> that can be used to perform service invocation for the
/// application identified by <paramref name="appId" /> and invokes the method specified by <paramref name="methodName" />
Expand Down Expand Up @@ -357,7 +357,7 @@ public HttpRequestMessage CreateInvokeMethodRequest<TRequest>(string appId, stri
{
return CreateInvokeMethodRequest(HttpMethod.Post, appId, methodName, new List<KeyValuePair<string, string>>(), data);
}

/// <summary>
/// Creates an <see cref="HttpRequestMessage" /> that can be used to perform service invocation for the
/// application identified by <paramref name="appId" /> and invokes the method specified by <paramref name="methodName" />
Expand All @@ -371,8 +371,8 @@ public HttpRequestMessage CreateInvokeMethodRequest<TRequest>(string appId, stri
/// <param name="data">The data that will be JSON serialized and provided as the request body.</param>
/// <param name="queryStringParameters">A collection of key/value pairs to populate the query string from.</param>
/// <returns>An <see cref="HttpRequestMessage" /> for use with <c>SendInvokeMethodRequestAsync</c>.</returns>
public abstract HttpRequestMessage CreateInvokeMethodRequest<TRequest>(HttpMethod httpMethod, string appId, string methodName, IReadOnlyCollection<KeyValuePair<string,string>> queryStringParameters, TRequest data);
public abstract HttpRequestMessage CreateInvokeMethodRequest<TRequest>(HttpMethod httpMethod, string appId, string methodName, IReadOnlyCollection<KeyValuePair<string, string>> queryStringParameters, TRequest data);

/// <summary>
/// Perform health-check of Dapr sidecar. Return 'true' if sidecar is healthy. Otherwise 'false'.
/// CheckHealthAsync handle <see cref="HttpRequestException"/> and will return 'false' if error will occur on transport level
Expand Down Expand Up @@ -446,7 +446,7 @@ public HttpRequestMessage CreateInvokeMethodRequest<TRequest>(string appId, stri
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed.</returns>
public abstract Task<HttpResponseMessage> InvokeMethodWithResponseAsync(HttpRequestMessage request, CancellationToken cancellationToken = default);

/// <summary>
/// <para>
/// Creates an <see cref="HttpClient"/> that can be used to perform Dapr service invocation using <see cref="HttpRequestMessage"/>
Expand Down Expand Up @@ -779,7 +779,7 @@ public abstract Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>(
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{IReadOnlyList}" /> that will return the list of deserialized values when the operation has completed.</returns>
public abstract Task<IReadOnlyList<BulkStateItem<TValue>>> GetBulkStateAsync<TValue>(string storeName, IReadOnlyList<string> keys, int? parallelism, IReadOnlyDictionary<string, string>? metadata = null, CancellationToken cancellationToken = default);

/// <summary>
/// Saves a list of <paramref name="items" /> to the Dapr state store.
/// </summary>
Expand Down Expand Up @@ -917,10 +917,10 @@ public abstract Task<ReadOnlyMemory<byte>> GetByteStateAsync(
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task{T}" /> that will return the value when the operation has completed. This wraps the read value and an ETag.</returns>
public abstract Task<(ReadOnlyMemory<byte>, string etag)> GetByteStateAndETagAsync(
string storeName,
string key,
ConsistencyMode? consistencyMode = null,
IReadOnlyDictionary<string, string>? metadata = null,
string storeName,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, please don't submit PRs with formatting changes like this as everyone's IDE is going to add/remove stuff like this based on local preferences and it slows down actually reviewing the changes you've made.

string key,
ConsistencyMode? consistencyMode = null,
IReadOnlyDictionary<string, string>? metadata = null,
CancellationToken cancellationToken = default);

/// <summary>
Expand Down Expand Up @@ -1356,7 +1356,7 @@ public abstract IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vault
#endregion

#region Distributed Lock

/// <summary>
/// Attempt to lock the given resourceId with response indicating success.
/// </summary>
Expand Down Expand Up @@ -1389,9 +1389,9 @@ public abstract Task<UnlockResponse> Unlock(
string resourceId,
string lockOwner,
CancellationToken cancellationToken = default);

#endregion

/// <inheritdoc />
public void Dispose()
{
Expand Down Expand Up @@ -1423,4 +1423,22 @@ protected static ProductInfoHeaderValue UserAgent()

return new ProductInfoHeaderValue("dapr-sdk-dotnet", $"v{assemblyVersion}");
}

/// <summary>
/// Bulk publishes raw byte events to the specified topic.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="events">The list of raw byte events to be published.</param>
/// <param name="dataContentType">The content type of the given bytes, defaults to application/json.</param>
/// <param name="metadata">A collection of metadata key-value pairs that will be provided to the pubsub. The valid metadata keys and values are determined by the type of binding used.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task<BulkPublishResponse<byte[]>> PublishBulkByteEventAsync(
string pubsubName,
string topicName,
IReadOnlyList<ReadOnlyMemory<byte>> events,
string dataContentType = "application/json",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, could you please have this use the same referenced constant as the non-bulk equivalent, Constants.ContentTypeApplicationJson?

Dictionary<string, string>? metadata = null,
CancellationToken cancellationToken = default);
}
109 changes: 95 additions & 14 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,71 @@ events[counter] is CloudEvent
}
}

#nullable enable
public override async Task<BulkPublishResponse<byte[]>> PublishBulkByteEventAsync(
string pubsubName,
string topicName,
IReadOnlyList<ReadOnlyMemory<byte>> events,
string dataContentType = "application/json",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, please update the default value to use the pre-existing constant Constants.ContentTypeApplicationJson.

Dictionary<string, string>? metadata = null,
CancellationToken cancellationToken = default)
{
var request = new Autogenerated.BulkPublishRequest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we cannot refactor and use the existing generic MakeBulkPublishRequest here instead of duplicating the method for this additional type?

{
PubsubName = pubsubName,
Topic = topicName,
};

if (metadata != null)
{
foreach (var kvp in metadata)
{
request.Metadata.Add(kvp.Key, kvp.Value);
}
}

for (int i = 0; i < events.Count; i++)
{
var entry = new Autogenerated.BulkPublishRequestEntry
{
EntryId = i.ToString(),
Event = Google.Protobuf.ByteString.CopyFrom(events[i].ToArray()),
ContentType = dataContentType
};
if (metadata != null)
{
foreach (var kvp in metadata)
{
entry.Metadata.Add(kvp.Key, kvp.Value);
}
}
request.Entries.Add(entry);
}

var response = await client.BulkPublishEventAlpha1Async(request, cancellationToken: cancellationToken);
// Map response to BulkPublishResponse<byte[]>
var failedEntries = new List<BulkPublishResponseFailedEntry<byte[]>>();

foreach (var failed in response.FailedEntries)
{
// Try to find the original event data by EntryId
byte[] eventData = Array.Empty<byte>();
if (int.TryParse(failed.EntryId, out int idx) && idx >= 0 && idx < events.Count)
{
eventData = events[idx].ToArray();
}
var entry = new BulkPublishEntry<byte[]>(
failed.EntryId,
eventData,
dataContentType,
metadata);
failedEntries.Add(new BulkPublishResponseFailedEntry<byte[]>(entry, failed.Error));
}

return new BulkPublishResponse<byte[]>(failedEntries);
}
#nullable restore

#endregion

#region InvokeBinding Apis
Expand Down Expand Up @@ -574,7 +639,8 @@ public override async Task InvokeMethodGrpcAsync(string appId, string methodName

var envelope = new Autogenerated.InvokeServiceRequest()
{
Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, },
Id = appId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To this and the remaining formatting changes - same as above, please don't submit these generally.

Message = new Autogenerated.InvokeRequest() { Method = methodName, },
};

var options = CreateCallOptions(headers: null, cancellationToken);
Expand All @@ -600,7 +666,9 @@ public override async Task InvokeMethodGrpcAsync<TRequest>(string appId, string
Id = appId,
Message = new Autogenerated.InvokeRequest()
{
Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data),
Method = methodName,
ContentType = Constants.ContentTypeApplicationGrpc,
Data = Any.Pack(data),
},
};

Expand All @@ -625,7 +693,8 @@ public override async Task<TResponse> InvokeMethodGrpcAsync<TResponse>(string ap

var envelope = new Autogenerated.InvokeServiceRequest()
{
Id = appId, Message = new Autogenerated.InvokeRequest() { Method = methodName, },
Id = appId,
Message = new Autogenerated.InvokeRequest() { Method = methodName, },
};

var options = CreateCallOptions(headers: null, cancellationToken);
Expand All @@ -652,7 +721,9 @@ public override async Task<TResponse> InvokeMethodGrpcAsync<TRequest, TResponse>
Id = appId,
Message = new Autogenerated.InvokeRequest()
{
Method = methodName, ContentType = Constants.ContentTypeApplicationGrpc, Data = Any.Pack(data),
Method = methodName,
ContentType = Constants.ContentTypeApplicationGrpc,
Data = Any.Pack(data),
},
};

Expand Down Expand Up @@ -726,7 +797,8 @@ public override async Task<IReadOnlyList<BulkStateItem<TValue>>> GetBulkStateAsy

var envelope = new Autogenerated.GetBulkStateRequest()
{
StoreName = storeName, Parallelism = parallelism ?? default
StoreName = storeName,
Parallelism = parallelism ?? default
};

if (metadata != null)
Expand Down Expand Up @@ -1287,7 +1359,8 @@ private async Task MakeExecuteStateTransactionCallAsync(
{
var stateOperation = new Autogenerated.TransactionalStateOperation
{
OperationType = state.OperationType.ToString()?.ToLower(), Request = ToAutogeneratedStateItem(state)
OperationType = state.OperationType.ToString()?.ToLower(),
Request = ToAutogeneratedStateItem(state)
};

envelope.Operations.Add(stateOperation);
Expand Down Expand Up @@ -1690,14 +1763,14 @@ public override async Task<ReadOnlyMemory<byte>> EncryptAsync(string vaultResour
[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
public override async IAsyncEnumerable<ReadOnlyMemory<byte>> EncryptAsync(string vaultResourceName,
Stream plaintextStream,
string keyName, EncryptionOptions encryptionOptions,
string keyName, EncryptionOptions encryptionOptions,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
ArgumentVerifier.ThrowIfNullOrEmpty(keyName, nameof(keyName));
ArgumentVerifier.ThrowIfNull(plaintextStream, nameof(plaintextStream));
ArgumentVerifier.ThrowIfNull(encryptionOptions, nameof(encryptionOptions));

EventHandler<Exception> exceptionHandler = (_, ex) => throw ex;

var shouldOmitDecryptionKeyName =
Expand Down Expand Up @@ -1746,7 +1819,7 @@ await streamProcessor.ProcessStreamAsync(plaintextStream, duplexStream, encryptR
[Experimental("DAPR_CRYPTOGRAPHY", UrlFormat = "https://docs.dapr.io/developing-applications/building-blocks/cryptography/cryptography-overview/")]
public override async IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string vaultResourceName,
Stream ciphertextStream, string keyName,
DecryptionOptions decryptionOptions,
DecryptionOptions decryptionOptions,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(vaultResourceName, nameof(vaultResourceName));
Expand All @@ -1758,7 +1831,7 @@ public override async IAsyncEnumerable<ReadOnlyMemory<byte>> DecryptAsync(string

var decryptRequestOptions = new Autogenerated.DecryptRequestOptions
{
ComponentName = vaultResourceName,
ComponentName = vaultResourceName,
KeyName = keyName
};

Expand Down Expand Up @@ -1800,7 +1873,7 @@ public override async Task<ReadOnlyMemory<byte>> DecryptAsync(string vaultResour
using var memoryStream = ciphertextBytes.CreateMemoryStream(true);

var decryptionResult = DecryptAsync(vaultResourceName, memoryStream, keyName, decryptionOptions, cancellationToken);

var bufferedResult = new ArrayBufferWriter<byte>();
await foreach (var item in decryptionResult)
{
Expand Down Expand Up @@ -2088,7 +2161,10 @@ public async override Task<TryLockResponse> Lock(

var request = new Autogenerated.TryLockRequest()
{
StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, ExpiryInSeconds = expiryInSeconds
StoreName = storeName,
ResourceId = resourceId,
LockOwner = lockOwner,
ExpiryInSeconds = expiryInSeconds
};

try
Expand All @@ -2098,7 +2174,10 @@ public async override Task<TryLockResponse> Lock(
var response = await client.TryLockAlpha1Async(request, options);
return new TryLockResponse()
{
StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner, Success = response.Success
StoreName = storeName,
ResourceId = resourceId,
LockOwner = lockOwner,
Success = response.Success
};
}
catch (RpcException ex)
Expand All @@ -2122,7 +2201,9 @@ public async override Task<UnlockResponse> Unlock(

var request = new Autogenerated.UnlockRequest()
{
StoreName = storeName, ResourceId = resourceId, LockOwner = lockOwner
StoreName = storeName,
ResourceId = resourceId,
LockOwner = lockOwner
};

var options = CreateCallOptions(headers: null, cancellationToken);
Expand Down
42 changes: 37 additions & 5 deletions test/Dapr.Client.Test/PublishEventApiTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,21 @@ public async Task PublishEvent_ShouldRespectJsonStringEnumConverter()
var clientBuilder = new DaprClientBuilder()
.UseJsonSerializationOptions(new JsonSerializerOptions()
{
Converters = {new JsonStringEnumConverter(null, false)}
Converters = { new JsonStringEnumConverter(null, false) }
})
.UseHttpClientFactory(() => httpClient)
.UseGrpcChannelOptions(new GrpcChannelOptions()
{
HttpClient = httpClient, ThrowOperationCanceledOnCancellation = true
HttpClient = httpClient,
ThrowOperationCanceledOnCancellation = true
});
var client = new TestClient<DaprClient>(clientBuilder.Build(), handler);

//Ensure that the JsonStringEnumConverter is registered
client.InnerClient.JsonSerializerOptions.Converters.Count.ShouldBe(1);
client.InnerClient.JsonSerializerOptions.Converters.First().GetType().Name.ShouldMatch(nameof(JsonStringEnumConverter));

var publishData = new Widget {Size = "Large", Color = WidgetColor.Red};
var publishData = new Widget { Size = "Large", Color = WidgetColor.Red };
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
await daprClient.PublishEventAsync<Widget>(TestPubsubName, "test", publishData);
Expand Down Expand Up @@ -294,6 +295,37 @@ public async Task PublishEventAsync_CanPublishWithRawData()
envelope.Metadata.Count.ShouldBe(0);
}

[Fact]
public async Task PublishBulkByteEventAsync_PublishesEventsAndReturnsResponse()
{
// Arrange
var client = new DaprClientBuilder().Build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit tests aren't run with a running instance of Dapr, so this isn't going to work correctly. I would advise copying the example above using the TestClient it produces so you can monitor the "captured" gRPC request and validate against that payload.

var pubsubName = "pubsub";
var topicName = "bulk-deposit";
var events = new List<ReadOnlyMemory<byte>>
{
new byte[] { 1, 2, 3 },
new byte[] { 4, 5, 6 }
};
var metadata = new Dictionary<string, string>
{
{ "test-key", "test-value" }
};

// Act
var response = await client.PublishBulkByteEventAsync(
pubsubName,
topicName,
events,
"application/json",
metadata,
CancellationToken.None);

// Assert
Assert.NotNull(response);
Assert.True(response.FailedEntries.Count == 0, "No failed entries expected for successful publish.");
}

private class PublishData
{
public string PublishObjectParameter { get; set; }
Expand All @@ -311,4 +343,4 @@ private enum WidgetColor
Green,
Yellow
}
}
}