11namespace Particular . LicensingComponent ;
22
3+ using System ;
4+ using System . Runtime . CompilerServices ;
5+ using System . Threading ;
36using AuditThroughput ;
47using Contracts ;
58using MonitoringThroughput ;
@@ -59,28 +62,16 @@ public async Task UpdateUserIndicatorsOnEndpoints(List<UpdateUserIndicator> user
5962
6063 public async Task < List < EndpointThroughputSummary > > GetThroughputSummary ( CancellationToken cancellationToken )
6164 {
62- var endpoints = ( await dataStore . GetAllEndpoints ( false , cancellationToken ) ) . ToList ( ) ;
63- var queueNames = endpoints . Select ( endpoint => endpoint . SanitizedName ) . Distinct ( ) . ToList ( ) ;
64- var endpointThroughputPerQueue = await dataStore . GetEndpointThroughputByQueueName ( queueNames , cancellationToken ) ;
6565 var endpointSummaries = new List < EndpointThroughputSummary > ( ) ;
6666
67- //group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
68- foreach ( var endpointGroupPerQueue in endpoints . GroupBy ( g => CleanseSanitizedName ( g . SanitizedName ) ) )
67+ await foreach ( var endpointData in GetDistinctEndpointData ( cancellationToken ) )
6968 {
70- var data = new List < ThroughputData > ( ) ;
71- if ( endpointThroughputPerQueue . TryGetValue ( endpointGroupPerQueue . Key , out var tempData ) )
72- {
73- data . AddRange ( tempData ) ;
74- }
75-
76- var isKnownEndpoint = IsKnownEndpoint ( endpointGroupPerQueue ) ;
7769 var endpointSummary = new EndpointThroughputSummary
7870 {
79- //want to display the endpoint name to the user if it's different to the sanitized endpoint name
80- Name = endpointGroupPerQueue . FirstOrDefault ( endpoint => endpoint . Id . Name != endpoint . SanitizedName ) ? . Id . Name ?? endpointGroupPerQueue . Key ,
81- UserIndicator = UserIndicator ( endpointGroupPerQueue ) ?? ( isKnownEndpoint ? Contracts . UserIndicator . NServiceBusEndpoint . ToString ( ) : string . Empty ) ,
82- IsKnownEndpoint = isKnownEndpoint ,
83- MaxDailyThroughput = data . Max ( )
71+ Name = endpointData . Name ,
72+ UserIndicator = endpointData . UserIndicator ?? ( endpointData . IsKnownEndpoint ? Contracts . UserIndicator . NServiceBusEndpoint . ToString ( ) : string . Empty ) ,
73+ IsKnownEndpoint = endpointData . IsKnownEndpoint ,
74+ MaxDailyThroughput = endpointData . ThroughputData . Max ( )
8475 } ;
8576
8677 endpointSummaries . Add ( endpointSummary ) ;
@@ -124,40 +115,25 @@ public async Task<SignedReport> GenerateThroughputReport(string spVersion, DateT
124115 var reportMasks = await dataStore . GetReportMasks ( cancellationToken ) ;
125116 CreateMasks ( reportMasks . ToArray ( ) ) ;
126117
127- var endpoints = ( await dataStore . GetAllEndpoints ( false , cancellationToken ) ) . ToArray ( ) ;
128- var queueNames = endpoints . Select ( endpoint => endpoint . SanitizedName ) . Distinct ( ) . ToList ( ) ;
129- var endpointThroughputPerQueue = await dataStore . GetEndpointThroughputByQueueName ( queueNames , cancellationToken ) ;
130118 var queueThroughputs = new List < QueueThroughput > ( ) ;
131119 List < string > ignoredQueueNames = [ ] ;
132120
133- //group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
134- foreach ( var endpointGroupPerQueue in endpoints . GroupBy ( g => CleanseSanitizedName ( g . SanitizedName ) ) )
121+ await foreach ( var endpointData in GetDistinctEndpointData ( cancellationToken ) )
135122 {
136- //want to display the endpoint name if it's different to the sanitized endpoint name
137- var endpointName = endpointGroupPerQueue . FirstOrDefault ( endpoint => endpoint . Id . Name != endpoint . SanitizedName ) ? . Id . Name ?? endpointGroupPerQueue . Key ;
138-
139- if ( ! endpointThroughputPerQueue . TryGetValue ( endpointGroupPerQueue . Key , out var data ) )
140- {
141- data = [ ] ;
142- }
143-
144- var throughputData = data . ToList ( ) ;
145-
146- var userIndicator = UserIndicator ( endpointGroupPerQueue ) ?? null ;
147- var notAnNsbEndpoint = userIndicator ? . Equals ( Contracts . UserIndicator . NotNServiceBusEndpoint . ToString ( ) , StringComparison . OrdinalIgnoreCase ) ?? false ;
123+ var notAnNsbEndpoint = endpointData . UserIndicator ? . Equals ( Contracts . UserIndicator . NotNServiceBusEndpoint . ToString ( ) , StringComparison . OrdinalIgnoreCase ) ?? false ;
148124
149125 //get all data that we have, including daily values
150126 var queueThroughput = new QueueThroughput
151127 {
152- QueueName = Mask ( endpointName ) ,
153- UserIndicator = userIndicator ,
154- EndpointIndicators = EndpointIndicators ( endpointGroupPerQueue ) ?? [ ] ,
155- NoDataOrSendOnly = throughputData . Sum ( ) == 0 ,
156- Scope = EndpointScope ( endpointGroupPerQueue ) ?? "" ,
157- Throughput = throughputData . Max ( ) ,
158- DailyThroughputFromAudit = throughputData . FromSource ( ThroughputSource . Audit ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( ) ,
159- DailyThroughputFromMonitoring = throughputData . FromSource ( ThroughputSource . Monitoring ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( ) ,
160- DailyThroughputFromBroker = notAnNsbEndpoint ? [ ] : throughputData . FromSource ( ThroughputSource . Broker ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( )
128+ QueueName = Mask ( endpointData . Name ) ,
129+ UserIndicator = endpointData . UserIndicator ,
130+ EndpointIndicators = endpointData . EndpointIndicators ?? [ ] ,
131+ NoDataOrSendOnly = endpointData . ThroughputData . Sum ( ) == 0 ,
132+ Scope = endpointData . Scope ?? "" ,
133+ Throughput = endpointData . ThroughputData . Max ( ) ,
134+ DailyThroughputFromAudit = endpointData . ThroughputData . FromSource ( ThroughputSource . Audit ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( ) ,
135+ DailyThroughputFromMonitoring = endpointData . ThroughputData . FromSource ( ThroughputSource . Monitoring ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( ) ,
136+ DailyThroughputFromBroker = notAnNsbEndpoint ? [ ] : endpointData . ThroughputData . FromSource ( ThroughputSource . Broker ) . Select ( s => new DailyThroughput { DateUTC = s . DateUTC , MessageCount = s . MessageCount } ) . ToArray ( )
161137 } ;
162138
163139 queueThroughputs . Add ( queueThroughput ) ;
@@ -199,8 +175,8 @@ public async Task<SignedReport> GenerateThroughputReport(string spVersion, DateT
199175
200176 report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . ServiceControlVersion . ToString ( ) ] = throughputSettings . ServiceControlVersion ;
201177 report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . ServicePulseVersion . ToString ( ) ] = spVersion ;
202- report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . AuditEnabled . ToString ( ) ] = endpointThroughputPerQueue . HasDataFromSource ( ThroughputSource . Audit ) . ToString ( ) ;
203- report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . MonitoringEnabled . ToString ( ) ] = endpointThroughputPerQueue . HasDataFromSource ( ThroughputSource . Monitoring ) . ToString ( ) ;
178+ report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . AuditEnabled . ToString ( ) ] = systemHasAuditEnabled . ToString ( ) ;
179+ report . EnvironmentInformation . EnvironmentData [ EnvironmentDataType . MonitoringEnabled . ToString ( ) ] = systemHasMonitoringEnabled . ToString ( ) ;
204180
205181 var throughputReport = new SignedReport { ReportData = report , Signature = Signature . SignReport ( report ) } ;
206182 return throughputReport ;
@@ -228,6 +204,57 @@ string Mask(string stringToMask)
228204 }
229205 }
230206
207+ async IAsyncEnumerable < EndpointData > GetDistinctEndpointData ( [ EnumeratorCancellation ] CancellationToken cancellationToken )
208+ {
209+ var endpoints = ( await dataStore . GetAllEndpoints ( false , cancellationToken ) ) . ToArray ( ) ;
210+ var queueNames = endpoints . Select ( endpoint => endpoint . SanitizedName ) . Distinct ( ) . ToList ( ) ;
211+ var endpointThroughputPerQueue = await dataStore . GetEndpointThroughputByQueueName ( queueNames , cancellationToken ) ;
212+
213+ systemHasAuditEnabled = endpointThroughputPerQueue . HasDataFromSource ( ThroughputSource . Audit ) ;
214+ systemHasMonitoringEnabled = endpointThroughputPerQueue . HasDataFromSource ( ThroughputSource . Monitoring ) ;
215+
216+ //group endpoints by sanitized name - so to group throughput recorded from broker, audit and monitoring
217+ //some brokers use lowercase only so we want to ensure that we are matching on what the user has setup via NSB in terms of nn endpoint name, and what is stored on the broker
218+ foreach ( var endpointGroupPerQueue in endpoints . GroupBy ( g => CleanseSanitizedName ( g . SanitizedName ) ) )
219+ {
220+ //want to display the endpoint name if it's different to the sanitized endpoint name
221+ var endpointName = endpointGroupPerQueue . FirstOrDefault ( endpoint => ! string . Equals ( endpoint . Id . Name , endpointGroupPerQueue . Key , StringComparison . Ordinal ) ) ? . Id . Name ?? endpointGroupPerQueue . Key ;
222+
223+ var throughputData = new List < ThroughputData > ( ) ;
224+ foreach ( var endpointQueueName in endpointThroughputPerQueue . Keys . Where ( k => CleanseSanitizedName ( k ) == endpointGroupPerQueue . Key ) )
225+ {
226+ if ( endpointThroughputPerQueue . TryGetValue ( endpointQueueName , out var tempData ) )
227+ {
228+ throughputData . AddRange ( tempData ) ;
229+ }
230+ }
231+
232+ var userIndicator = UserIndicator ( endpointGroupPerQueue ) ?? null ;
233+
234+ yield return new EndpointData ( endpointName , throughputData , userIndicator , EndpointScope ( endpointGroupPerQueue ) , EndpointIndicators ( endpointGroupPerQueue ) , IsKnownEndpoint ( endpointGroupPerQueue ) ) ;
235+ }
236+ }
237+
238+ class EndpointData
239+ {
240+ internal EndpointData ( string name , List < ThroughputData > throughputData , string ? userIndicator , string ? scope , string [ ] ? endpointIndicators , bool isKnownEndpoint )
241+ {
242+ Name = name ;
243+ ThroughputData = throughputData ;
244+ UserIndicator = userIndicator ;
245+ Scope = scope ;
246+ EndpointIndicators = endpointIndicators ;
247+ IsKnownEndpoint = isKnownEndpoint ;
248+ }
249+
250+ internal string Name { get ; }
251+ internal List < ThroughputData > ThroughputData { get ; }
252+ internal string ? UserIndicator { get ; }
253+ internal string ? Scope { get ; }
254+ internal string [ ] ? EndpointIndicators { get ; }
255+ internal bool IsKnownEndpoint { get ; }
256+ }
257+
231258 string CleanseSanitizedName ( string endpointName )
232259 {
233260 return throughputQuery == null ? endpointName : throughputQuery . SanitizedEndpointNameCleanser ( endpointName ) ;
@@ -241,4 +268,6 @@ string CleanseSanitizedName(string endpointName)
241268 string [ ] ? EndpointIndicators ( IGrouping < string , Endpoint > endpoint ) => endpoint . Where ( w => w . EndpointIndicators ? . Any ( ) == true ) ? . SelectMany ( s => s . EndpointIndicators ) ? . Distinct ( ) ? . ToArray ( ) ;
242269
243270 readonly string transport = throughputQuery ? . MessageTransport ?? throughputSettings . TransportType ;
271+ internal bool systemHasAuditEnabled ;
272+ internal bool systemHasMonitoringEnabled ;
244273}
0 commit comments