Skip to content

Commit 94746ff

Browse files
committed
Added tests for JSON log entries
1 parent 64395d6 commit 94746ff

File tree

4 files changed

+103
-1
lines changed

4 files changed

+103
-1
lines changed
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
using System.Diagnostics.CodeAnalysis;
2+
using DotNext.IO;
3+
4+
namespace DotNext.Net.Cluster.Consensus.Raft.StateMachine;
5+
6+
using Text.Json;
7+
8+
[Experimental("DOTNEXT001")]
9+
internal sealed class JsonStateMachine : SimpleStateMachine
10+
{
11+
private readonly List<TestJsonObject> entries = new();
12+
13+
public JsonStateMachine(DirectoryInfo location)
14+
: base(new(Path.Combine(location.FullName, "db")))
15+
{
16+
}
17+
18+
protected override ValueTask RestoreAsync(FileInfo snapshotFile, CancellationToken token)
19+
=> ValueTask.CompletedTask;
20+
21+
protected override ValueTask PersistAsync(IAsyncBinaryWriter writer, CancellationToken token)
22+
=> ValueTask.CompletedTask;
23+
24+
protected override async ValueTask<bool> ApplyAsync(LogEntry entry, CancellationToken token)
25+
{
26+
var content = await JsonSerializable<TestJsonObject>.TransformAsync(entry, token);
27+
entries.Add(content);
28+
return false;
29+
}
30+
31+
internal IReadOnlyList<TestJsonObject> Entries => entries;
32+
}

src/DotNext.Tests/Net/Cluster/Consensus/Raft/StateMachine/WriteAheadLogTests.cs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
namespace DotNext.Net.Cluster.Consensus.Raft.StateMachine;
99

1010
using Buffers.Binary;
11+
using Text.Json;
1112
using static IO.DataTransferObject;
1213
using LogEntryConsumer = IO.Log.LogEntryConsumer<IRaftLogEntry, Missing>;
1314
using LogEntryList = IO.Log.LogEntryProducer<IRaftLogEntry>;
@@ -348,4 +349,24 @@ public static void EmptyReader()
348349
var reader = new WriteAheadLog.LogEntryReader();
349350
Empty(reader);
350351
}
352+
353+
[Fact]
354+
public static async Task JsonSerialization()
355+
{
356+
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
357+
await using var stateMachine = new JsonStateMachine(new(dir));
358+
await using var wal = new WriteAheadLog(new() { Location = dir }, stateMachine);
359+
360+
await wal.AppendJsonAsync(new TestJsonObject { StringField = "Entry1" });
361+
var index = await wal.AppendJsonAsync(new TestJsonObject { StringField = "Entry2" });
362+
await wal.CommitAsync(index);
363+
await wal.WaitForApplyAsync(index);
364+
Equal(2, stateMachine.Entries.Count);
365+
366+
var payload = stateMachine.Entries[0];
367+
Equal("Entry1", payload.StringField.Value);
368+
369+
payload = stateMachine.Entries[1];
370+
Equal("Entry2", payload.StringField.Value);
371+
}
351372
}

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentStateExtensions.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
using Buffers.Binary;
44
using IO.Log;
5+
using Text.Json;
56

67
/// <summary>
78
/// Provides various extension methods for <see cref="IPersistentState"/> interface.
@@ -62,4 +63,18 @@ public static ValueTask<long> AppendAsync<T>(this IPersistentState state, T payl
6263
CancellationToken token = default)
6364
where T : IBinaryFormattable<T>
6465
=> state.AppendAsync<BinaryLogEntry<T>>(new() { Content = payload, Term = state.Term, Context = context }, token);
66+
67+
/// <summary>
68+
/// Appends JSON objec to the log tail.
69+
/// </summary>
70+
/// <param name="state">The log.</param>
71+
/// <param name="payload">The log entry payload.</param>
72+
/// <param name="context">The optional context to be passed to the state machine.</param>
73+
/// <param name="token">The token that can be used to cancel the operation.</param>
74+
/// <typeparam name="T">The type of the binary object.</typeparam>
75+
/// <returns>The index of the added command within the log.</returns>
76+
public static ValueTask<long> AppendJsonAsync<T>(this IPersistentState state, T payload, object? context = null,
77+
CancellationToken token = default)
78+
where T : IJsonSerializable<T>
79+
=> state.AppendAsync<JsonLogEntry<T>>(new() { Content = payload, Term = state.Term, Context = context }, token);
6580
}

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftClusterExtensions.cs

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
namespace DotNext.Net.Cluster.Consensus.Raft;
22

3+
using Buffers.Binary;
4+
using Text.Json;
5+
36
/// <summary>
47
/// Represents extension methods for <see cref="IRaftCluster"/> interface.
58
/// </summary>
69
public static class RaftClusterExtensions
710
{
811
/// <summary>
9-
/// Appends a new log entry and ensures that it is replicated and committed.
12+
/// Appends binary log entry and ensures that it is replicated and committed.
1013
/// </summary>
1114
/// <param name="cluster">The cluster node.</param>
1215
/// <param name="payload">The log entry payload.</param>
@@ -18,4 +21,35 @@ public static class RaftClusterExtensions
1821
public static ValueTask<bool> ReplicateAsync(this IRaftCluster cluster, ReadOnlyMemory<byte> payload, object? context = null,
1922
CancellationToken token = default)
2023
=> cluster.ReplicateAsync<BinaryLogEntry>(new() { Content = payload, Term = cluster.Term, Context = context }, token);
24+
25+
/// <summary>
26+
/// Appends binary log entry and ensures that it is replicated and committed.
27+
/// </summary>
28+
/// <typeparam name="T">The type of the binary formattable log entry.</typeparam>
29+
/// <param name="cluster">The cluster node.</param>
30+
/// <param name="payload">The log entry payload.</param>
31+
/// <param name="context">The context to be passed to the state machine.</param>
32+
/// <param name="token">The token that can be used to cancel the operation.</param>
33+
/// <returns><see langword="true"/> if the appended log entry has been committed by the majority of nodes; <see langword="false"/> if retry is required.</returns>
34+
/// <exception cref="InvalidOperationException">The current node is not a leader.</exception>
35+
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
36+
public static ValueTask<bool> ReplicateAsync<T>(this IRaftCluster cluster, T payload, object? context = null, CancellationToken token = default)
37+
where T : IBinaryFormattable<T>
38+
=> cluster.ReplicateAsync<BinaryLogEntry<T>>(new() { Content = payload, Term = cluster.Term, Context = context }, token);
39+
40+
/// <summary>
41+
/// Appends JSON log entry and ensures that it is replicated and committed.
42+
/// </summary>
43+
/// <typeparam name="T">The type of the JSON log entry.</typeparam>
44+
/// <param name="cluster">The cluster node.</param>
45+
/// <param name="payload">The log entry payload.</param>
46+
/// <param name="context">The context to be passed to the state machine.</param>
47+
/// <param name="token">The token that can be used to cancel the operation.</param>
48+
/// <returns><see langword="true"/> if the appended log entry has been committed by the majority of nodes; <see langword="false"/> if retry is required.</returns>
49+
/// <exception cref="InvalidOperationException">The current node is not a leader.</exception>
50+
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
51+
public static ValueTask<bool> ReplicateJsonAsync<T>(this IRaftCluster cluster, T payload, object? context = null,
52+
CancellationToken token = default)
53+
where T : IJsonSerializable<T>
54+
=> cluster.ReplicateAsync<JsonLogEntry<T>>(new() { Content = payload, Term = cluster.Term, Context = context }, token);
2155
}

0 commit comments

Comments
 (0)