Skip to content

Commit 550a246

Browse files
Tom BrewerTom Brewer
authored andcommitted
feat: v 0.11.0
- Observability packages - Queue-like structure for LiteDB
1 parent c785964 commit 550a246

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+3063
-23
lines changed

.github/workflows/publish-packages.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ jobs:
3535
dotnet pack Mythetech.Framework/Mythetech.Framework.csproj -c Release --no-restore -o ./nupkgs
3636
dotnet pack Mythetech.Framework.WebAssembly/Mythetech.Framework.WebAssembly.csproj -c Release --no-restore -o ./nupkgs
3737
dotnet pack Mythetech.Framework.Desktop/Mythetech.Framework.Desktop.csproj -c Release --no-restore -o ./nupkgs
38+
dotnet pack Mythetech.Framework.Observability/Mythetech.Framework.Observability.csproj -c Release --no-restore -o ./nupkgs
3839
3940
- name: Publish to NuGet.org
4041
run: dotnet nuget push ./nupkgs/*.nupkg --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.NUGET_ORG_API_KEY }} --skip-duplicate
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
name: Publish Preview NuGet Packages
2+
3+
on:
4+
pull_request:
5+
branches: [ "main" ]
6+
types: [opened, synchronize, reopened]
7+
8+
jobs:
9+
publish-preview:
10+
runs-on: ubuntu-latest
11+
permissions:
12+
contents: read
13+
14+
steps:
15+
- name: Checkout
16+
uses: actions/checkout@v4
17+
18+
- name: Setup .NET SDK
19+
uses: actions/setup-dotnet@v4
20+
with:
21+
dotnet-version: '10.0'
22+
23+
- name: Install .NET WASM workload
24+
run: dotnet workload install wasm-tools
25+
26+
- name: Add GitHub Packages source
27+
run: dotnet nuget add source --username ${{ github.actor }} --password ${{ secrets.GITHUB_TOKEN }} --store-password-in-clear-text --name github-gtksharp https://nuget.pkg.github.com/GtkSharp/index.json
28+
29+
- name: Restore
30+
run: dotnet restore
31+
32+
- name: Build and Pack Preview
33+
run: |
34+
dotnet pack Mythetech.Framework/Mythetech.Framework.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
35+
dotnet pack Mythetech.Framework.WebAssembly/Mythetech.Framework.WebAssembly.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
36+
dotnet pack Mythetech.Framework.Desktop/Mythetech.Framework.Desktop.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
37+
dotnet pack Mythetech.Framework.Observability/Mythetech.Framework.Observability.csproj -c Release --no-restore -o ./nupkgs --version-suffix "preview.${{ github.run_number }}"
38+
39+
- name: Publish to NuGet.org
40+
run: dotnet nuget push ./nupkgs/*.nupkg --source https://api.nuget.org/v3/index.json --api-key ${{ secrets.NUGET_ORG_API_KEY }} --skip-duplicate

Mythetech.Framework.Desktop/Mythetech.Framework.Desktop.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<!-- Prevent static asset conflicts with consuming app -->
88
<StaticWebAssetsEnabled>false</StaticWebAssetsEnabled>
99
<PackageId>Mythetech.Framework.Desktop</PackageId>
10-
<Version>0.10.1</Version>
10+
<VersionPrefix>0.11.0</VersionPrefix>
1111
<Authors>Mythetech</Authors>
1212
<Description>Desktop-specific components for cross platform Blazor applications using Photino or Hermes</Description>
1313
<PackageTags>blazor;desktop;photino;hermes;components;ui;framework;cross-platform</PackageTags>
Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
using LiteDB;
2+
using JsonSerializer = System.Text.Json.JsonSerializer;
3+
using Microsoft.Extensions.Logging;
4+
using Mythetech.Framework.Infrastructure.Queue;
5+
6+
namespace Mythetech.Framework.Desktop.Queue;
7+
8+
/// <summary>
9+
/// LiteDB-based queue implementation for Desktop applications.
10+
/// Provides persistent queue storage with retry semantics.
11+
/// </summary>
12+
/// <typeparam name="T">The type of items in the queue.</typeparam>
13+
public class LiteDbQueue<T> : IQueue<T> where T : class
14+
{
15+
private readonly ILiteDatabase _database;
16+
private readonly string _collectionName;
17+
private readonly ILogger? _logger;
18+
private readonly object _lock = new();
19+
20+
/// <summary>
21+
/// Creates a new LiteDB queue instance.
22+
/// </summary>
23+
/// <param name="database">The LiteDB database instance.</param>
24+
/// <param name="collectionName">Name of the collection to use for this queue.</param>
25+
/// <param name="logger">Optional logger for error reporting.</param>
26+
public LiteDbQueue(ILiteDatabase database, string collectionName, ILogger? logger = null)
27+
{
28+
_database = database ?? throw new ArgumentNullException(nameof(database));
29+
_collectionName = collectionName ?? throw new ArgumentNullException(nameof(collectionName));
30+
_logger = logger;
31+
32+
EnsureIndexes();
33+
}
34+
35+
private void EnsureIndexes()
36+
{
37+
try
38+
{
39+
var collection = GetCollection();
40+
41+
collection.EnsureIndex(x => x.Status);
42+
collection.EnsureIndex(x => x.CreatedAt);
43+
}
44+
catch (Exception ex)
45+
{
46+
_logger?.LogWarning(ex, "Failed to ensure indexes for queue {CollectionName}", _collectionName);
47+
}
48+
}
49+
50+
private ILiteCollection<LiteDbQueueDocument> GetCollection()
51+
{
52+
return _database.GetCollection<LiteDbQueueDocument>(_collectionName);
53+
}
54+
55+
/// <inheritdoc />
56+
public Task<string> EnqueueAsync(T item, CancellationToken ct = default)
57+
{
58+
ct.ThrowIfCancellationRequested();
59+
60+
try
61+
{
62+
var id = Guid.NewGuid().ToString("N");
63+
var document = new LiteDbQueueDocument
64+
{
65+
Id = id,
66+
ItemJson = JsonSerializer.Serialize(item),
67+
Status = QueueEntryStatus.Pending,
68+
CreatedAt = DateTime.UtcNow,
69+
RetryCount = 0
70+
};
71+
72+
lock (_lock)
73+
{
74+
GetCollection().Insert(document);
75+
}
76+
77+
_logger?.LogDebug("Enqueued item {Id} to queue {QueueName}", id, _collectionName);
78+
return Task.FromResult(id);
79+
}
80+
catch (Exception ex)
81+
{
82+
_logger?.LogError(ex, "Failed to enqueue item to queue {QueueName}", _collectionName);
83+
throw;
84+
}
85+
}
86+
87+
/// <inheritdoc />
88+
public Task<QueueEntry<T>?> DequeueAsync(CancellationToken ct = default)
89+
{
90+
ct.ThrowIfCancellationRequested();
91+
92+
try
93+
{
94+
lock (_lock)
95+
{
96+
var collection = GetCollection();
97+
98+
var document = collection
99+
.Find(x => x.Status == QueueEntryStatus.Pending)
100+
.OrderBy(x => x.CreatedAt)
101+
.FirstOrDefault();
102+
103+
if (document == null)
104+
{
105+
return Task.FromResult<QueueEntry<T>?>(null);
106+
}
107+
108+
document.Status = QueueEntryStatus.Processing;
109+
collection.Update(document);
110+
111+
var entry = ToQueueEntry(document);
112+
_logger?.LogDebug("Dequeued item {Id} from queue {QueueName}", document.Id, _collectionName);
113+
return Task.FromResult<QueueEntry<T>?>(entry);
114+
}
115+
}
116+
catch (Exception ex)
117+
{
118+
_logger?.LogError(ex, "Failed to dequeue from queue {QueueName}", _collectionName);
119+
throw;
120+
}
121+
}
122+
123+
/// <inheritdoc />
124+
public Task<QueueEntry<T>?> PeekAsync(CancellationToken ct = default)
125+
{
126+
ct.ThrowIfCancellationRequested();
127+
128+
try
129+
{
130+
var collection = GetCollection();
131+
132+
var document = collection
133+
.Find(x => x.Status == QueueEntryStatus.Pending)
134+
.OrderBy(x => x.CreatedAt)
135+
.FirstOrDefault();
136+
137+
if (document == null)
138+
{
139+
return Task.FromResult<QueueEntry<T>?>(null);
140+
}
141+
142+
return Task.FromResult<QueueEntry<T>?>(ToQueueEntry(document));
143+
}
144+
catch (Exception ex)
145+
{
146+
_logger?.LogError(ex, "Failed to peek queue {QueueName}", _collectionName);
147+
throw;
148+
}
149+
}
150+
151+
/// <inheritdoc />
152+
public Task CompleteAsync(string entryId, CancellationToken ct = default)
153+
{
154+
ct.ThrowIfCancellationRequested();
155+
156+
try
157+
{
158+
lock (_lock)
159+
{
160+
var collection = GetCollection();
161+
var document = collection.FindById(entryId);
162+
163+
if (document == null)
164+
{
165+
_logger?.LogWarning("Attempted to complete non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
166+
return Task.CompletedTask;
167+
}
168+
169+
document.Status = QueueEntryStatus.Completed;
170+
document.ProcessedAt = DateTime.UtcNow;
171+
collection.Update(document);
172+
173+
_logger?.LogDebug("Completed entry {Id} in queue {QueueName}", entryId, _collectionName);
174+
}
175+
176+
return Task.CompletedTask;
177+
}
178+
catch (Exception ex)
179+
{
180+
_logger?.LogError(ex, "Failed to complete entry {Id} in queue {QueueName}", entryId, _collectionName);
181+
throw;
182+
}
183+
}
184+
185+
/// <inheritdoc />
186+
public Task FailAsync(string entryId, string? reason = null, CancellationToken ct = default)
187+
{
188+
ct.ThrowIfCancellationRequested();
189+
190+
try
191+
{
192+
lock (_lock)
193+
{
194+
var collection = GetCollection();
195+
var document = collection.FindById(entryId);
196+
197+
if (document == null)
198+
{
199+
_logger?.LogWarning("Attempted to fail non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
200+
return Task.CompletedTask;
201+
}
202+
203+
document.Status = QueueEntryStatus.Failed;
204+
document.ProcessedAt = DateTime.UtcNow;
205+
document.FailureReason = reason;
206+
document.RetryCount++;
207+
collection.Update(document);
208+
209+
_logger?.LogDebug("Failed entry {Id} in queue {QueueName}: {Reason}", entryId, _collectionName, reason);
210+
}
211+
212+
return Task.CompletedTask;
213+
}
214+
catch (Exception ex)
215+
{
216+
_logger?.LogError(ex, "Failed to mark entry {Id} as failed in queue {QueueName}", entryId, _collectionName);
217+
throw;
218+
}
219+
}
220+
221+
/// <inheritdoc />
222+
public Task<int> GetPendingCountAsync(CancellationToken ct = default)
223+
{
224+
ct.ThrowIfCancellationRequested();
225+
226+
try
227+
{
228+
var count = GetCollection().Count(x => x.Status == QueueEntryStatus.Pending);
229+
return Task.FromResult(count);
230+
}
231+
catch (Exception ex)
232+
{
233+
_logger?.LogError(ex, "Failed to get pending count for queue {QueueName}", _collectionName);
234+
throw;
235+
}
236+
}
237+
238+
/// <inheritdoc />
239+
public Task<IReadOnlyList<QueueEntry<T>>> GetFailedAsync(int limit = 100, CancellationToken ct = default)
240+
{
241+
ct.ThrowIfCancellationRequested();
242+
243+
try
244+
{
245+
var documents = GetCollection()
246+
.Find(x => x.Status == QueueEntryStatus.Failed)
247+
.OrderByDescending(x => x.ProcessedAt)
248+
.Take(limit)
249+
.ToList();
250+
251+
var entries = documents.Select(ToQueueEntry).ToList();
252+
return Task.FromResult<IReadOnlyList<QueueEntry<T>>>(entries);
253+
}
254+
catch (Exception ex)
255+
{
256+
_logger?.LogError(ex, "Failed to get failed entries from queue {QueueName}", _collectionName);
257+
throw;
258+
}
259+
}
260+
261+
/// <inheritdoc />
262+
public Task RetryAsync(string entryId, CancellationToken ct = default)
263+
{
264+
ct.ThrowIfCancellationRequested();
265+
266+
try
267+
{
268+
lock (_lock)
269+
{
270+
var collection = GetCollection();
271+
var document = collection.FindById(entryId);
272+
273+
if (document == null)
274+
{
275+
_logger?.LogWarning("Attempted to retry non-existent entry {Id} in queue {QueueName}", entryId, _collectionName);
276+
return Task.CompletedTask;
277+
}
278+
279+
document.Status = QueueEntryStatus.Pending;
280+
document.ProcessedAt = null;
281+
document.FailureReason = null;
282+
collection.Update(document);
283+
284+
_logger?.LogDebug("Retrying entry {Id} in queue {QueueName} (attempt {RetryCount})", entryId, _collectionName, document.RetryCount + 1);
285+
}
286+
287+
return Task.CompletedTask;
288+
}
289+
catch (Exception ex)
290+
{
291+
_logger?.LogError(ex, "Failed to retry entry {Id} in queue {QueueName}", entryId, _collectionName);
292+
throw;
293+
}
294+
}
295+
296+
/// <inheritdoc />
297+
public Task<int> PurgeCompletedAsync(DateTime olderThan, CancellationToken ct = default)
298+
{
299+
ct.ThrowIfCancellationRequested();
300+
301+
try
302+
{
303+
lock (_lock)
304+
{
305+
var collection = GetCollection();
306+
var count = collection.DeleteMany(x =>
307+
x.Status == QueueEntryStatus.Completed &&
308+
x.ProcessedAt != null &&
309+
x.ProcessedAt < olderThan);
310+
311+
_logger?.LogDebug("Purged {Count} completed entries from queue {QueueName}", count, _collectionName);
312+
return Task.FromResult(count);
313+
}
314+
}
315+
catch (Exception ex)
316+
{
317+
_logger?.LogError(ex, "Failed to purge completed entries from queue {QueueName}", _collectionName);
318+
throw;
319+
}
320+
}
321+
322+
private QueueEntry<T> ToQueueEntry(LiteDbQueueDocument document)
323+
{
324+
var item = JsonSerializer.Deserialize<T>(document.ItemJson)!;
325+
326+
return new QueueEntry<T>
327+
{
328+
Id = document.Id,
329+
Item = item,
330+
Status = document.Status,
331+
CreatedAt = document.CreatedAt,
332+
ProcessedAt = document.ProcessedAt,
333+
RetryCount = document.RetryCount,
334+
FailureReason = document.FailureReason
335+
};
336+
}
337+
}

0 commit comments

Comments
 (0)