Skip to content

Commit 8d3f1e7

Browse files
authored
Update queue mode (#3)
* Update queue mode * Fix validator
1 parent 7612d2c commit 8d3f1e7

File tree

12 files changed

+214
-64
lines changed

12 files changed

+214
-64
lines changed

.github/workflows/reflection-events-pr-validator.yaml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ name: reflection-events-pr-validator
22

33
on:
44
pull_request:
5-
branches: [development]
6-
push:
7-
branches: [development]
5+
branches: [main]
86

97
workflow_dispatch:
108

Directory.Build.props

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
</PropertyGroup>
77

88
<PropertyGroup>
9-
<Version>4.0.0</Version>
9+
<Version>4.1.0</Version>
1010
<AssemblyVersion>4.0.0</AssemblyVersion>
1111
</PropertyGroup>
1212

@@ -29,6 +29,7 @@
2929
<NuGetAuditLevel>moderate</NuGetAuditLevel>
3030
<ManagePackageVersionsCentrally>true</ManagePackageVersionsCentrally>
3131
<CentralPackageTransitivePinningEnabled>false</CentralPackageTransitivePinningEnabled>
32+
<_SilenceIsAotCompatibleUnsupportedWarning>true</_SilenceIsAotCompatibleUnsupportedWarning>
3233
</PropertyGroup>
3334

3435
<PropertyGroup>

Directory.Build.targets

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
<Project>
22

3-
<PropertyGroup>
4-
<!-- TODO: Dynamically generate Title if one wasn't set -->
5-
<Title Condition="'$(Title)' == ''">$(Product) Asset</Title>
3+
<PropertyGroup Condition="'$(GITHUB_ACTIONS)' == 'true'">
4+
<ContinuousIntegrationBuild>true</ContinuousIntegrationBuild>
65
</PropertyGroup>
76

87
<PropertyGroup>
@@ -11,7 +10,17 @@
1110
<PackageTags Condition="'$(PackageTags)' == ''">$(CommonTags)</PackageTags>
1211
</PropertyGroup>
1312

14-
<ItemGroup Condition="$(IsPackable)">
13+
<PropertyGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
14+
<GenerateLibraryLayout>true</GenerateLibraryLayout>
15+
<PackageReadmeFile>README.md</PackageReadmeFile>
16+
<DeterministicSourcePaths Condition="'$(SourceLinkEnabled)' == 'true'">true</DeterministicSourcePaths>
17+
<IncludeSymbols>true</IncludeSymbols>
18+
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
19+
<PublishRepositoryUrl>true</PublishRepositoryUrl>
20+
<EmbedUntrackedSources>true</EmbedUntrackedSources>
21+
</PropertyGroup>
22+
23+
<ItemGroup Condition="'$(GeneratePackageOnBuild)' == 'true'">
1524
<None Include="$(BuildToolsDirectory)nuget.png" Pack="true" PackagePath="\Icon.png" Visible="False" />
1625
<None Include="$(RepositoryDirectory)ThirdPartyNotices.txt" Pack="true" PackagePath="\" Visible="False" />
1726
<None Include="$(RepositoryDirectory)LICENSE.md" Pack="true" PackagePath="\LICENSE.md" Visible="False" />
@@ -27,13 +36,8 @@
2736
</ItemGroup>
2837
</Target>
2938

30-
<!-- Define NETSTANDARD2_1_OR_GREATER for .NET Standard 2.1 targets and above -->
31-
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net7.0'">
32-
<DefineConstants>NETSTANDARD2_1_OR_GREATER</DefineConstants>
33-
</PropertyGroup>
34-
3539
<!-- Configure trimming for projects on .NET 6 and above -->
36-
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0'">
40+
<PropertyGroup Condition="'$(TargetFramework)' == 'net6.0' OR '$(TargetFramework)' == 'net8.0' OR '$(TargetFramework)' == 'net9.0'">
3741
<IsTrimmable>true</IsTrimmable>
3842
<EnableTrimAnalyzer>true</EnableTrimAnalyzer>
3943
<EnableAotAnalyzer>true</EnableAotAnalyzer>
@@ -44,4 +48,19 @@
4448
<SignAssembly>true</SignAssembly>
4549
<AssemblyOriginatorKeyFile>$(RepositoryDirectory)\src\lepo.snk</AssemblyOriginatorKeyFile>
4650
</PropertyGroup>
51+
52+
<Target Name="WpfSourceLinkWorkaround" BeforeTargets="InitializeSourceRootMappedPaths" Condition="'$(UseWPF)' == 'true'">
53+
<!-- WPF causes an error with SourceLink because its build targets create a temporary project without a PackageReference to SourceLink, see https://github.com/dotnet/sourcelink/issues/91,
54+
causing the @SourceRoot property to be unexpectedly empty for the MapSourceRoot task
55+
56+
For context, see https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/Microsoft.Managed.Core.targets
57+
and https://github.com/dotnet/roslyn/blob/main/src/Compilers/Core/MSBuildTask/MapSourceRoots.cs
58+
59+
This workaround sets the SourceRoot manually to some deterministic value to keep the promise given by having DeterministicSourcePaths set to true -->
60+
<Message Text="using deterministic source path workaround for WPF project instead of SourceLink" />
61+
<ItemGroup>
62+
<!-- There needs to be at least one SourceRoot defined, its value does not seem to matter as long as it ends with a directory separator -->
63+
<SourceRoot Include="\" />
64+
</ItemGroup>
65+
</Target>
4766
</Project>

docs/codesnippet/Rtf/Hyperlink/RtfDocumentProcessor.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ from attr in doc.Descendants().Attributes()
8686
#endregion
8787

8888
#region FixLink
89-
private static void FixLink(XAttribute link, RelativePath filePath, HashSet<string> linkToFiles)
89+
private static void FixLink(
90+
XAttribute link,
91+
RelativePath filePath,
92+
HashSet<string> linkToFiles
93+
)
9094
{
9195
string linkFile;
9296
string anchor = null;

docs/codesnippet/Rtf/RtfBuildStep.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ public class RtfBuildStep : IDocumentBuildStep
2121
public void Build(FileModel model, IHostService host)
2222
{
2323
string content = (string)((Dictionary<string, object>)model.Content)["conceptual"];
24-
content = _taskFactory.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content)).Result;
24+
content = _taskFactory
25+
.StartNew(() => RtfToHtmlConverter.ConvertRtfToHtml(content))
26+
.Result;
2527
((Dictionary<string, object>)model.Content)["conceptual"] = content;
2628
}
2729
#endregion

src/ReflectionEventing.Demo.Wpf/App.xaml.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using ReflectionEventing.Demo.Wpf.Services;
77
using ReflectionEventing.Demo.Wpf.ViewModels;
88
using ReflectionEventing.DependencyInjection;
9+
using ReflectionEventing.DependencyInjection.Services;
910

1011
namespace ReflectionEventing.Demo.Wpf;
1112

@@ -33,8 +34,11 @@ public partial class App : Application
3334
_ = services.AddEventBus(e =>
3435
{
3536
e.Options.UseEventPolymorphism = true;
37+
e.Options.UseEventsQueue = true;
38+
e.Options.QueueMode = ProcessingMode.Parallel;
3639

37-
_ = e.AddAllConsumers(Assembly.GetExecutingAssembly());
40+
e.UseBackgroundService<DependencyInjectionQueueProcessor>();
41+
e.AddAllConsumers(Assembly.GetExecutingAssembly());
3842
});
3943
}
4044
)

src/ReflectionEventing.DependencyInjection/DependencyInjectionEventBusBuilder.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// Copyright (C) Leszek Pomianowski and ReflectionEventing Contributors.
44
// All Rights Reserved.
55

6+
using ReflectionEventing.DependencyInjection.Services;
7+
68
namespace ReflectionEventing.DependencyInjection;
79

810
/// <summary>
@@ -11,6 +13,8 @@ namespace ReflectionEventing.DependencyInjection;
1113
// ReSharper disable once ClassWithVirtualMembersNeverInherited.Global
1214
public class DependencyInjectionEventBusBuilder(IServiceCollection services) : EventBusBuilder
1315
{
16+
internal Type QueueBackgroundService { get; set; } = typeof(DependencyInjectionQueueProcessor);
17+
1418
/// <summary>
1519
/// Adds a consumer to the event bus and <see cref="IServiceCollection"/> with a specified service lifetime.
1620
/// </summary>

src/ReflectionEventing.DependencyInjection/EventBusBuilderExtensions.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,28 @@ namespace ReflectionEventing.DependencyInjection;
77

88
public static class EventBusBuilderExtensions
99
{
10+
/// <summary>
11+
/// Configures the event bus to use a custom background service for processing events.
12+
/// </summary>
13+
/// <typeparam name="TQueueBackgroundService">The type of the background service to use. This type must implement <see cref="IHostedService"/>.</typeparam>
14+
/// <returns>The current instance of <see cref="EventBusBuilder"/>.</returns>
15+
public static EventBusBuilder UseBackgroundService<TQueueBackgroundService>(
16+
this EventBusBuilder builder
17+
)
18+
where TQueueBackgroundService : class, IHostedService
19+
{
20+
if (builder is not DependencyInjectionEventBusBuilder dependencyInjectionEventBusBuilder)
21+
{
22+
throw new InvalidOperationException(
23+
$"The event bus builder must be of type {nameof(DependencyInjectionEventBusBuilder)} to define background service."
24+
);
25+
}
26+
27+
dependencyInjectionEventBusBuilder.QueueBackgroundService = typeof(TQueueBackgroundService);
28+
29+
return dependencyInjectionEventBusBuilder;
30+
}
31+
1032
/// <summary>
1133
/// Adds a consumer to the event bus builder and <see cref="IServiceCollection"/>.
1234
/// </summary>

src/ReflectionEventing.DependencyInjection/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
// All Rights Reserved.
55

66
using ReflectionEventing.DependencyInjection.Configuration;
7-
using ReflectionEventing.DependencyInjection.Services;
87
using ReflectionEventing.Queues;
98

109
namespace ReflectionEventing.DependencyInjection;
@@ -47,7 +46,7 @@ public static IServiceCollection AddEventBus(
4746

4847
if (builder.Options.UseEventsQueue)
4948
{
50-
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
49+
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
5150
}
5251

5352
return services;
@@ -82,7 +81,7 @@ Action<EventBusBuilder> configure
8281

8382
if (builder.Options.UseEventsQueue)
8483
{
85-
_ = services.AddHostedService<DependencyInjectionQueueProcessor>();
84+
_ = services.AddSingleton(typeof(IHostedService), builder.QueueBackgroundService);
8685
}
8786

8887
return services;

src/ReflectionEventing.DependencyInjection/Services/DependencyInjectionQueueProcessor.cs

Lines changed: 104 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ ILogger<DependencyInjectionQueueProcessor> logger
3131

3232
private readonly TimeSpan errorTickRate = options.Value.ErrorTickRate;
3333

34+
private readonly SemaphoreSlim semaphore = new(options.Value.ConcurrentTaskLimit);
35+
3436
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
3537
{
3638
await BackgroundProcessing(cancellationToken);
@@ -114,60 +116,118 @@ CancellationToken cancellationToken
114116
return;
115117
}
116118

117-
MethodInfo? consumeMethod = consumerType.GetMethod(
118-
"ConsumeAsync",
119-
[@event.GetType(), typeof(CancellationToken)]
120-
);
121-
122-
if (consumeMethod != null)
119+
if (options.Value.QueueMode == ProcessingMode.Sequential)
123120
{
124-
try
125-
{
126-
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
127-
}
128-
catch (Exception e)
129-
{
130-
//activity?.AddException(e);
131-
activity?.SetStatus(ActivityStatusCode.Error);
132-
133-
logger.LogError(
134-
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
135-
e,
136-
"Error processing event of type {EventName}",
137-
@event.GetType().Name
138-
);
139-
140-
if (options.Value.UseErrorQueue)
141-
{
142-
queue.EnqueueError(
143-
new FailedEvent
144-
{
145-
Data = @event,
146-
Exception = e,
147-
Timestamp = DateTimeOffset.UtcNow,
148-
FailedConsumer = consumerType,
149-
}
150-
);
151-
}
121+
await ExecuteConsumerAsync(
122+
@event,
123+
consumerType,
124+
eventType,
125+
consumer,
126+
activity,
127+
cancellationToken
128+
);
129+
}
130+
else if (options.Value.QueueMode == ProcessingMode.Parallel)
131+
{
132+
await semaphore.WaitAsync(cancellationToken);
152133

153-
EventsFailed.Add(
154-
1,
155-
new KeyValuePair<string, object?>("message_type", eventType.Name)
156-
);
157-
}
134+
_ = Task.Run(
135+
async () =>
136+
{
137+
try
138+
{
139+
await ExecuteConsumerAsync(
140+
@event,
141+
consumerType,
142+
eventType,
143+
consumer,
144+
activity,
145+
cancellationToken
146+
);
147+
}
148+
catch (Exception e)
149+
{
150+
logger.LogError(e, "Error occurred during consumer execution");
151+
}
152+
finally
153+
{
154+
semaphore.Release();
155+
}
156+
},
157+
cancellationToken
158+
);
158159
}
159160
else
160161
{
161-
logger.LogError(
162-
new EventId(75002, "ReflectionEventingConsumerMissing"),
163-
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
164-
consumerType.Name,
165-
@event.GetType().Name
162+
throw new InvalidOperationException(
163+
"Invalid queue processing mode. Must be either Sequential or Parallel."
166164
);
167165
}
168166
}
169167
}
170168

171169
EventsProcessed.Add(1, new KeyValuePair<string, object?>("message_type", eventType.Name));
172170
}
171+
172+
private async Task ExecuteConsumerAsync(
173+
object @event,
174+
Type consumerType,
175+
Type eventType,
176+
object consumer,
177+
Activity? activity,
178+
CancellationToken cancellationToken
179+
)
180+
{
181+
MethodInfo? consumeMethod = consumerType.GetMethod(
182+
"ConsumeAsync",
183+
[@event.GetType(), typeof(CancellationToken)]
184+
);
185+
186+
if (consumeMethod != null)
187+
{
188+
try
189+
{
190+
await (Task)consumeMethod.Invoke(consumer, [@event, cancellationToken])!;
191+
}
192+
catch (Exception e)
193+
{
194+
//activity?.AddException(e);
195+
activity?.SetStatus(ActivityStatusCode.Error);
196+
197+
logger.LogError(
198+
new EventId(75001, "ReflectionEventingQueueProcessingFailed"),
199+
e,
200+
"Error processing event of type {EventName}",
201+
@event.GetType().Name
202+
);
203+
204+
if (options.Value.UseErrorQueue)
205+
{
206+
queue.EnqueueError(
207+
new FailedEvent
208+
{
209+
Data = @event,
210+
Exception = e,
211+
Timestamp = DateTimeOffset.UtcNow,
212+
FailedConsumer = consumerType,
213+
}
214+
);
215+
}
216+
217+
EventsFailed.Add(
218+
1,
219+
new KeyValuePair<string, object?>("message_type", eventType.Name)
220+
);
221+
}
222+
}
223+
else
224+
{
225+
logger.LogError(
226+
new EventId(75002, "ReflectionEventingConsumerMissing"),
227+
"ConsumeAsync method not found on consumer {ConsumerType} for event type {EventName}",
228+
consumerType.Name,
229+
@event.GetType().Name
230+
);
231+
}
232+
}
173233
}

0 commit comments

Comments
 (0)