@@ -27,24 +27,29 @@ public async Task StartAsync(CancellationToken cancellationToken)
2727 // Create processed_messages table
2828 await using ( var cmd = new NpgsqlCommand ( @"
2929 CREATE TABLE IF NOT EXISTS processed_messages (
30- id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
31- unique_message_id TEXT,
32- message_metadata JSONB,
33- headers JSONB,
34- processed_at TIMESTAMPTZ,
35- body BYTEA,
36- message_id TEXT,
37- message_type TEXT,
38- is_system_message BOOLEAN,
39- status NUMERIC,
40- time_sent TIMESTAMPTZ,
41- receiving_endpoint_name TEXT,
42- critical_time INTERVAL,
43- processing_time INTERVAL,
44- delivery_time INTERVAL,
45- conversation_id TEXT,
46- query tsvector
47- );" , connection ) )
30+ id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
31+ unique_message_id TEXT,
32+ message_metadata JSONB,
33+ headers JSONB,
34+ processed_at TIMESTAMPTZ,
35+ body BYTEA,
36+ message_id TEXT,
37+ message_type TEXT,
38+ is_system_message BOOLEAN,
39+ status NUMERIC,
40+ time_sent TIMESTAMPTZ,
41+ receiving_endpoint_name TEXT,
42+ critical_time INTERVAL,
43+ processing_time INTERVAL,
44+ delivery_time INTERVAL,
45+ conversation_id TEXT,
46+ query tsvector,
47+ created_at TIMESTAMPTZ NOT NULL DEFAULT now()
48+ )
49+ WITH (
50+ autovacuum_vacuum_scale_factor = 0.05,
51+ autovacuum_analyze_scale_factor = 0.02
52+ );" , connection ) )
4853 {
4954 await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
5055 }
@@ -67,22 +72,88 @@ BEFORE INSERT OR UPDATE ON processed_messages
6772 {
6873 await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
6974 }
75+
7076 // Create index on processed_messages for specified columns
7177 await using ( var cmd = new NpgsqlCommand ( @"
72- CREATE INDEX IF NOT EXISTS idx_processed_messages_multi ON processed_messages (
73- message_id,
74- time_sent,
75- receiving_endpoint_name,
76- critical_time,
77- processing_time,
78- delivery_time,
79- conversation_id,
78+ CREATE INDEX IF NOT EXISTS idx_processed_messages_receiving_endpoint_name ON processed_messages (
79+ receiving_endpoint_name
80+ );" , connection ) )
81+ {
82+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
83+ }
84+
85+ await using ( var cmd = new NpgsqlCommand ( @"
86+ CREATE INDEX IF NOT EXISTS idx_processed_messages_is_system_message ON processed_messages (
8087 is_system_message
8188 );" , connection ) )
8289 {
8390 await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
8491 }
8592
93+ await using ( var cmd = new NpgsqlCommand ( @"
94+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_time_sent ON processed_messages (
95+ time_sent
96+ );" , connection ) )
97+ {
98+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
99+ }
100+
101+ await using ( var cmd = new NpgsqlCommand ( @"
102+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_critical_time ON processed_messages (
103+ critical_time
104+ );" , connection ) )
105+ {
106+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
107+ }
108+
109+ await using ( var cmd = new NpgsqlCommand ( @"
110+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_processing_time ON processed_messages (
111+ processing_time
112+ );" , connection ) )
113+ {
114+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
115+ }
116+
117+ await using ( var cmd = new NpgsqlCommand ( @"
118+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_delivery_time ON processed_messages (
119+ delivery_time
120+ );" , connection ) )
121+ {
122+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
123+ }
124+
125+ await using ( var cmd = new NpgsqlCommand ( @"
126+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_message_id ON processed_messages (
127+ message_id
128+ );" , connection ) )
129+ {
130+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
131+ }
132+
133+ await using ( var cmd = new NpgsqlCommand ( @"
134+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_conversation_id ON processed_messages (
135+ conversation_id
136+ );" , connection ) )
137+ {
138+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
139+ }
140+
141+ await using ( var cmd = new NpgsqlCommand ( @"
142+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_created_at ON processed_messages (
143+ created_at
144+ );" , connection ) )
145+ {
146+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
147+ }
148+
149+ await using ( var cmd = new NpgsqlCommand ( @"
150+ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_query ON processed_messages (
151+ query
152+ );" , connection ) )
153+ {
154+ await cmd . ExecuteNonQueryAsync ( cancellationToken ) ;
155+ }
156+
86157 // Create saga_snapshots table
87158 await using ( var cmd = new NpgsqlCommand ( @"
88159 CREATE TABLE IF NOT EXISTS saga_snapshots (
0 commit comments