Skip to content

Commit 0dc3646

Browse files
authored
Store Messages and DataStreamMetadata as bytes, Allow manipulation of how the Request/Response messages are stored e.g. support encryption/compression/etc (#673)
* Store Messages and DataStreamMetadata as bytes * Support wrapping the message streams * . * . * Fix for net48 * .
1 parent f7c6291 commit 0dc3646

18 files changed

+411
-84
lines changed

source/Halibut.Tests/Queue/QueueMessageSerializerBuilder.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System;
2+
using System.Collections.Generic;
23
using Halibut.Diagnostics;
34
using Halibut.Queue;
5+
using Halibut.Queue.MessageStreamWrapping;
46
using Halibut.Transport.Protocol;
57
using Newtonsoft.Json;
68

@@ -11,12 +13,20 @@ public class QueueMessageSerializerBuilder
1113
ITypeRegistry? typeRegistry;
1214
Action<JsonSerializerSettings>? configureSerializer;
1315

16+
MessageStreamWrappers messageStreamWrappers = new MessageStreamWrappers(new List<IMessageStreamWrapper>());
17+
1418
public QueueMessageSerializerBuilder WithTypeRegistry(ITypeRegistry typeRegistry)
1519
{
1620
this.typeRegistry = typeRegistry;
1721
return this;
1822
}
1923

24+
public QueueMessageSerializerBuilder WithMessageStreamWrappers(MessageStreamWrappers messageStreamWrappers)
25+
{
26+
this.messageStreamWrappers = messageStreamWrappers;
27+
return this;
28+
}
29+
2030
public QueueMessageSerializerBuilder WithSerializerSettings(Action<JsonSerializerSettings> configure)
2131
{
2232
configureSerializer = configure;
@@ -36,7 +46,7 @@ StreamCapturingJsonSerializer StreamCapturingSerializer()
3646
return new StreamCapturingJsonSerializer(settings);
3747
}
3848

39-
return new QueueMessageSerializer(StreamCapturingSerializer);
49+
return new QueueMessageSerializer(StreamCapturingSerializer, messageStreamWrappers);
4050
}
4151
}
4252
}

source/Halibut.Tests/Queue/QueueMessageSerializerFixture.cs

Lines changed: 165 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
#if NET8_0_OR_GREATER
22
using System;
3+
using System.Collections.Generic;
34
using System.IO;
5+
using System.Security.Cryptography;
6+
using System.Text;
47
using System.Threading;
58
using System.Threading.Tasks;
69
using FluentAssertions;
710
using Halibut.Diagnostics;
11+
using Halibut.Queue.MessageStreamWrapping;
812
using Halibut.Tests.Support;
913
using Halibut.Transport.Protocol;
1014
using NUnit.Framework;
@@ -86,12 +90,13 @@ public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_
8690

8791
dataStreams[1].Should().BeOfType<RepeatingStringDataStream>();
8892

89-
json.Should().Contain("TypeWithDataStreams");
90-
json.Should().NotContain("RepeatingStringDataStream");
93+
// Assert
94+
var jsonString = Encoding.UTF8.GetString(json);
95+
jsonString.Should().Contain("TypeWithDataStreams");
96+
jsonString.Should().NotContain("RepeatingStringDataStream");
9197

9298
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<RequestMessage>(json);
93-
94-
// Assert
99+
95100
// Manually check each field of the deserializedMessage matches the request
96101
deserializedMessage.Id.Should().Be(request.Id);
97102
deserializedMessage.ActivityId.Should().Be(request.ActivityId);
@@ -106,6 +111,68 @@ public void SerializeAndDeserializeRequestMessageWithDataStream_ShouldRoundTrip_
106111

107112
deserializedDataStreams.Count.Should().Be(2);
108113
}
114+
115+
[Test]
116+
public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldRoundTrip()
117+
{
118+
// Arrange
119+
var sut = new QueueMessageSerializerBuilder()
120+
.WithMessageStreamWrappers(MessageStreamWrappersBuilder
121+
.WrapStreamWith(new GzipMessageStreamWrapper())
122+
.AndThenWrapThatWith(new Base64StreamWrapper())
123+
.Build())
124+
.Build();
125+
126+
const string testMessage = "Hello, Queue!";
127+
128+
// Act
129+
var (json, dataStreams) = sut.WriteMessage(testMessage);
130+
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<string>(json);
131+
132+
// Assert
133+
deserializedMessage.Should().Be(testMessage);
134+
dataStreams.Should().BeEmpty();
135+
deserializedDataStreams.Should().BeEmpty();
136+
}
137+
138+
[Test]
139+
public void SerializeAndDeserializeSimpleStringMessage_WithStreamWrappers_ShouldDisposeStreamsInCorrectOrder()
140+
{
141+
// Arrange
142+
var disposeOrderWriter = new List<string>();
143+
var disposeOrderReader = new List<string>();
144+
145+
var firstWrapper = new FuncMessageStreamWrapper(
146+
writingStream => new StreamWithOnDisposeFunc(writingStream, () => disposeOrderWriter.Add("FirstWriteDisposed")),
147+
readingStream => new StreamWithOnDisposeFunc(readingStream, () => disposeOrderReader.Add("FirstReadDisposed")));
148+
149+
var secondWrapper = new FuncMessageStreamWrapper(
150+
writingStream => new StreamWithOnDisposeFunc(writingStream, () => disposeOrderWriter.Add("SecondWriteDisposed")),
151+
readingStream => new StreamWithOnDisposeFunc(readingStream, () => disposeOrderReader.Add("SecondReadDisposed")));
152+
153+
154+
var sut = new QueueMessageSerializerBuilder()
155+
.WithMessageStreamWrappers(MessageStreamWrappersBuilder
156+
.WrapStreamWith(firstWrapper)
157+
.AndThenWrapThatWith(secondWrapper)
158+
.Build())
159+
.Build();
160+
161+
const string testMessage = "Hello, Queue!";
162+
163+
// Act
164+
var (json, dataStreams) = sut.WriteMessage(testMessage);
165+
var (deserializedMessage, deserializedDataStreams) = sut.ReadMessage<string>(json);
166+
167+
// Assert
168+
deserializedMessage.Should().Be(testMessage);
169+
dataStreams.Should().BeEmpty();
170+
deserializedDataStreams.Should().BeEmpty();
171+
172+
// The dispose order should be in reverse to how the streams were created.
173+
disposeOrderWriter.Should().BeEquivalentTo("SecondWriteDisposed", "FirstWriteDisposed");
174+
disposeOrderReader.Should().BeEquivalentTo("SecondReadDisposed", "FirstReadDisposed");
175+
}
109176

110177
public interface IHaveTypeWithDataStreamsService
111178
{
@@ -146,5 +213,99 @@ static Func<Stream, CancellationToken, Task> WriteRepeatedStringsAsync(string to
146213
}
147214
}
148215
}
216+
217+
public class Base64StreamWrapper : IMessageStreamWrapper
218+
{
219+
public Stream WrapMessageSerialisationStream(Stream stream)
220+
{
221+
return new CryptoStream(stream, new ToBase64Transform(), CryptoStreamMode.Write, leaveOpen: true);
222+
}
223+
224+
public Stream WrapMessageDeserialisationStream(Stream stream)
225+
{
226+
return new CryptoStream(stream, new FromBase64Transform(), CryptoStreamMode.Read, leaveOpen: true);
227+
}
228+
}
229+
230+
public class StreamWithOnDisposeFunc : Stream
231+
{
232+
Action OnDispose;
233+
234+
private readonly Stream stream;
235+
236+
/// <summary>
237+
/// The given stream is not disposed by this stream.
238+
/// </summary>
239+
/// <param name="stream"></param>
240+
/// <param name="onDispose"></param>
241+
public StreamWithOnDisposeFunc(Stream stream, Action onDispose)
242+
{
243+
this.stream = stream;
244+
OnDispose = onDispose;
245+
}
246+
247+
public override void Flush()
248+
{
249+
stream.Flush();
250+
}
251+
252+
public override int Read(byte[] buffer, int offset, int count)
253+
{
254+
return stream.Read(buffer, offset, count);
255+
}
256+
257+
public override long Seek(long offset, SeekOrigin origin)
258+
{
259+
return stream.Seek(offset, origin);
260+
}
261+
262+
public override void SetLength(long value)
263+
{
264+
stream.SetLength(value);
265+
}
266+
267+
public override void Write(byte[] buffer, int offset, int count)
268+
{
269+
stream.Write(buffer, offset, count);
270+
}
271+
272+
public override bool CanRead => stream.CanRead;
273+
274+
public override bool CanSeek => stream.CanSeek;
275+
276+
public override bool CanWrite => stream.CanWrite;
277+
278+
public override long Length => stream.Length;
279+
280+
public override long Position
281+
{
282+
get => stream.Position;
283+
set => stream.Position = value;
284+
}
285+
286+
protected override void Dispose(bool disposing)
287+
{
288+
OnDispose();
289+
base.Dispose(disposing);
290+
}
291+
}
292+
293+
public class FuncMessageStreamWrapper : IMessageStreamWrapper
294+
{
295+
Func<Stream, Stream> wrappedForWriting;
296+
Func<Stream, Stream> wrappedForReading;
297+
298+
299+
public FuncMessageStreamWrapper(Func<Stream, Stream> wrappedForWriting, Func<Stream, Stream> wrappedForReading)
300+
{
301+
this.wrappedForWriting = wrappedForWriting;
302+
this.wrappedForReading = wrappedForReading;
303+
}
304+
305+
public Stream WrapMessageSerialisationStream(Stream stream) => wrappedForWriting(stream);
306+
307+
308+
public Stream WrapMessageDeserialisationStream(Stream stream) => wrappedForReading(stream);
309+
}
149310
}
150311
#endif

source/Halibut.Tests/Queue/Redis/RedisHelpers/RedisFacadeFixture.cs

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@
22
using System;
33
using System.Collections.Generic;
44
using System.Linq;
5+
using System.Text;
56
using System.Threading.Tasks;
67
using FluentAssertions;
78
using Halibut.Logging;
8-
using Halibut.Queue.Redis;
99
using Halibut.Queue.Redis.RedisHelpers;
1010
using Halibut.Tests.Queue.Redis.Utils;
1111
using Halibut.Tests.Support;
@@ -57,15 +57,17 @@ public async Task SetInHash_ShouldStoreValueInHash()
5757
var key = Guid.NewGuid().ToString();
5858
var field = "test-field";
5959
var payload = "test-payload";
60-
var values = new Dictionary<string, string> { { field, payload } };
60+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
6161

6262
// Act
6363
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
6464

6565
// Assert - We'll verify by trying to get and delete it
6666
var retrievedValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
67-
retrievedValues.Should().NotBeNull();
68-
retrievedValues![field].Should().Be(payload);
67+
var bytes = retrievedValues.Should().NotBeNull()
68+
.And.Subject.Should().ContainKey(field)
69+
.WhoseValue!;
70+
Encoding.UTF8.GetString(bytes).Should().Be(payload);
6971
}
7072

7173
[Test]
@@ -76,16 +78,18 @@ public async Task TryGetAndDeleteFromHash_WithExistingValue_ShouldReturnValueAnd
7678
var key = Guid.NewGuid().ToString();
7779
var field = "test-field";
7880
var payload = "test-payload";
79-
var values = new Dictionary<string, string> { { field, payload } };
81+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
8082

8183
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
8284

8385
// Act
8486
var retrievedValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
8587

8688
// Assert
87-
retrievedValues.Should().NotBeNull();
88-
retrievedValues![field].Should().Be(payload);
89+
var bytes = retrievedValues.Should().NotBeNull()
90+
.And.Subject.Should().ContainKey(field)
91+
.WhoseValue!;
92+
Encoding.UTF8.GetString(bytes).Should().Be(payload);
8993
}
9094

9195
[Test]
@@ -96,7 +100,7 @@ public async Task HashContainsKey_WithExistingField_ShouldReturnTrue()
96100
var key = Guid.NewGuid().ToString();
97101
var field = "test-field";
98102
var payload = "test-payload";
99-
var values = new Dictionary<string, string> { { field, payload } };
103+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
100104

101105
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
102106

@@ -145,7 +149,7 @@ public async Task TryGetAndDeleteFromHash_ShouldDeleteTheEntireKey()
145149
var key = Guid.NewGuid().ToString();
146150
var field = "test-field";
147151
var payload = "test-payload";
148-
var values = new Dictionary<string, string> { { field, payload } };
152+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
149153

150154
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(1), CancellationToken);
151155

@@ -158,7 +162,7 @@ public async Task TryGetAndDeleteFromHash_ShouldDeleteTheEntireKey()
158162

159163
// Assert
160164
retrievedValues.Should().NotBeNull();
161-
retrievedValues![field].Should().Be(payload);
165+
Encoding.UTF8.GetString(retrievedValues![field]!).Should().Be(payload);
162166

163167
// Verify the entire key was deleted (not just the field)
164168
var existsAfter = await redisFacade.HashContainsKey(key, field, CancellationToken);
@@ -340,7 +344,7 @@ public async Task SetInHash_WithTTL_ShouldExpireAfterSpecifiedTime()
340344
var key = Guid.NewGuid().ToString();
341345
var field = "test-field";
342346
var payload = "test-payload";
343-
var values = new Dictionary<string, string> { { field, payload } };
347+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
344348

345349
// Act - Set a value in hash with short TTL that we can actually test
346350
await redisFacade.SetInHash(key, values, TimeSpan.FromMinutes(3), CancellationToken);
@@ -352,7 +356,7 @@ public async Task SetInHash_WithTTL_ShouldExpireAfterSpecifiedTime()
352356
// Also verify we can retrieve the value immediately
353357
var immediateValues = await redisFacade.TryGetAndDeleteFromHash(key, new[] { field }, CancellationToken);
354358
immediateValues.Should().NotBeNull();
355-
immediateValues![field].Should().Be(payload);
359+
Encoding.UTF8.GetString(immediateValues![field]!).Should().Be(payload);
356360

357361
// Set the value again to test expiration (since TryGetAndDeleteFromHash removes it)
358362
await redisFacade.SetInHash(key, values, TimeSpan.FromMilliseconds(3), CancellationToken);
@@ -563,7 +567,7 @@ public async Task TryGetAndDeleteFromHash_WithConcurrentCalls_ShouldReturnValueT
563567
var key = Guid.NewGuid().ToString();
564568
var field = "test-field";
565569
var payload = "test-payload";
566-
var values = new Dictionary<string, string> { { field, payload } };
570+
var values = new Dictionary<string, byte[]> { { field, Encoding.UTF8.GetBytes(payload) } };
567571
const int concurrentCallCount = 20;
568572

569573
// Set a value in the hash
@@ -572,7 +576,7 @@ public async Task TryGetAndDeleteFromHash_WithConcurrentCalls_ShouldReturnValueT
572576
var countDownLatch = new AsyncCountdownEvent(concurrentCallCount);
573577

574578
// Act - Make multiple concurrent calls to TryGetAndDeleteFromHash
575-
var concurrentTasks = new Task<Dictionary<string, string?>?>[concurrentCallCount];
579+
var concurrentTasks = new Task<Dictionary<string, byte[]?>?>[concurrentCallCount];
576580
for (int i = 0; i < concurrentCallCount; i++)
577581
{
578582
concurrentTasks[i] = Task.Run(async () =>
@@ -590,7 +594,7 @@ public async Task TryGetAndDeleteFromHash_WithConcurrentCalls_ShouldReturnValueT
590594
var nullResults = results.Where(result => result == null).ToArray();
591595

592596
nonNullResults.Should().HaveCount(1, "exactly one concurrent call should retrieve the value");
593-
nonNullResults[0]![field].Should().Be(payload, "the successful call should return the correct payload");
597+
Encoding.UTF8.GetString(nonNullResults[0]![field]!).Should().Be(payload, "the successful call should return the correct payload");
594598
nullResults.Should().HaveCount(concurrentCallCount - 1, "all other concurrent calls should return null");
595599

596600
// Verify the hash key no longer exists

0 commit comments

Comments
 (0)