Skip to content

Commit ecf20cc

Browse files
committed
Implemented saga logic
1 parent 7db5472 commit ecf20cc

File tree

3 files changed

+68
-30
lines changed

3 files changed

+68
-30
lines changed

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLAuditDataStore.cs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,39 @@ public Task<QueryResult<IList<MessagesView>>> QueryMessagesByReceivingEndpointAn
163163
return ExecuteMessagesQuery(builder, cancellationToken);
164164
}
165165

166-
public Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken) => throw new NotImplementedException();
166+
public async Task<QueryResult<SagaHistory>> QuerySagaHistoryById(Guid input, CancellationToken cancellationToken)
167+
{
168+
await using var conn = await connectionFactory.OpenConnection(cancellationToken);
169+
await using var cmd = new NpgsqlCommand(@"
170+
SELECT
171+
id,
172+
saga_id,
173+
saga_type,
174+
changes
175+
FROM saga_snapshots
176+
WHERE saga_id = @saga_id
177+
LIMIT 1", conn);
178+
179+
cmd.Parameters.AddWithValue("saga_id", input);
180+
181+
await using var reader = await cmd.ExecuteReaderAsync(cancellationToken);
182+
183+
if (await reader.ReadAsync(cancellationToken))
184+
{
185+
var changes = GetValue<List<SagaStateChange>>(reader, "changes") ?? [];
186+
var sagaHistory = new SagaHistory
187+
{
188+
Id = GetValue<Guid>(reader, "id"),
189+
SagaId = GetValue<Guid>(reader, "saga_id"),
190+
SagaType = GetValue<string>(reader, "saga_type"),
191+
Changes = changes
192+
};
193+
194+
return new QueryResult<SagaHistory>(sagaHistory, new QueryStatsInfo(string.Empty, changes.Count));
195+
}
196+
197+
return QueryResult<SagaHistory>.Empty();
198+
}
167199

168200
async Task<QueryResult<IList<MessagesView>>> ExecuteMessagesQuery(
169201
PostgresqlMessagesQueryBuilder builder,

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -130,17 +130,19 @@ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_message_id ON processed_mes
130130
await cmd.ExecuteNonQueryAsync(cancellationToken);
131131
}
132132

133+
133134
await using (var cmd = new NpgsqlCommand(@"
134-
CREATE INDEX IF NOT EXISTS idx_processed_messages_by_conversation_id ON processed_messages (
135-
conversation_id
135+
CREATE INDEX IF NOT EXISTS idx_processed_messages_by_created_at ON processed_messages (
136+
created_at
136137
);", connection))
137138
{
138139
await cmd.ExecuteNonQueryAsync(cancellationToken);
139140
}
140141

141142
await using (var cmd = new NpgsqlCommand(@"
142-
CREATE INDEX IF NOT EXISTS idx_processed_messages_by_created_at ON processed_messages (
143-
created_at
143+
CREATE INDEX IF NOT EXISTS idx_processed_messages_by_conversation ON processed_messages (
144+
conversation_id,
145+
time_sent
144146
);", connection))
145147
{
146148
await cmd.ExecuteNonQueryAsync(cancellationToken);
@@ -157,17 +159,19 @@ CREATE INDEX IF NOT EXISTS idx_processed_messages_by_query ON processed_messages
157159
// Create saga_snapshots table
158160
await using (var cmd = new NpgsqlCommand(@"
159161
CREATE TABLE IF NOT EXISTS saga_snapshots (
160-
id TEXT PRIMARY KEY,
162+
id UUID PRIMARY KEY,
161163
saga_id UUID,
162164
saga_type TEXT,
163-
start_time TIMESTAMPTZ,
164-
finish_time TIMESTAMPTZ,
165-
status TEXT,
166-
state_after_change TEXT,
167-
initiating_message JSONB,
168-
outgoing_messages JSONB,
169-
endpoint TEXT,
170-
processed_at TIMESTAMPTZ
165+
changes JSONB
166+
);", connection))
167+
{
168+
await cmd.ExecuteNonQueryAsync(cancellationToken);
169+
}
170+
171+
// Create index on saga_snapshots for faster saga_id lookups
172+
await using (var cmd = new NpgsqlCommand(@"
173+
CREATE INDEX IF NOT EXISTS idx_saga_snapshots_saga_id ON saga_snapshots (
174+
saga_id
171175
);", connection))
172176
{
173177
await cmd.ExecuteNonQueryAsync(cancellationToken);

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -78,27 +78,29 @@ INSERT INTO processed_messages (
7878

7979
public Task RecordSagaSnapshot(SagaSnapshot sagaSnapshot, CancellationToken cancellationToken)
8080
{
81-
// Insert SagaSnapshot into saga_snapshots table
81+
var newChange = new
82+
{
83+
sagaSnapshot.StartTime,
84+
sagaSnapshot.FinishTime,
85+
sagaSnapshot.Status,
86+
sagaSnapshot.StateAfterChange,
87+
sagaSnapshot.InitiatingMessage,
88+
sagaSnapshot.OutgoingMessages,
89+
sagaSnapshot.Endpoint
90+
};
91+
92+
// Insert or update saga_snapshots table - add new change to the changes array
8293
var cmd = batch.CreateBatchCommand();
8394
cmd.CommandText = @"
84-
INSERT INTO saga_snapshots (
85-
id, saga_id, saga_type, start_time, finish_time, status, state_after_change, initiating_message, outgoing_messages, endpoint, processed_at
86-
) VALUES (
87-
@id, @saga_id, @saga_type, @start_time, @finish_time, @status, @state_after_change, @initiating_message, @outgoing_messages, @endpoint, @processed_at
88-
)
89-
ON CONFLICT (id) DO NOTHING;";
95+
INSERT INTO saga_snapshots (id, saga_id, saga_type, changes)
96+
VALUES (@saga_id, @saga_id, @saga_type, @new_change)
97+
ON CONFLICT (id) DO UPDATE SET
98+
changes = COALESCE(saga_snapshots.changes, '[]'::jsonb) || @new_change::jsonb;";
9099

91-
cmd.Parameters.AddWithValue("id", sagaSnapshot.Id);
92100
cmd.Parameters.AddWithValue("saga_id", sagaSnapshot.SagaId);
93101
cmd.Parameters.AddWithValue("saga_type", sagaSnapshot.SagaType);
94-
cmd.Parameters.AddWithValue("start_time", sagaSnapshot.StartTime);
95-
cmd.Parameters.AddWithValue("finish_time", sagaSnapshot.FinishTime);
96-
cmd.Parameters.AddWithValue("status", sagaSnapshot.Status.ToString());
97-
cmd.Parameters.AddWithValue("state_after_change", sagaSnapshot.StateAfterChange);
98-
cmd.Parameters.AddWithValue("initiating_message", NpgsqlTypes.NpgsqlDbType.Jsonb, sagaSnapshot.InitiatingMessage);
99-
cmd.Parameters.AddWithValue("outgoing_messages", NpgsqlTypes.NpgsqlDbType.Jsonb, sagaSnapshot.OutgoingMessages);
100-
cmd.Parameters.AddWithValue("endpoint", sagaSnapshot.Endpoint);
101-
cmd.Parameters.AddWithValue("processed_at", sagaSnapshot.ProcessedAt);
102+
cmd.Parameters.AddWithValue("new_change", NpgsqlTypes.NpgsqlDbType.Jsonb, new[] { newChange });
103+
102104
batch.BatchCommands.Add(cmd);
103105

104106
return Task.CompletedTask;

0 commit comments

Comments
 (0)