@@ -15,8 +15,32 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
1515using ServiceControl . Audit . Persistence ;
1616using ServiceControl . SagaAudit ;
1717
18- class PostgreSQLAuditDataStore ( PostgreSQLConnectionFactory connectionFactory ) : IAuditDataStore
18+ class PostgreSQLAuditDataStore : IAuditDataStore
1919{
20+ readonly PostgreSQLConnectionFactory connectionFactory ;
21+ public PostgreSQLAuditDataStore ( PostgreSQLConnectionFactory connectionFactory )
22+ {
23+ this . connectionFactory = connectionFactory ;
24+ }
25+
26+ // Helper to safely deserialize a value from a dictionary with a default
27+ static T ? DeserializeOrDefault < T > ( Dictionary < string , object > dict , string key , T ? defaultValue = default )
28+ {
29+ if ( dict . TryGetValue ( key , out var value ) && value is JsonElement element && element . ValueKind != JsonValueKind . Null )
30+ {
31+ try
32+ {
33+ return JsonSerializer . Deserialize < T > ( element ) ;
34+ }
35+ catch { }
36+ }
37+ return defaultValue ;
38+ }
39+
40+ // Helper to get a value from the reader by column name
41+ static T GetValue < T > ( NpgsqlDataReader reader , string column )
42+ => reader . GetFieldValue < T > ( reader . GetOrdinal ( column ) ) ;
43+
2044 public async Task < MessageBodyView > GetMessageBody ( string messageId , CancellationToken cancellationToken )
2145 {
2246 using var conn = await connectionFactory . OpenConnection ( cancellationToken ) ;
@@ -41,7 +65,7 @@ public async Task<MessageBodyView> GetMessageBody(string messageId, Cancellation
4165 }
4266
4367 async Task < QueryResult < IList < MessagesView > > > GetAllMessages (
44- string ? conversationId , bool ? includeSystemMessages , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange ,
68+ string ? conversationId , bool ? includeSystemMessages , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange ,
4569 string ? endpointName ,
4670 string ? q ,
4771 CancellationToken cancellationToken )
@@ -174,7 +198,7 @@ public async Task<QueryResult<IList<MessagesView>>> GetMessages(bool includeSyst
174198
175199 public Task < QueryResult < IList < AuditCount > > > QueryAuditCounts ( string endpointName , CancellationToken cancellationToken ) => throw new NotImplementedException ( ) ;
176200 public Task < QueryResult < IList < KnownEndpointsView > > > QueryKnownEndpoints ( CancellationToken cancellationToken ) => throw new NotImplementedException ( ) ;
177- public Task < QueryResult < IList < MessagesView > > > QueryMessages ( string searchParam , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange = null , CancellationToken cancellationToken = default )
201+ public Task < QueryResult < IList < MessagesView > > > QueryMessages ( string searchParam , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
178202 {
179203 return GetAllMessages ( null , null , pagingInfo , sortInfo , timeSentRange , searchParam , null , cancellationToken ) ;
180204 }
@@ -183,12 +207,12 @@ public async Task<QueryResult<IList<MessagesView>>> QueryMessagesByConversationI
183207 return await GetAllMessages ( conversationId , null , pagingInfo , sortInfo , null , null , null , cancellationToken ) ;
184208 }
185209
186- public async Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpoint ( bool includeSystemMessages , string endpointName , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange = null , CancellationToken cancellationToken = default )
210+ public async Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpoint ( bool includeSystemMessages , string endpointName , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
187211 {
188212 return await GetAllMessages ( null , includeSystemMessages , pagingInfo , sortInfo , timeSentRange , null , endpointName , cancellationToken ) ;
189213 }
190214
191- public Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpointAndKeyword ( string endpoint , string keyword , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange = null , CancellationToken cancellationToken = default )
215+ public Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpointAndKeyword ( string endpoint , string keyword , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
192216 {
193217 return GetAllMessages ( null , null , pagingInfo , sortInfo , timeSentRange , keyword , endpoint , cancellationToken ) ;
194218 }
@@ -197,37 +221,35 @@ public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAn
197221 async Task < QueryResult < IList < MessagesView > > > ReturnResults ( NpgsqlCommand cmd , CancellationToken cancellationToken = default )
198222 {
199223 var results = new List < MessagesView > ( ) ;
200-
201224 using var reader = await cmd . ExecuteReaderAsync ( cancellationToken ) ;
202225 while ( await reader . ReadAsync ( cancellationToken ) )
203226 {
204- var headers = reader . GetFieldValue < Dictionary < string , string > > ( reader . GetOrdinal ( "headers" ) ) ;
205- var messageMetadata = reader . GetFieldValue < Dictionary < string , object > > ( reader . GetOrdinal ( "message_metadata" ) ) ;
227+ var headers = GetValue < Dictionary < string , string > > ( reader , "headers" ) ;
228+ var messageMetadata = GetValue < Dictionary < string , object > > ( reader , "message_metadata" ) ;
206229
207230 results . Add ( new MessagesView
208231 {
209- Id = reader . GetFieldValue < string > ( reader . GetOrdinal ( "unique_message_id" ) ) ,
210- MessageId = reader . GetFieldValue < string > ( reader . GetOrdinal ( "message_id" ) ) ,
211- MessageType = reader . GetFieldValue < string > ( reader . GetOrdinal ( "message_type" ) ) ,
212- SendingEndpoint = JsonSerializer . Deserialize < EndpointDetails > ( ( JsonElement ) messageMetadata [ "SendingEndpoint" ] ) ,
213- ReceivingEndpoint = JsonSerializer . Deserialize < EndpointDetails > ( ( JsonElement ) messageMetadata [ "ReceivingEndpoint" ] ) ,
214- TimeSent = reader . GetFieldValue < DateTime > ( reader . GetOrdinal ( "time_sent" ) ) ,
215- ProcessedAt = reader . GetFieldValue < DateTime > ( reader . GetOrdinal ( "processed_at" ) ) ,
216- CriticalTime = reader . GetFieldValue < TimeSpan > ( reader . GetOrdinal ( "critical_time" ) ) ,
217- ProcessingTime = reader . GetFieldValue < TimeSpan > ( reader . GetOrdinal ( "processing_time" ) ) ,
218- DeliveryTime = reader . GetFieldValue < TimeSpan > ( reader . GetOrdinal ( "delivery_time" ) ) ,
219- IsSystemMessage = reader . GetFieldValue < bool > ( reader . GetOrdinal ( "is_system_message" ) ) ,
220- ConversationId = reader . GetFieldValue < string > ( reader . GetOrdinal ( "conversation_id" ) ) ,
232+ Id = GetValue < string > ( reader , "unique_message_id" ) ,
233+ MessageId = GetValue < string > ( reader , "message_id" ) ,
234+ MessageType = GetValue < string > ( reader , "message_type" ) ,
235+ SendingEndpoint = DeserializeOrDefault < EndpointDetails > ( messageMetadata , "SendingEndpoint" ) ,
236+ ReceivingEndpoint = DeserializeOrDefault < EndpointDetails > ( messageMetadata , "ReceivingEndpoint" ) ,
237+ TimeSent = GetValue < DateTime > ( reader , "time_sent" ) ,
238+ ProcessedAt = GetValue < DateTime > ( reader , "processed_at" ) ,
239+ CriticalTime = GetValue < TimeSpan > ( reader , "critical_time" ) ,
240+ ProcessingTime = GetValue < TimeSpan > ( reader , "processing_time" ) ,
241+ DeliveryTime = GetValue < TimeSpan > ( reader , "delivery_time" ) ,
242+ IsSystemMessage = GetValue < bool > ( reader , "is_system_message" ) ,
243+ ConversationId = GetValue < string > ( reader , "conversation_id" ) ,
221244 Headers = [ .. headers ] ,
222- Status = ( MessageStatus ) reader . GetFieldValue < int > ( reader . GetOrdinal ( "status" ) ) ,
223- MessageIntent = ( MessageIntent ) ( messageMetadata . ContainsKey ( "MessageIntent" ) ? JsonSerializer . Deserialize < int > ( ( JsonElement ) messageMetadata [ "MessageIntent" ] ) : 1 ) ,
245+ Status = ( MessageStatus ) GetValue < int > ( reader , "status" ) ,
246+ MessageIntent = ( MessageIntent ) DeserializeOrDefault ( messageMetadata , "MessageIntent" , 1 ) ,
224247 BodyUrl = "" ,
225- BodySize = messageMetadata . ContainsKey ( "ContentLength" ) ? JsonSerializer . Deserialize < int > ( ( JsonElement ) messageMetadata [ "ContentLength" ] ) : 0 ,
226- InvokedSagas = messageMetadata . ContainsKey ( "InvokedSagas" ) ? JsonSerializer . Deserialize < List < SagaInfo > > ( ( JsonElement ) messageMetadata [ "InvokedSagas" ] ) : [ ] ,
227- OriginatesFromSaga = messageMetadata . ContainsKey ( "OriginatesFromSaga" ) ? JsonSerializer . Deserialize < SagaInfo > ( ( JsonElement ) messageMetadata [ "OriginatesFromSaga" ] ) : null
248+ BodySize = DeserializeOrDefault ( messageMetadata , "ContentLength" , 0 ) ,
249+ InvokedSagas = DeserializeOrDefault < List < SagaInfo > > ( messageMetadata , "InvokedSagas" , [ ] ) ,
250+ OriginatesFromSaga = DeserializeOrDefault < SagaInfo > ( messageMetadata , "OriginatesFromSaga" )
228251 } ) ;
229252 }
230-
231253 return new QueryResult < IList < MessagesView > > ( results , new QueryStatsInfo ( string . Empty , results . Count ) ) ;
232254 }
233255}
0 commit comments