Skip to content

Commit 953d2fb

Browse files
heuveaGrabauskas
andauthored
FUND-2116 ZSDMS notifications-from-RabbitMQ-to-webhook (#105)
* refactored the way how zsdms close-zaak started state-machine * fixed UnitTest notificaties * co-pilot review points fixed * deleted the unused method from INotificatiesServiceAgent * added console logging jobs * cleanup * new hangfire console logging nuget added * code review points ii --------- Co-authored-by: Giedrius Grabauskas <43740166+Grabauskas@users.noreply.github.com>
1 parent 5454418 commit 953d2fb

File tree

14 files changed

+221
-97
lines changed

14 files changed

+221
-97
lines changed

src/Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<PackageVersion Include="FluentValidation" Version="12.0.0" />
1515
<PackageVersion Include="FluentValidation.DependencyInjectionExtensions" Version="12.0.0" />
1616
<PackageVersion Include="Hangfire" Version="1.8.20" />
17+
<PackageVersion Include="Hangfire.Console.Extensions" Version="2.1.0" />
1718
<PackageVersion Include="Hangfire.PostgreSql" Version="1.20.12" />
1819
<PackageVersion Include="MassTransit" Version="8.4.1" />
1920
<PackageVersion Include="MassTransit.Hangfire" Version="8.4.1" />

src/OneGround.ZGW.Notificaties.Contracts/v1/Responses/AbonnementResponseDto.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System;
12
using Newtonsoft.Json;
23

34
namespace OneGround.ZGW.Notificaties.Contracts.v1.Responses;
@@ -6,4 +7,10 @@ public class AbonnementResponseDto : AbonnementDto
67
{
78
[JsonProperty("url")]
89
public string Url { get; set; }
10+
11+
[JsonProperty(PropertyName = "id")]
12+
public Guid Id { get; set; }
13+
14+
[JsonProperty(PropertyName = "owner")]
15+
public string Owner { get; set; }
916
}

src/OneGround.ZGW.Notificaties.Messaging/Consumers/SendNotificatiesConsumer.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,12 @@ public async Task Consume(ConsumeContext<ISendNotificaties> context)
113113
continue;
114114
}
115115

116+
// Get optional BatchId header
117+
context.TryGetHeader<Guid>("X-Batch-Id", out var batchId);
118+
116119
// Enqueue Hangfire job which sends the notificatie message (for each subscriber on channel)
117120
var job = _notificatieScheduler.Enqueue<NotificatieJob>(h =>
118-
h.ReQueueNotificatieAsync(abonnement.Id, notificatie.ToInstance())
121+
h.ReQueueNotificatieAsync(abonnement.Id, notificatie.ToInstance(), null, batchId)
119122
);
120123

121124
Logger.LogInformation(

src/OneGround.ZGW.Notificaties.Messaging/FailedQueueInitializationService.cs

Lines changed: 0 additions & 64 deletions
This file was deleted.
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
using Hangfire.Console;
2+
using Hangfire.Server;
3+
4+
namespace OneGround.ZGW.Notificaties.Messaging.Jobs.Notificatie;
5+
6+
public static class ContextExtensions
7+
{
8+
public static void WriteLineColored(this PerformContext context, ConsoleTextColor color, string message)
9+
{
10+
if (context != null)
11+
{
12+
context.SetTextColor(color);
13+
context.WriteLine(message);
14+
context.ResetTextColor();
15+
}
16+
}
17+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
using OneGround.ZGW.Common.Messaging;
2+
3+
namespace OneGround.ZGW.Notificaties.Messaging.Jobs.Notificatie;
4+
5+
public static class NotificatieExtensions
6+
{
7+
public static SubscriberNotificatie ToInstance(this INotificatie notificatie)
8+
{
9+
return new SubscriberNotificatie
10+
{
11+
Kanaal = notificatie.Kanaal,
12+
HoofdObject = notificatie.HoofdObject,
13+
Resource = notificatie.Resource,
14+
ResourceUrl = notificatie.ResourceUrl,
15+
Actie = notificatie.Actie,
16+
Kenmerken = notificatie.Kenmerken,
17+
CorrelationId = notificatie.CorrelationId,
18+
Rsin = notificatie.Rsin,
19+
};
20+
}
21+
}

src/OneGround.ZGW.Notificaties.Messaging/Jobs/Notificatie/NotificatieJob.cs

Lines changed: 71 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
using Hangfire;
2+
using Hangfire.Console;
3+
using Hangfire.Server;
24
using Microsoft.EntityFrameworkCore;
35
using Microsoft.Extensions.Caching.Memory;
46
using Microsoft.Extensions.Configuration;
57
using Microsoft.Extensions.DependencyInjection;
68
using Microsoft.Extensions.Logging;
7-
using OneGround.ZGW.Common.Messaging;
9+
using OneGround.ZGW.Common.Batching;
10+
using OneGround.ZGW.Common.CorrelationId;
811
using OneGround.ZGW.Notificaties.DataModel;
912
using OneGround.ZGW.Notificaties.Messaging.Configuration;
1013
using OneGround.ZGW.Notificaties.Messaging.Consumers;
@@ -13,7 +16,9 @@ namespace OneGround.ZGW.Notificaties.Messaging.Jobs.Notificatie;
1316

1417
public interface INotificatieJob
1518
{
16-
Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificatie notificatie);
19+
[Obsolete("Use overload with PerformContext (which adds functionality like logging")] // Note: Keep the old one for backward compatibility
20+
Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificatie notificatie, Guid? batchId = null);
21+
Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificatie notificatie, PerformContext context = null, Guid? batchId = null);
1722
}
1823

1924
[Queue(Constants.NrcListenerQueue)]
@@ -22,6 +27,8 @@ public class NotificatieJob : INotificatieJob
2227
private readonly INotificationSender _notificationSender;
2328
private readonly IServiceProvider _serviceProvider;
2429
private readonly IMemoryCache _memoryCache;
30+
private readonly IBatchIdAccessor _batchIdAccessor;
31+
private readonly ICorrelationContextAccessor _correlationIdAccessor;
2532
private readonly ILogger<NotificatieJob> _logger;
2633
private readonly ApplicationConfiguration _applicationConfiguration;
2734

@@ -30,30 +37,43 @@ public NotificatieJob(
3037
IServiceProvider serviceProvider,
3138
IConfiguration configuration,
3239
IMemoryCache memoryCache,
40+
IBatchIdAccessor batchIdAccessor,
41+
ICorrelationContextAccessor correlationIdAccessor,
3342
ILogger<NotificatieJob> logger
3443
)
3544
{
3645
_notificationSender = notificationSender;
3746
_serviceProvider = serviceProvider;
3847
_memoryCache = memoryCache;
48+
_batchIdAccessor = batchIdAccessor;
49+
_correlationIdAccessor = correlationIdAccessor;
3950
_logger = logger;
4051

4152
_applicationConfiguration = configuration.GetSection("Application").Get<ApplicationConfiguration>() ?? new ApplicationConfiguration();
4253
}
4354

44-
public async Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificatie notificatie)
55+
public Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificatie notificatie, Guid? batchId = null)
56+
{
57+
return ReQueueNotificatieAsync(abonnementId, notificatie, null, batchId);
58+
}
59+
60+
public async Task ReQueueNotificatieAsync(
61+
Guid abonnementId,
62+
SubscriberNotificatie notificatie,
63+
PerformContext context = null,
64+
Guid? batchId = null
65+
)
4566
{
4667
ArgumentNullException.ThrowIfNull(notificatie, nameof(notificatie));
4768

48-
using (GetLoggingScope(notificatie.Rsin, notificatie.CorrelationId))
69+
using (GetLoggingScope(notificatie.Rsin, notificatie.CorrelationId, batchId))
4970
{
5071
// Get deliver data for this subscriber like callback url and auth
5172
var subscriber = await GetCachedAbonnementByIdAsync(abonnementId, CancellationToken.None);
5273
if (subscriber == null)
5374
{
54-
throw new GeneralException(
55-
$"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {notificatie.CorrelationId}: Could not find abonnement with id '{abonnementId}'"
56-
);
75+
_logger.LogWarning("Abonnement with id {abonnementId} not found. Probably deleted within the retry period of the job.", abonnementId);
76+
return; // Job Ignored->Succeeded
5777
}
5878
if (string.IsNullOrWhiteSpace(subscriber.CallbackUrl))
5979
{
@@ -62,20 +82,62 @@ public async Task ReQueueNotificatieAsync(Guid abonnementId, SubscriberNotificat
6282
);
6383
}
6484

85+
SetBatchIdOrDefault(batchId);
86+
SetCorrelationId(notificatie);
87+
6588
// Notify subscriber on channel....
89+
context.WriteLineColored(
90+
ConsoleTextColor.Yellow,
91+
$"Try to deliver notification to subscriber '{subscriber.CallbackUrl}' on channel '{notificatie.Kanaal}'."
92+
);
93+
6694
var result = await _notificationSender.SendAsync(notificatie, subscriber.CallbackUrl, subscriber.Auth);
6795
if (!result.Success)
6896
{
97+
context.WriteLineColored(
98+
ConsoleTextColor.Red,
99+
$"Could not deliver notification to subscriber '{subscriber.CallbackUrl}' on channel '{notificatie.Kanaal}'."
100+
);
101+
69102
throw new NotDeliveredException(
70103
$"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {notificatie.CorrelationId}: Could not deliver notificatie to subscriber '{notificatie.Rsin}', channel '{notificatie.Kanaal}', endpoint '{subscriber.CallbackUrl}'"
71104
);
72105
}
106+
107+
context.WriteLineColored(
108+
ConsoleTextColor.Yellow,
109+
$"Successfully delivered notification to subscriber '{subscriber.CallbackUrl}' on channel '{notificatie.Kanaal}'."
110+
);
73111
}
74112
}
75113

76-
private IDisposable GetLoggingScope(string rsin, Guid correlationId)
114+
private void SetCorrelationId(SubscriberNotificatie notificatie)
115+
{
116+
_correlationIdAccessor.SetCorrelationId(notificatie.CorrelationId.ToString());
117+
}
118+
119+
private void SetBatchIdOrDefault(Guid? batchId)
77120
{
78-
return _logger.BeginScope(new Dictionary<string, object> { ["CorrelationId"] = correlationId, ["RSIN"] = rsin });
121+
_batchIdAccessor.Id = batchId.HasValue ? batchId.ToString() : null;
122+
}
123+
124+
private IDisposable GetLoggingScope(string rsin, Guid correlationId, Guid? batchId)
125+
{
126+
if (batchId.HasValue)
127+
{
128+
return _logger.BeginScope(
129+
new Dictionary<string, object>
130+
{
131+
["CorrelationId"] = correlationId,
132+
["RSIN"] = rsin,
133+
["BatchId"] = batchId,
134+
}
135+
);
136+
}
137+
else
138+
{
139+
return _logger.BeginScope(new Dictionary<string, object> { ["CorrelationId"] = correlationId, ["RSIN"] = rsin });
140+
}
79141
}
80142

81143
private async Task<Abonnement> GetCachedAbonnementByIdAsync(Guid id, CancellationToken cancellationToken)
@@ -99,21 +161,3 @@ private async Task<Abonnement> GetCachedAbonnementByIdAsync(Guid id, Cancellatio
99161
return abonnementen[id];
100162
}
101163
}
102-
103-
public static class NotificatieExtensions
104-
{
105-
public static SubscriberNotificatie ToInstance(this INotificatie notificatie)
106-
{
107-
return new SubscriberNotificatie
108-
{
109-
Kanaal = notificatie.Kanaal,
110-
HoofdObject = notificatie.HoofdObject,
111-
Resource = notificatie.Resource,
112-
ResourceUrl = notificatie.ResourceUrl,
113-
Actie = notificatie.Actie,
114-
Kenmerken = notificatie.Kenmerken,
115-
CorrelationId = notificatie.CorrelationId,
116-
Rsin = notificatie.Rsin,
117-
};
118-
}
119-
}

src/OneGround.ZGW.Notificaties.Messaging/Jobs/Notificatie/NotificatieManagementJob.cs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,25 @@
11
using Hangfire;
2+
using Hangfire.Console;
3+
using Hangfire.Server;
24

35
namespace OneGround.ZGW.Notificaties.Messaging.Jobs.Notificatie;
46

57
public interface INotificatieManagementJob
68
{
9+
[Obsolete("Use overload with PerformContext (which adds functionality like logging")] // Note: Keep the old one for backward compatibility
710
void ExpireFailedJobsScanAt(TimeSpan maxAgeFailedJob);
11+
void ExpireFailedJobsScanAt(TimeSpan maxAgeFailedJob, PerformContext context = null);
812
}
913

10-
[DisableConcurrentExecution(10)]
1114
[Queue(Constants.NrcListenerQueue)]
1215
public class NotificatieManagementJob : INotificatieManagementJob
1316
{
1417
public void ExpireFailedJobsScanAt(TimeSpan maxAgeFailedJob)
18+
{
19+
ExpireFailedJobsScanAt(maxAgeFailedJob, null);
20+
}
21+
22+
public void ExpireFailedJobsScanAt(TimeSpan maxAgeFailedJob, PerformContext context = null)
1523
{
1624
const int pageSize = 100;
1725

@@ -46,5 +54,14 @@ public void ExpireFailedJobsScanAt(TimeSpan maxAgeFailedJob)
4654
{
4755
BackgroundJob.Delete(jobId); // Set Job state Deleted -> will expire afterward automatically
4856
}
57+
58+
if (jobsToDelete.Any())
59+
{
60+
context.WriteLineColored(ConsoleTextColor.Yellow, $"{jobsToDelete.Count} failed Notificatie jobs older than {maxAgeFailedJob} deleted.");
61+
}
62+
else
63+
{
64+
context.WriteLineColored(ConsoleTextColor.Yellow, $"No Failed Notificatie jobs older than {maxAgeFailedJob} found to delete.");
65+
}
4966
}
5067
}

0 commit comments

Comments
 (0)