@@ -33,7 +33,19 @@ public async ValueTask DisposeAsync()
3333
3434 public Task RecordProcessedMessage ( ProcessedMessage processedMessage , ReadOnlyMemory < byte > body , CancellationToken cancellationToken )
3535 {
36- T GetMetadata < T > ( string key ) => processedMessage . MessageMetadata . TryGetValue ( key , out var value ) ? ( T ) value ?? default : default ;
36+ T ? GetMetadata < T > ( string key )
37+ {
38+ if ( processedMessage . MessageMetadata . TryGetValue ( key , out var value ) )
39+ {
40+ return ( T ? ) value ;
41+ }
42+ else
43+ {
44+ return default ;
45+ }
46+ }
47+
48+ object GetMetadataOrDbNull < T > ( string key ) => GetMetadata < T > ( key ) ?? ( object ) DBNull . Value ;
3749
3850 // Insert ProcessedMessage into processed_messages table
3951 var cmd = batch . CreateBatchCommand ( ) ;
@@ -61,15 +73,15 @@ INSERT INTO processed_messages (
6173 cmd . Parameters . AddWithValue ( "message_metadata" , NpgsqlTypes . NpgsqlDbType . Jsonb , processedMessage . MessageMetadata ) ;
6274 cmd . Parameters . AddWithValue ( "headers" , NpgsqlTypes . NpgsqlDbType . Jsonb , processedMessage . Headers ) ;
6375 cmd . Parameters . AddWithValue ( "processed_at" , processedMessage . ProcessedAt ) ;
64- cmd . Parameters . AddWithValue ( "message_id" , GetMetadata < string > ( "MessageId" ) ) ;
65- cmd . Parameters . AddWithValue ( "message_type" , GetMetadata < string > ( "MessageType" ) ) ;
66- cmd . Parameters . AddWithValue ( "is_system_message" , GetMetadata < bool > ( "IsSystemMessage" ) ) ;
67- cmd . Parameters . AddWithValue ( "time_sent" , GetMetadata < DateTime > ( "TimeSent" ) ) ;
76+ cmd . Parameters . AddWithValue ( "message_id" , GetMetadataOrDbNull < string > ( "MessageId" ) ) ;
77+ cmd . Parameters . AddWithValue ( "message_type" , GetMetadataOrDbNull < string > ( "MessageType" ) ) ;
78+ cmd . Parameters . AddWithValue ( "is_system_message" , GetMetadataOrDbNull < bool > ( "IsSystemMessage" ) ) ;
79+ cmd . Parameters . AddWithValue ( "time_sent" , GetMetadataOrDbNull < DateTime > ( "TimeSent" ) ) ;
6880 cmd . Parameters . AddWithValue ( "receiving_endpoint_name" , GetMetadata < EndpointDetails > ( "ReceivingEndpoint" ) ? . Name ?? ( object ) DBNull . Value ) ;
69- cmd . Parameters . AddWithValue ( "critical_time" , GetMetadata < TimeSpan > ( "CriticalTime" ) ) ;
70- cmd . Parameters . AddWithValue ( "processing_time" , GetMetadata < TimeSpan > ( "ProcessingTime" ) ) ;
71- cmd . Parameters . AddWithValue ( "delivery_time" , GetMetadata < TimeSpan > ( "DeliveryTime" ) ) ;
72- cmd . Parameters . AddWithValue ( "conversation_id" , GetMetadata < string > ( "ConversationId" ) ) ;
81+ cmd . Parameters . AddWithValue ( "critical_time" , GetMetadataOrDbNull < TimeSpan > ( "CriticalTime" ) ) ;
82+ cmd . Parameters . AddWithValue ( "processing_time" , GetMetadataOrDbNull < TimeSpan > ( "ProcessingTime" ) ) ;
83+ cmd . Parameters . AddWithValue ( "delivery_time" , GetMetadataOrDbNull < TimeSpan > ( "DeliveryTime" ) ) ;
84+ cmd . Parameters . AddWithValue ( "conversation_id" , GetMetadataOrDbNull < string > ( "ConversationId" ) ) ;
7385 cmd . Parameters . AddWithValue ( "status" , ( int ) ( GetMetadata < bool > ( "IsRetried" ) ? MessageStatus . ResolvedSuccessfully : MessageStatus . Successful ) ) ;
7486
7587 batch . BatchCommands . Add ( cmd ) ;
0 commit comments