1+
12namespace ServiceControl . Audit . Persistence . PostgreSQL ;
23
34using System ;
45using System . Collections . Generic ;
5- using System . Text ;
66using System . Text . Json ;
77using System . Threading ;
88using System . Threading . Tasks ;
@@ -15,209 +15,112 @@ namespace ServiceControl.Audit.Persistence.PostgreSQL;
1515using ServiceControl . Audit . Persistence ;
1616using ServiceControl . SagaAudit ;
1717
18- class PostgreSQLAuditDataStore : IAuditDataStore
19- {
20- readonly PostgreSQLConnectionFactory connectionFactory ;
21- public PostgreSQLAuditDataStore ( PostgreSQLConnectionFactory connectionFactory )
22- {
23- this . connectionFactory = connectionFactory ;
24- }
25-
26- // Helper to safely deserialize a value from a dictionary with a default
27- static T ? DeserializeOrDefault < T > ( Dictionary < string , object > dict , string key , T ? defaultValue = default )
28- {
29- if ( dict . TryGetValue ( key , out var value ) && value is JsonElement element && element . ValueKind != JsonValueKind . Null )
30- {
31- try
32- {
33- return JsonSerializer . Deserialize < T > ( element ) ;
34- }
35- catch { }
36- }
37- return defaultValue ;
38- }
39-
40- // Helper to get a value from the reader by column name
41- static T GetValue < T > ( NpgsqlDataReader reader , string column )
42- => reader . GetFieldValue < T > ( reader . GetOrdinal ( column ) ) ;
4318
19+ class PostgreSQLAuditDataStore ( PostgreSQLConnectionFactory connectionFactory ) : IAuditDataStore
20+ {
4421 public async Task < MessageBodyView > GetMessageBody ( string messageId , CancellationToken cancellationToken )
4522 {
4623 using var conn = await connectionFactory . OpenConnection ( cancellationToken ) ;
47-
4824 using var cmd = new NpgsqlCommand ( @"
4925 select body, headers from processed_messages
5026 where message_id = @message_id
5127 LIMIT 1;" , conn ) ;
52-
5328 cmd . Parameters . AddWithValue ( "message_id" , messageId ) ;
54-
5529 using var reader = await cmd . ExecuteReaderAsync ( cancellationToken ) ;
5630 if ( await reader . ReadAsync ( cancellationToken ) )
5731 {
5832 var stream = await reader . GetStreamAsync ( reader . GetOrdinal ( "body" ) , cancellationToken ) ;
5933 var contentType = reader . GetFieldValue < Dictionary < string , string > > ( reader . GetOrdinal ( "headers" ) ) . GetValueOrDefault ( Headers . ContentType , "text/xml" ) ;
60-
6134 return MessageBodyView . FromStream ( stream , contentType , ( int ) stream . Length , string . Empty ) ;
6235 }
63-
6436 return MessageBodyView . NotFound ( ) ;
6537 }
6638
67- async Task < QueryResult < IList < MessagesView > > > GetAllMessages (
68- string ? conversationId , bool ? includeSystemMessages , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange ,
69- string ? endpointName ,
70- string ? q ,
71- CancellationToken cancellationToken )
72- {
73- using var conn = await connectionFactory . OpenConnection ( cancellationToken ) ;
74-
75- var sql = new StringBuilder ( @"select unique_message_id,
76- message_metadata,
77- headers,
78- processed_at,
79- message_id,
80- message_type,
81- is_system_message,
82- status,
83- time_sent,
84- receiving_endpoint_name,
85- critical_time,
86- processing_time,
87- delivery_time,
88- conversation_id from processed_messages
89- where 1 = 1" ) ;
90-
91- if ( includeSystemMessages . HasValue )
92- {
93- sql . Append ( " and is_system_message = @is_system_message" ) ;
94- }
95-
96- if ( ! string . IsNullOrWhiteSpace ( q ) )
97- {
98- sql . Append ( " and query @@ to_tsquery('english', @search)" ) ;
99- }
100-
101- if ( ! string . IsNullOrWhiteSpace ( conversationId ) )
102- {
103- sql . Append ( " and conversation_id = @conversation_id" ) ;
104- }
105-
106- if ( ! string . IsNullOrWhiteSpace ( endpointName ) )
107- {
108- sql . Append ( " and receiving_endpoint_name = @endpoint_name" ) ;
109- }
110-
111- if ( timeSentRange ? . From != null )
112- {
113- sql . Append ( " and time_sent >= @time_sent_start" ) ;
114- }
115-
116- if ( timeSentRange ? . To != null )
117- {
118- sql . Append ( " and time_sent <= @time_sent_end" ) ;
119- }
120-
121- sql . Append ( " ORDER BY" ) ;
122- switch ( sortInfo . Sort )
123- {
124-
125- case "id" :
126- case "message_id" :
127- sql . Append ( " message_id" ) ;
128- break ;
129- case "message_type" :
130- sql . Append ( " message_type" ) ;
131- break ;
132- case "critical_time" :
133- sql . Append ( " critical_time" ) ;
134- break ;
135- case "delivery_time" :
136- sql . Append ( " delivery_time" ) ;
137- break ;
138- case "processing_time" :
139- sql . Append ( " processing_time" ) ;
140- break ;
141- case "processed_at" :
142- sql . Append ( " processed_at" ) ;
143- break ;
144- case "status" :
145- sql . Append ( " status" ) ;
146- break ;
147- default :
148- sql . Append ( " time_sent" ) ;
149- break ;
150- }
151-
152- if ( sortInfo . Direction == "asc" )
153- {
154- sql . Append ( " ASC" ) ;
155- }
156- else
157- {
158- sql . Append ( " DESC" ) ;
159- }
160-
161- sql . Append ( $ " LIMIT { pagingInfo . PageSize } OFFSET { pagingInfo . Offset } ;") ;
162-
163- var query = sql . ToString ( ) ;
164- using var cmd = new NpgsqlCommand ( query , conn ) ;
165-
166- if ( ! string . IsNullOrWhiteSpace ( q ) )
167- {
168- cmd . Parameters . AddWithValue ( "search" , q ) ;
169- }
170- if ( ! string . IsNullOrWhiteSpace ( endpointName ) )
171- {
172- cmd . Parameters . AddWithValue ( "endpoint_name" , endpointName ) ;
173- }
174- if ( ! string . IsNullOrWhiteSpace ( conversationId ) )
175- {
176- cmd . Parameters . AddWithValue ( "conversation_id" , conversationId ) ;
177- }
178- if ( includeSystemMessages . HasValue )
179- {
180- cmd . Parameters . AddWithValue ( "is_system_message" , includeSystemMessages ) ;
181- }
182- if ( timeSentRange ? . From != null )
183- {
184- cmd . Parameters . AddWithValue ( "time_sent_start" , timeSentRange . From ) ;
185- }
186- if ( timeSentRange ? . To != null )
187- {
188- cmd . Parameters . AddWithValue ( "time_sent_end" , timeSentRange . To ) ;
189- }
190-
191- return await ReturnResults ( cmd , cancellationToken ) ;
192- }
193-
194- public async Task < QueryResult < IList < MessagesView > > > GetMessages ( bool includeSystemMessages , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange , CancellationToken cancellationToken )
39+ public Task < QueryResult < IList < MessagesView > > > GetMessages ( bool includeSystemMessages , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange timeSentRange , CancellationToken cancellationToken )
19540 {
196- return await GetAllMessages ( null , includeSystemMessages , pagingInfo , sortInfo , timeSentRange , null , null , cancellationToken ) ;
41+ var builder = new PostgresqlMessagesQueryBuilder ( )
42+ . WithSystemMessages ( includeSystemMessages )
43+ . WithTimeSentRange ( timeSentRange )
44+ . WithSorting ( sortInfo )
45+ . WithPaging ( pagingInfo ) ;
46+ return ExecuteMessagesQuery ( builder , cancellationToken ) ;
19747 }
19848
19949 public Task < QueryResult < IList < AuditCount > > > QueryAuditCounts ( string endpointName , CancellationToken cancellationToken ) => throw new NotImplementedException ( ) ;
20050 public Task < QueryResult < IList < KnownEndpointsView > > > QueryKnownEndpoints ( CancellationToken cancellationToken ) => throw new NotImplementedException ( ) ;
20151 public Task < QueryResult < IList < MessagesView > > > QueryMessages ( string searchParam , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
20252 {
203- return GetAllMessages ( null , null , pagingInfo , sortInfo , timeSentRange , searchParam , null , cancellationToken ) ;
53+ var builder = new PostgresqlMessagesQueryBuilder ( )
54+ . WithSearch ( searchParam )
55+ . WithTimeSentRange ( timeSentRange )
56+ . WithSorting ( sortInfo )
57+ . WithPaging ( pagingInfo ) ;
58+ return ExecuteMessagesQuery ( builder , cancellationToken ) ;
20459 }
205- public async Task < QueryResult < IList < MessagesView > > > QueryMessagesByConversationId ( string conversationId , PagingInfo pagingInfo , SortInfo sortInfo , CancellationToken cancellationToken )
60+
61+ public Task < QueryResult < IList < MessagesView > > > QueryMessagesByConversationId ( string conversationId , PagingInfo pagingInfo , SortInfo sortInfo , CancellationToken cancellationToken )
20662 {
207- return await GetAllMessages ( conversationId , null , pagingInfo , sortInfo , null , null , null , cancellationToken ) ;
63+ var builder = new PostgresqlMessagesQueryBuilder ( )
64+ . WithConversationId ( conversationId )
65+ . WithSorting ( sortInfo )
66+ . WithPaging ( pagingInfo ) ;
67+ return ExecuteMessagesQuery ( builder , cancellationToken ) ;
20868 }
20969
210- public async Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpoint ( bool includeSystemMessages , string endpointName , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
70+ public Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpoint ( bool includeSystemMessages , string endpointName , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
21171 {
212- return await GetAllMessages ( null , includeSystemMessages , pagingInfo , sortInfo , timeSentRange , null , endpointName , cancellationToken ) ;
72+ var builder = new PostgresqlMessagesQueryBuilder ( )
73+ . WithSystemMessages ( includeSystemMessages )
74+ . WithEndpointName ( endpointName )
75+ . WithTimeSentRange ( timeSentRange )
76+ . WithSorting ( sortInfo )
77+ . WithPaging ( pagingInfo ) ;
78+ return ExecuteMessagesQuery ( builder , cancellationToken ) ;
21379 }
21480
21581 public Task < QueryResult < IList < MessagesView > > > QueryMessagesByReceivingEndpointAndKeyword ( string endpoint , string keyword , PagingInfo pagingInfo , SortInfo sortInfo , DateTimeRange ? timeSentRange = null , CancellationToken cancellationToken = default )
21682 {
217- return GetAllMessages ( null , null , pagingInfo , sortInfo , timeSentRange , keyword , endpoint , cancellationToken ) ;
83+ var builder = new PostgresqlMessagesQueryBuilder ( )
84+ . WithSearch ( keyword )
85+ . WithEndpointName ( endpoint )
86+ . WithTimeSentRange ( timeSentRange )
87+ . WithSorting ( sortInfo )
88+ . WithPaging ( pagingInfo ) ;
89+ return ExecuteMessagesQuery ( builder , cancellationToken ) ;
21890 }
91+
21992 public Task < QueryResult < SagaHistory > > QuerySagaHistoryById ( Guid input , CancellationToken cancellationToken ) => throw new NotImplementedException ( ) ;
22093
94+ async Task < QueryResult < IList < MessagesView > > > ExecuteMessagesQuery (
95+ PostgresqlMessagesQueryBuilder builder ,
96+ CancellationToken cancellationToken )
97+ {
98+ using var conn = await connectionFactory . OpenConnection ( cancellationToken ) ;
99+ var ( query , parameters ) = builder . Build ( ) ;
100+ using var cmd = new NpgsqlCommand ( query , conn ) ;
101+ foreach ( var param in parameters )
102+ {
103+ cmd . Parameters . Add ( param ) ;
104+ }
105+ return await ReturnResults ( cmd , cancellationToken ) ;
106+ }
107+
108+ static T ? DeserializeOrDefault < T > ( Dictionary < string , object > dict , string key , T ? defaultValue = default )
109+ {
110+ if ( dict . TryGetValue ( key , out var value ) && value is JsonElement element && element . ValueKind != JsonValueKind . Null )
111+ {
112+ try
113+ {
114+ return JsonSerializer . Deserialize < T > ( element ) ;
115+ }
116+ catch { }
117+ }
118+ return defaultValue ;
119+ }
120+
121+ static T GetValue < T > ( NpgsqlDataReader reader , string column )
122+ => reader . GetFieldValue < T > ( reader . GetOrdinal ( column ) ) ;
123+
221124 async Task < QueryResult < IList < MessagesView > > > ReturnResults ( NpgsqlCommand cmd , CancellationToken cancellationToken = default )
222125 {
223126 var results = new List < MessagesView > ( ) ;
0 commit comments