Skip to content

Commit 8ae17d7

Browse files
Kirner-jskeet
authored andcommitted
Add optional IGenericRecordSerializer to Avro formatter
Signed-off-by: Kirner- <[email protected]>
1 parent 88f7225 commit 8ae17d7

File tree

6 files changed

+210
-22
lines changed

6 files changed

+210
-22
lines changed

src/CloudNative.CloudEvents.Avro/AvroEventFormatter.cs

Lines changed: 35 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
using Avro;
66
using Avro.Generic;
7-
using Avro.IO;
7+
using CloudNative.CloudEvents.Avro.Interfaces;
88
using CloudNative.CloudEvents.Core;
99
using System;
1010
using System.Collections.Generic;
@@ -41,30 +41,34 @@ public class AvroEventFormatter : CloudEventFormatter
4141
private const string DataName = "data";
4242

4343
private static readonly string CloudEventAvroMediaType = MimeUtilities.MediaType + MediaTypeSuffix;
44-
private static readonly RecordSchema avroSchema;
45-
private static readonly DefaultReader avroReader;
46-
private static readonly DefaultWriter avroWriter;
47-
48-
static AvroEventFormatter()
44+
private readonly IGenericRecordSerializer serializer;
45+
46+
/// <summary>
47+
/// Creates an AvroEventFormatter using the default serializer.
48+
/// </summary>
49+
public AvroEventFormatter() : this(new BasicGenericRecordSerializer()) { }
50+
51+
/// <summary>
52+
/// Creates an AvroEventFormatter that uses a custom <see cref="IGenericRecordSerializer"/>.
53+
/// </summary>
54+
/// <remarks>
55+
/// It is recommended to use the default serializer before defining your own wherever possible.
56+
/// </remarks>
57+
public AvroEventFormatter(IGenericRecordSerializer genericRecordSerializer)
4958
{
50-
// We're going to confidently assume that the embedded schema works. If not, type initialization
51-
// will fail and that's okay since the type is useless without the proper schema.
52-
using (var sr = new StreamReader(typeof(AvroEventFormatter).Assembly.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json")))
53-
{
54-
avroSchema = (RecordSchema) Schema.Parse(sr.ReadToEnd());
55-
}
56-
avroReader = new DefaultReader(avroSchema, avroSchema);
57-
avroWriter = new DefaultWriter(avroSchema);
59+
serializer = genericRecordSerializer;
5860
}
5961

62+
/// <summary>
63+
/// Avro schema used to serialize and deserialize the CloudEvent.
64+
/// </summary>
65+
public static RecordSchema AvroSchema { get; } = ParseEmbeddedSchema();
66+
6067
/// <inheritdoc />
6168
public override CloudEvent DecodeStructuredModeMessage(Stream body, ContentType? contentType, IEnumerable<CloudEventAttribute>? extensionAttributes)
6269
{
6370
Validation.CheckNotNull(body, nameof(body));
64-
65-
var decoder = new BinaryDecoder(body);
66-
// The reuse parameter *is* allowed to be null...
67-
var rawEvent = avroReader.Read<GenericRecord>(reuse: null!, decoder);
71+
var rawEvent = serializer.Deserialize(body);
6872
return DecodeGenericRecord(rawEvent, extensionAttributes);
6973
}
7074

@@ -146,7 +150,7 @@ public override ReadOnlyMemory<byte> EncodeStructuredModeMessage(CloudEvent clou
146150
contentType = new ContentType(CloudEventAvroMediaType);
147151

148152
// We expect the Avro encoded to detect data types that can't be represented in the schema.
149-
GenericRecord record = new GenericRecord(avroSchema);
153+
GenericRecord record = new GenericRecord(AvroSchema);
150154
record.Add(DataName, cloudEvent.Data);
151155
var recordAttributes = new Dictionary<string, object>();
152156
recordAttributes[CloudEventsSpecVersion.SpecVersionAttribute.Name] = cloudEvent.SpecVersion.VersionId;
@@ -162,9 +166,7 @@ public override ReadOnlyMemory<byte> EncodeStructuredModeMessage(CloudEvent clou
162166
recordAttributes[attribute.Name] = avroValue;
163167
}
164168
record.Add(AttributeName, recordAttributes);
165-
MemoryStream memStream = new MemoryStream();
166-
BinaryEncoder encoder = new BinaryEncoder(memStream);
167-
avroWriter.Write(record, encoder);
169+
var memStream = serializer.Serialize(record);
168170
return memStream.ToArray();
169171
}
170172

@@ -176,5 +178,16 @@ public override ReadOnlyMemory<byte> EncodeBinaryModeEventData(CloudEvent cloudE
176178
/// <inheritdoc />
177179
public override void DecodeBinaryModeEventData(ReadOnlyMemory<byte> body, CloudEvent cloudEvent) =>
178180
throw new NotSupportedException("The Avro event formatter does not support binary content mode");
181+
182+
private static RecordSchema ParseEmbeddedSchema()
183+
{
184+
// We're going to confidently assume that the embedded schema works. If not, type initialization
185+
// will fail and that's okay since the type is useless without the proper schema.
186+
using var sr = new StreamReader(typeof(AvroEventFormatter)
187+
.Assembly
188+
.GetManifestResourceStream("CloudNative.CloudEvents.Avro.AvroSchema.json"));
189+
190+
return (RecordSchema) Schema.Parse(sr.ReadToEnd());
191+
}
179192
}
180193
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright (c) Cloud Native Foundation.
2+
// Licensed under the Apache 2.0 license.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Avro.Generic;
6+
using Avro.IO;
7+
using CloudNative.CloudEvents.Avro.Interfaces;
8+
using System;
9+
using System.IO;
10+
11+
namespace CloudNative.CloudEvents.Avro;
12+
13+
/// <summary>
14+
/// The default implementation of the <see cref="IGenericRecordSerializer"/>.
15+
/// </summary>
16+
/// <remarks>
17+
/// Makes use of the Avro <see cref="DefaultReader"/> and <see cref="DefaultWriter"/>
18+
/// together with the embedded Avro schema.
19+
/// </remarks>
20+
internal sealed class BasicGenericRecordSerializer : IGenericRecordSerializer
21+
{
22+
private readonly DefaultReader avroReader;
23+
private readonly DefaultWriter avroWriter;
24+
25+
public BasicGenericRecordSerializer()
26+
{
27+
avroReader = new DefaultReader(AvroEventFormatter.AvroSchema, AvroEventFormatter.AvroSchema);
28+
avroWriter = new DefaultWriter(AvroEventFormatter.AvroSchema);
29+
}
30+
31+
/// <inheritdoc />
32+
public ReadOnlyMemory<byte> Serialize(GenericRecord record)
33+
{
34+
var memStream = new MemoryStream();
35+
var encoder = new BinaryEncoder(memStream);
36+
avroWriter.Write(record, encoder);
37+
return memStream.ToArray();
38+
}
39+
40+
/// <inheritdoc />
41+
public GenericRecord Deserialize(Stream rawMessagebody)
42+
{
43+
var decoder = new BinaryDecoder(rawMessagebody);
44+
// The reuse parameter *is* allowed to be null...
45+
return avroReader.Read<GenericRecord>(reuse: null!, decoder);
46+
}
47+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using Avro.Generic;
2+
using System;
3+
using System.IO;
4+
5+
namespace CloudNative.CloudEvents.Avro.Interfaces;
6+
7+
/// <summary>
8+
/// Used to serialize and deserialize an Avro <see cref="GenericRecord"/>
9+
/// matching the <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
10+
/// CloudEvent Avro schema</see>.
11+
/// </summary>
12+
/// <remarks>
13+
/// <para>
14+
/// An implementation of this interface can optionally be supplied to the <see cref="AvroEventFormatter"/> in cases
15+
/// where a custom Avro serialiser is required for integration with pre-existing tools/infrastructure.
16+
/// </para>
17+
/// <para>
18+
/// It is recommended to use the default serializer before defining your own wherever possible.
19+
/// </para>
20+
/// </remarks>
21+
public interface IGenericRecordSerializer
22+
{
23+
/// <summary>
24+
/// Serialize an Avro <see cref="GenericRecord"/>.
25+
/// </summary>
26+
/// <remarks>
27+
/// The record is guaranteed to match the
28+
/// <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
29+
/// CloudEvent Avro schema</see>.
30+
/// </remarks>
31+
ReadOnlyMemory<byte> Serialize(GenericRecord value);
32+
33+
/// <summary>
34+
/// Deserialize a <see cref="GenericRecord"/> matching the
35+
/// <see href="https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.avsc">
36+
/// CloudEvent Avro schema</see>, represented as a stream.
37+
/// </summary>
38+
GenericRecord Deserialize(Stream messageBody);
39+
}

test/CloudNative.CloudEvents.UnitTests/Avro/AvroEventFormatterTest.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// See LICENSE file in the project root for full license information.
44

55
using CloudNative.CloudEvents.NewtonsoftJson;
6+
using CloudNative.CloudEvents.UnitTests.Avro.Helpers;
67
using System;
78
using System.Net.Mime;
89
using Xunit;
@@ -83,5 +84,43 @@ public void StructuredParseWithExtensionsSuccess()
8384

8485
Assert.Equal("value", cloudEvent[extensionAttribute]);
8586
}
87+
88+
[Fact]
89+
public void StructuredParseSerializationWithCustomSerializer()
90+
{
91+
var serializer = new FakeGenericRecordSerializer();
92+
var jsonFormatter = new JsonEventFormatter();
93+
var avroFormatter = new AvroEventFormatter(serializer);
94+
95+
var expectedSerializedData = new byte[] { 0x1, 0x2, 0x3, };
96+
serializer.SetSerializeResponse(expectedSerializedData);
97+
98+
var inputCloudEvent = jsonFormatter.DecodeStructuredModeText(jsonv10);
99+
var avroData = avroFormatter
100+
.EncodeStructuredModeMessage(inputCloudEvent, out var _)
101+
.ToArray();
102+
103+
Assert.Equal(1, serializer.SerializeCalls);
104+
Assert.Equal(expectedSerializedData, avroData);
105+
}
106+
107+
[Fact]
108+
public void StructuredParseDeserializationWithCustomSerializer()
109+
{
110+
var serializer = new FakeGenericRecordSerializer();
111+
var avroFormatter = new AvroEventFormatter(serializer);
112+
var expectedCloudEventId = "4321";
113+
var expectedCloudEventType = "MyBrilliantEvent";
114+
var expectedCloudEventSource = "https://cloudevents.io.test/test-event";
115+
serializer.SetDeserializeResponseAttributes(
116+
expectedCloudEventId, expectedCloudEventType, expectedCloudEventSource);
117+
118+
var actualCloudEvent = avroFormatter.DecodeStructuredModeMessage(Array.Empty<byte>(), null, null);
119+
120+
Assert.Equal(1, serializer.DeserializeCalls);
121+
Assert.Equal(expectedCloudEventId, actualCloudEvent.Id);
122+
Assert.Equal(expectedCloudEventType, actualCloudEvent.Type);
123+
Assert.Equal(expectedCloudEventSource, actualCloudEvent.Source!.ToString());
124+
}
86125
}
87126
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright (c) Cloud Native Foundation.
2+
// Licensed under the Apache 2.0 license.
3+
// See LICENSE file in the project root for full license information.
4+
5+
using Avro.Generic;
6+
using CloudNative.CloudEvents.Avro.Interfaces;
7+
using System;
8+
using System.Collections.Generic;
9+
using System.IO;
10+
11+
namespace CloudNative.CloudEvents.UnitTests.Avro.Helpers;
12+
13+
internal class FakeGenericRecordSerializer : IGenericRecordSerializer
14+
{
15+
public byte[]? SerializeResponse { get; private set; }
16+
public GenericRecord DeserializeResponse { get; private set; }
17+
public int DeserializeCalls { get; private set; } = 0;
18+
public int SerializeCalls { get; private set; } = 0;
19+
20+
public FakeGenericRecordSerializer()
21+
{
22+
DeserializeResponse = new GenericRecord(CloudEvents.Avro.AvroEventFormatter.AvroSchema);
23+
}
24+
25+
public GenericRecord Deserialize(Stream messageBody)
26+
{
27+
DeserializeCalls++;
28+
return DeserializeResponse;
29+
}
30+
31+
public ReadOnlyMemory<byte> Serialize(GenericRecord value)
32+
{
33+
SerializeCalls++;
34+
return SerializeResponse;
35+
}
36+
37+
public void SetSerializeResponse(byte[] response) => SerializeResponse = response;
38+
39+
public void SetDeserializeResponseAttributes(string id, string type, string source) =>
40+
DeserializeResponse.Add("attribute", new Dictionary<string, object>()
41+
{
42+
{ CloudEventsSpecVersion.SpecVersionAttribute.Name, CloudEventsSpecVersion.Default.VersionId },
43+
{ CloudEventsSpecVersion.Default.IdAttribute.Name, id},
44+
{ CloudEventsSpecVersion.Default.TypeAttribute.Name, type},
45+
{ CloudEventsSpecVersion.Default.SourceAttribute.Name, source}
46+
});
47+
}

test/CloudNative.CloudEvents.UnitTests/CloudNative.CloudEvents.UnitTests.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,7 @@
2828
<ProjectReference Include="..\..\src\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
2929
</ItemGroup>
3030

31+
<ItemGroup>
32+
<EmbeddedResource Include="..\..\src\CloudNative.CloudEvents.Avro\AvroSchema.json" />
33+
</ItemGroup>
3134
</Project>

0 commit comments

Comments
 (0)