Skip to content

Commit d538b26

Browse files
IliasPoliWhitWaldo
authored andcommitted
Support case insensitive cloudevent payloads and forward cloudevent props s headers (dapr#1153)
* forward cloudevent props Signed-off-by: Ilias Politsopoulos <[email protected]> * refactor middleware Signed-off-by: Ilias Politsopoulos <[email protected]> * add cloud event property filters Signed-off-by: Ilias Politsopoulos <[email protected]> * update string check Signed-off-by: Ilias Politsopoulos <[email protected]> * forward cloudevent props Signed-off-by: Ilias Politsopoulos <[email protected]> * refactor middleware Signed-off-by: Ilias Politsopoulos <[email protected]> * add cloud event property filters Signed-off-by: Ilias Politsopoulos <[email protected]> * update checks Signed-off-by: Ilias Politsopoulos <[email protected]> --------- Signed-off-by: Whit Waldo <[email protected]> Co-authored-by: Whit Waldo <[email protected]> Signed-off-by: Siri Varma Vegiraju <[email protected]>
1 parent a8e1b79 commit d538b26

File tree

6 files changed

+332
-32
lines changed

6 files changed

+332
-32
lines changed

examples/AspNetCore/ControllerSample/Controllers/SampleController.cs

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14+
using System.Linq;
15+
1416
namespace ControllerSample.Controllers
1517
{
1618
using System;
@@ -43,6 +45,7 @@ public SampleController(ILogger<SampleController> logger)
4345
/// State store name.
4446
/// </summary>
4547
public const string StoreName = "statestore";
48+
4649
private readonly ILogger<SampleController> logger;
4750

4851
/// <summary>
@@ -72,6 +75,11 @@ public ActionResult<Account> Get([FromState(StoreName)] StateEntry<Account> acco
7275
[HttpPost("deposit")]
7376
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
7477
{
78+
// Example reading cloudevent properties from the headers
79+
var headerEntries = Request.Headers.Aggregate("", (current, header) => current + ($"------- Header: {header.Key} : {header.Value}" + Environment.NewLine));
80+
81+
logger.LogInformation(headerEntries);
82+
7583
logger.LogInformation("Enter deposit");
7684
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
7785
state.Value ??= new Account() { Id = transaction.Id, };
@@ -83,7 +91,7 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
8391
}
8492

8593
state.Value.Balance += transaction.Amount;
86-
logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance);
94+
logger.LogInformation("Balance for Id {0} is {1}", state.Value.Id, state.Value.Balance);
8795
await state.SaveAsync();
8896
return state.Value;
8997
}
@@ -98,22 +106,23 @@ public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromS
98106
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
99107
[BulkSubscribe("multideposit", 500, 2000)]
100108
[HttpPost("multideposit")]
101-
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
102-
bulkMessage, [FromServices] DaprClient daprClient)
109+
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody]
110+
BulkSubscribeMessage<BulkMessageModel<Transaction>>
111+
bulkMessage, [FromServices] DaprClient daprClient)
103112
{
104113
logger.LogInformation("Enter bulk deposit");
105-
114+
106115
List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();
107116

108117
foreach (var entry in bulkMessage.Entries)
109-
{
118+
{
110119
try
111120
{
112121
var transaction = entry.Event.Data;
113122

114123
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
115124
state.Value ??= new Account() { Id = transaction.Id, };
116-
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
125+
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
117126
transaction.Id, transaction.Amount);
118127

119128
if (transaction.Amount < 0m)
@@ -124,12 +133,16 @@ public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody
124133
state.Value.Balance += transaction.Amount;
125134
logger.LogInformation("Balance is {0}", state.Value.Balance);
126135
await state.SaveAsync();
127-
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
128-
} catch (Exception e) {
136+
entries.Add(
137+
new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
138+
}
139+
catch (Exception e)
140+
{
129141
logger.LogError(e.Message);
130142
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
131143
}
132144
}
145+
133146
return new BulkSubscribeAppResponse(entries);
134147
}
135148

@@ -165,6 +178,7 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
165178
{
166179
return this.NotFound();
167180
}
181+
168182
if (transaction.Amount < 0m)
169183
{
170184
return BadRequest(new { statusCode = 400, message = "bad request" });
@@ -185,7 +199,8 @@ public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [From
185199
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
186200
[Topic("pubsub", "withdraw", "event.type ==\"withdraw.v2\"", 1)]
187201
[HttpPost("withdraw.v2")]
188-
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [FromServices] DaprClient daprClient)
202+
public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction,
203+
[FromServices] DaprClient daprClient)
189204
{
190205
logger.LogInformation("Enter withdraw.v2");
191206
if (transaction.Channel == "mobile" && transaction.Amount > 10000)
@@ -214,12 +229,15 @@ public async Task<ActionResult<Account>> WithdrawV2(TransactionV2 transaction, [
214229
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
215230
[Topic("pubsub", "rawDeposit", true)]
216231
[HttpPost("rawDeposit")]
217-
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction, [FromServices] DaprClient daprClient)
232+
public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawTransaction,
233+
[FromServices] DaprClient daprClient)
218234
{
219235
var transactionString = rawTransaction.RootElement.GetProperty("data_base64").GetString();
220-
logger.LogInformation($"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
236+
logger.LogInformation(
237+
$"Enter deposit: {transactionString} - {Encoding.UTF8.GetString(Convert.FromBase64String(transactionString))}");
221238
var transactionJson = JsonSerializer.Deserialize<JsonDocument>(Convert.FromBase64String(transactionString));
222-
var transaction = JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
239+
var transaction =
240+
JsonSerializer.Deserialize<Transaction>(transactionJson.RootElement.GetProperty("data").GetRawText());
223241
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
224242
state.Value ??= new Account() { Id = transaction.Id, };
225243
logger.LogInformation("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
@@ -239,7 +257,8 @@ public async Task<ActionResult<Account>> RawDeposit([FromBody] JsonDocument rawT
239257
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
240258
/// </summary>
241259
[HttpPost("throwException")]
242-
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
260+
public async Task<ActionResult<Account>> ThrowException(Transaction transaction,
261+
[FromServices] DaprClient daprClient)
243262
{
244263
logger.LogInformation("Enter ThrowException");
245264
var task = Task.Delay(10);

examples/AspNetCore/ControllerSample/Startup.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,11 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14+
15+
using Dapr;
1416
using Dapr.AspNetCore;
1517

18+
1619
namespace ControllerSample
1720
{
1821
using Microsoft.AspNetCore.Builder;
@@ -63,7 +66,10 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
6366

6467
app.UseRouting();
6568

66-
app.UseCloudEvents();
69+
app.UseCloudEvents(new CloudEventsMiddlewareOptions
70+
{
71+
ForwardCloudEventPropertiesAsHeaders = true
72+
});
6773

6874
app.UseAuthorization();
6975

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
namespace Dapr
2+
{
3+
internal static class CloudEventPropertyNames
4+
{
5+
public const string Data = "data";
6+
public const string DataContentType = "datacontenttype";
7+
public const string DataBase64 = "data_base64";
8+
}
9+
}

src/Dapr.AspNetCore/CloudEventsMiddleware.cs

Lines changed: 99 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
// limitations under the License.
1212
// ------------------------------------------------------------------------
1313

14+
using System.Collections.Generic;
15+
using System.Linq;
16+
1417
namespace Dapr
1518
{
1619
using System;
@@ -27,6 +30,15 @@ namespace Dapr
2730
internal class CloudEventsMiddleware
2831
{
2932
private const string ContentType = "application/cloudevents+json";
33+
34+
// These cloudevent properties are either containing the body of the message or
35+
// are included in the headers by other components of Dapr earlier in the pipeline
36+
private static readonly string[] ExcludedPropertiesFromHeaders =
37+
{
38+
CloudEventPropertyNames.DataContentType, CloudEventPropertyNames.Data,
39+
CloudEventPropertyNames.DataBase64, "pubsubname", "traceparent"
40+
};
41+
3042
private readonly RequestDelegate next;
3143
private readonly CloudEventsMiddlewareOptions options;
3244

@@ -52,7 +64,7 @@ public Task InvokeAsync(HttpContext httpContext)
5264
// The philosophy here is that we don't report an error for things we don't support, because
5365
// that would block someone from implementing their own support for it. We only report an error
5466
// when something we do support isn't correct.
55-
if (!this.MatchesContentType(httpContext, out var charSet))
67+
if (!MatchesContentType(httpContext, out var charSet))
5668
{
5769
return this.next(httpContext);
5870
}
@@ -69,7 +81,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
6981
}
7082
else
7183
{
72-
using (var reader = new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
84+
using (var reader =
85+
new HttpRequestStreamReader(httpContext.Request.Body, Encoding.GetEncoding(charSet)))
7386
{
7487
var text = await reader.ReadToEndAsync();
7588
json = JsonSerializer.Deserialize<JsonElement>(text);
@@ -83,17 +96,43 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
8396
string contentType;
8497

8598
// Check whether to use data or data_base64 as per https://github.com/cloudevents/spec/blob/v1.0.1/json-format.md#31-handling-of-data
86-
var isDataSet = json.TryGetProperty("data", out var data);
87-
var isBinaryDataSet = json.TryGetProperty("data_base64", out var binaryData);
99+
// Get the property names by OrdinalIgnoreCase comparison to support case insensitive JSON as the Json Serializer for AspCore already supports it by default.
100+
var jsonPropNames = json.EnumerateObject().ToArray();
101+
102+
var dataPropName = jsonPropNames
103+
.Select(d => d.Name)
104+
.FirstOrDefault(d => d.Equals(CloudEventPropertyNames.Data, StringComparison.OrdinalIgnoreCase));
105+
106+
var dataBase64PropName = jsonPropNames
107+
.Select(d => d.Name)
108+
.FirstOrDefault(d =>
109+
d.Equals(CloudEventPropertyNames.DataBase64, StringComparison.OrdinalIgnoreCase));
110+
111+
var isDataSet = false;
112+
var isBinaryDataSet = false;
113+
JsonElement data = default;
114+
115+
if (dataPropName != null)
116+
{
117+
isDataSet = true;
118+
data = json.TryGetProperty(dataPropName, out var dataJsonElement) ? dataJsonElement : data;
119+
}
120+
121+
if (dataBase64PropName != null)
122+
{
123+
isBinaryDataSet = true;
124+
data = json.TryGetProperty(dataBase64PropName, out var dataJsonElement) ? dataJsonElement : data;
125+
}
88126

89127
if (isDataSet && isBinaryDataSet)
90128
{
91129
httpContext.Response.StatusCode = (int)HttpStatusCode.BadRequest;
92130
return;
93131
}
94-
else if (isDataSet)
132+
133+
if (isDataSet)
95134
{
96-
contentType = this.GetDataContentType(json, out var isJson);
135+
contentType = GetDataContentType(json, out var isJson);
97136

98137
// If the value is anything other than a JSON string, treat it as JSON. Cloud Events requires
99138
// non-JSON text to be enclosed in a JSON string.
@@ -109,8 +148,8 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
109148
{
110149
// Rehydrate body from contents of the string
111150
var text = data.GetString();
112-
using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
113-
writer.Write(text);
151+
await using var writer = new HttpResponseStreamWriter(body, Encoding.UTF8);
152+
await writer.WriteAsync(text);
114153
}
115154

116155
body.Seek(0L, SeekOrigin.Begin);
@@ -120,17 +159,19 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
120159
// As per the spec, if the implementation determines that the type of data is Binary,
121160
// the value MUST be represented as a JSON string expression containing the Base64 encoded
122161
// binary value, and use the member name data_base64 to store it inside the JSON object.
123-
var decodedBody = binaryData.GetBytesFromBase64();
162+
var decodedBody = data.GetBytesFromBase64();
124163
body = new MemoryStream(decodedBody);
125164
body.Seek(0L, SeekOrigin.Begin);
126-
contentType = this.GetDataContentType(json, out _);
165+
contentType = GetDataContentType(json, out _);
127166
}
128167
else
129168
{
130169
body = new MemoryStream();
131170
contentType = null;
132171
}
133172

173+
ForwardCloudEventPropertiesAsHeaders(httpContext, jsonPropNames);
174+
134175
originalBody = httpContext.Request.Body;
135176
originalContentType = httpContext.Request.ContentType;
136177

@@ -148,16 +189,57 @@ private async Task ProcessBodyAsync(HttpContext httpContext, string charSet)
148189
}
149190
}
150191

151-
private string GetDataContentType(JsonElement json, out bool isJson)
192+
private void ForwardCloudEventPropertiesAsHeaders(
193+
HttpContext httpContext,
194+
IEnumerable<JsonProperty> jsonPropNames)
195+
{
196+
if (!options.ForwardCloudEventPropertiesAsHeaders)
197+
{
198+
return;
199+
}
200+
201+
var filteredPropertyNames = jsonPropNames
202+
.Where(d => !ExcludedPropertiesFromHeaders.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
203+
204+
if (options.IncludedCloudEventPropertiesAsHeaders != null)
205+
{
206+
filteredPropertyNames = filteredPropertyNames
207+
.Where(d => options.IncludedCloudEventPropertiesAsHeaders
208+
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
209+
}
210+
else if (options.ExcludedCloudEventPropertiesFromHeaders != null)
211+
{
212+
filteredPropertyNames = filteredPropertyNames
213+
.Where(d => !options.ExcludedCloudEventPropertiesFromHeaders
214+
.Contains(d.Name, StringComparer.OrdinalIgnoreCase));
215+
}
216+
217+
foreach (var jsonProperty in filteredPropertyNames)
218+
{
219+
httpContext.Request.Headers.TryAdd($"Cloudevent.{jsonProperty.Name.ToLowerInvariant()}",
220+
jsonProperty.Value.GetRawText().Trim('\"'));
221+
}
222+
}
223+
224+
private static string GetDataContentType(JsonElement json, out bool isJson)
152225
{
226+
var dataContentTypePropName = json
227+
.EnumerateObject()
228+
.Select(d => d.Name)
229+
.FirstOrDefault(d =>
230+
d.Equals(CloudEventPropertyNames.DataContentType,
231+
StringComparison.OrdinalIgnoreCase));
232+
153233
string contentType;
154-
if (json.TryGetProperty("datacontenttype", out var dataContentType) &&
155-
dataContentType.ValueKind == JsonValueKind.String &&
156-
MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))
234+
235+
if (dataContentTypePropName != null
236+
&& json.TryGetProperty(dataContentTypePropName, out var dataContentType)
237+
&& dataContentType.ValueKind == JsonValueKind.String
238+
&& MediaTypeHeaderValue.TryParse(dataContentType.GetString(), out var parsed))
157239
{
158240
contentType = dataContentType.GetString();
159-
isJson =
160-
parsed.MediaType.Equals( "application/json", StringComparison.Ordinal) ||
241+
isJson =
242+
parsed.MediaType.Equals("application/json", StringComparison.Ordinal) ||
161243
parsed.Suffix.EndsWith("+json", StringComparison.Ordinal);
162244

163245
// Since S.T.Json always outputs utf-8, we may need to normalize the data content type
@@ -179,7 +261,7 @@ private string GetDataContentType(JsonElement json, out bool isJson)
179261
return contentType;
180262
}
181263

182-
private bool MatchesContentType(HttpContext httpContext, out string charSet)
264+
private static bool MatchesContentType(HttpContext httpContext, out string charSet)
183265
{
184266
if (httpContext.Request.ContentType == null)
185267
{

0 commit comments

Comments
 (0)