33 using System ;
44 using System . Collections . Generic ;
55 using System . Diagnostics ;
6+ using System . Diagnostics . Metrics ;
67 using System . Text . Json ;
78 using System . Threading . Tasks ;
89 using Infrastructure ;
1516 using ServiceControl . Audit . Persistence . Monitoring ;
1617 using ServiceControl . EndpointPlugin . Messages . SagaState ;
1718 using ServiceControl . Infrastructure ;
18- using ServiceControl . Infrastructure . Metrics ;
1919 using ServiceControl . SagaAudit ;
2020
21- class AuditPersister
21+ class AuditPersister ( IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ,
22+ IEnrichImportedAuditMessages [ ] enrichers ,
23+ IMessageSession messageSession ,
24+ Lazy < IMessageDispatcher > messageDispatcher )
2225 {
23- public AuditPersister ( IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ,
24- IEnrichImportedAuditMessages [ ] enrichers ,
25- Counter ingestedAuditMeter , Counter ingestedSagaAuditMeter , Meter auditBulkInsertDurationMeter ,
26- Meter sagaAuditBulkInsertDurationMeter , Meter bulkInsertCommitDurationMeter , IMessageSession messageSession ,
27- Lazy < IMessageDispatcher > messageDispatcher )
28- {
29- this . unitOfWorkFactory = unitOfWorkFactory ;
30- this . enrichers = enrichers ;
31-
32- this . ingestedAuditMeter = ingestedAuditMeter ;
33- this . ingestedSagaAuditMeter = ingestedSagaAuditMeter ;
34- this . auditBulkInsertDurationMeter = auditBulkInsertDurationMeter ;
35- this . sagaAuditBulkInsertDurationMeter = sagaAuditBulkInsertDurationMeter ;
36- this . bulkInsertCommitDurationMeter = bulkInsertCommitDurationMeter ;
37- this . messageSession = messageSession ;
38- this . messageDispatcher = messageDispatcher ;
39- }
40-
4126 public async Task < IReadOnlyList < MessageContext > > Persist ( IReadOnlyList < MessageContext > contexts )
4227 {
4328 var stopwatch = Stopwatch . StartNew ( ) ;
4429
4530 if ( Logger . IsDebugEnabled )
4631 {
47- Logger . DebugFormat ( "Batch size {0}" , contexts . Count ) ;
32+ Logger . Debug ( $ "Batch size { contexts . Count } " ) ;
4833 }
4934
5035 var storedContexts = new List < MessageContext > ( contexts . Count ) ;
@@ -86,23 +71,25 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
8671
8772 Logger . Debug ( "Adding audit message for bulk storage" ) ;
8873
89- using ( auditBulkInsertDurationMeter . Measure ( ) )
90- {
91- await unitOfWork . RecordProcessedMessage ( processedMessage , context . Body ) ;
92- }
74+ var auditSw = Stopwatch . StartNew ( ) ;
75+ await unitOfWork . RecordProcessedMessage ( processedMessage , context . Body ) ;
76+ auditSw . Stop ( ) ;
77+
78+ auditBulkInsertDurationMeter . Record ( auditSw . ElapsedMilliseconds ) ;
9379
94- ingestedAuditMeter . Mark ( ) ;
80+ ingestedAuditMeter . Add ( 1 ) ;
9581 }
9682 else if ( context . Extensions . TryGet ( out SagaSnapshot sagaSnapshot ) )
9783 {
9884 Logger . Debug ( "Adding SagaSnapshot message for bulk storage" ) ;
9985
100- using ( sagaAuditBulkInsertDurationMeter . Measure ( ) )
101- {
102- await unitOfWork . RecordSagaSnapshot ( sagaSnapshot ) ;
103- }
86+ var sagaSw = Stopwatch . StartNew ( ) ;
87+ await unitOfWork . RecordSagaSnapshot ( sagaSnapshot ) ;
88+ sagaSw . Stop ( ) ;
89+
90+ sagaAuditBulkInsertDurationMeter . Record ( sagaSw . ElapsedMilliseconds ) ;
10491
105- ingestedSagaAuditMeter . Mark ( ) ;
92+ ingestedSagaAuditMeter . Add ( 1 ) ;
10693 }
10794
10895 storedContexts . Add ( context ) ;
@@ -131,10 +118,10 @@ public async Task<IReadOnlyList<MessageContext>> Persist(IReadOnlyList<MessageCo
131118 try
132119 {
133120 // this can throw even though dispose is never supposed to throw
134- using ( bulkInsertCommitDurationMeter . Measure ( ) )
135- {
136- await unitOfWork . DisposeAsync ( ) ;
137- }
121+ var commitSw = Stopwatch . StartNew ( ) ;
122+ await unitOfWork . DisposeAsync ( ) ;
123+ commitSw . Stop ( ) ;
124+ bulkInsertCommitDurationMeter . Record ( commitSw . ElapsedMilliseconds ) ;
138125 }
139126 catch ( Exception e )
140127 {
@@ -284,16 +271,12 @@ await messageDispatcher.Value.Dispatch(new TransportOperations(messagesToEmit.To
284271 }
285272 }
286273
287- readonly Counter ingestedAuditMeter ;
288- readonly Counter ingestedSagaAuditMeter ;
289- readonly Meter auditBulkInsertDurationMeter ;
290- readonly Meter sagaAuditBulkInsertDurationMeter ;
291- readonly Meter bulkInsertCommitDurationMeter ;
292- readonly IMessageSession messageSession ;
293- readonly Lazy < IMessageDispatcher > messageDispatcher ;
274+ readonly Counter < long > ingestedAuditMeter = AuditMetrics . Meter . CreateCounter < long > ( $ "{ AuditMetrics . Prefix } .ingested_audit_messages") ; // metrics.GetCounter("Audit ingestion - ingested audit");
275+ readonly Counter < long > ingestedSagaAuditMeter = AuditMetrics . Meter . CreateCounter < long > ( $ "{ AuditMetrics . Prefix } .ingested_saga_audits") ; // metrics.GetCounter("Audit ingestion - ingested audit");
276+ readonly Histogram < double > auditBulkInsertDurationMeter = AuditMetrics . Meter . CreateHistogram < double > ( $ "{ AuditMetrics . Prefix } .audit_bulk_insert_duration_ms") ; // metrics.GetCounter("Audit ingestion - ingested audit");
277+ readonly Histogram < double > sagaAuditBulkInsertDurationMeter = AuditMetrics . Meter . CreateHistogram < double > ( $ "{ AuditMetrics . Prefix } .saga_bulk_insert_duration_ms") ; // metrics.GetCounter("Audit ingestion - ingested audit");
278+ readonly Histogram < double > bulkInsertCommitDurationMeter = AuditMetrics . Meter . CreateHistogram < double > ( $ "{ AuditMetrics . Prefix } .audit_commit_duration_ms") ; // metrics.GetCounter("Audit ingestion - ingested audit");
294279
295- readonly IEnrichImportedAuditMessages [ ] enrichers ;
296- readonly IAuditIngestionUnitOfWorkFactory unitOfWorkFactory ;
297280 static readonly ILog Logger = LogManager . GetLogger < AuditPersister > ( ) ;
298281 }
299282}
0 commit comments