Skip to content

Commit c502fab

Browse files
committed
Made known_endpoint table insert only
1 parent 892799a commit c502fab

File tree

5 files changed

+87
-17
lines changed

5 files changed

+87
-17
lines changed

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistence.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ public void AddPersistence(IServiceCollection services)
2525
services.AddSingleton<IBodyStorage, PostgreSQLAttachmentsBodyStorage>();
2626
services.AddSingleton<PostgreSQLConnectionFactory>();
2727
services.AddHostedService<RetentionCleanupService>();
28+
services.AddHostedService<UpdateKnownEndpointTable>();
2829
}
2930
}

src/ServiceControl.Audit.Persistence.PostgreSQL/PostgreSQLPersistenceInstaller.cs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,15 +186,28 @@ CREATE INDEX IF NOT EXISTS idx_saga_snapshots_saga_id ON saga_snapshots (
186186
await cmd.ExecuteNonQueryAsync(cancellationToken);
187187
}
188188

189+
// Create known_endpoints table
190+
await using (var cmd = new NpgsqlCommand(@"
191+
CREATE TABLE IF NOT EXISTS known_endpoints_insert (
192+
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
193+
endpoint_id TEXT,
194+
name TEXT,
195+
host_id UUID,
196+
host TEXT,
197+
last_seen TIMESTAMPTZ
198+
);", connection))
199+
{
200+
await cmd.ExecuteNonQueryAsync(cancellationToken);
201+
}
202+
189203
// Create known_endpoints table
190204
await using (var cmd = new NpgsqlCommand(@"
191205
CREATE TABLE IF NOT EXISTS known_endpoints (
192206
id TEXT PRIMARY KEY,
193207
name TEXT,
194208
host_id UUID,
195209
host TEXT,
196-
last_seen TIMESTAMPTZ,
197-
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
210+
last_seen TIMESTAMPTZ
198211
);", connection))
199212
{
200213
await cmd.ExecuteNonQueryAsync(cancellationToken);
@@ -212,11 +225,6 @@ CREATE OR REPLACE FUNCTION update_updated_at_column() RETURNS trigger AS $$
212225
DROP TRIGGER IF EXISTS saga_snapshots_updated_at_trigger ON saga_snapshots;
213226
CREATE TRIGGER saga_snapshots_updated_at_trigger
214227
BEFORE UPDATE ON saga_snapshots
215-
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
216-
217-
DROP TRIGGER IF EXISTS known_endpoints_updated_at_trigger ON known_endpoints;
218-
CREATE TRIGGER known_endpoints_updated_at_trigger
219-
BEFORE UPDATE ON known_endpoints
220228
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();", connection))
221229
{
222230
await cmd.ExecuteNonQueryAsync(cancellationToken);

src/ServiceControl.Audit.Persistence.PostgreSQL/RetentionCleanupService.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async Task CleanupOldMessages(CancellationToken cancellationToken)
4949
await CleanupTable("saga_snapshots", "updated_at", cutoffDate, conn, cancellationToken);
5050

5151
// Cleanup known endpoints
52-
await CleanupTable("known_endpoints", "updated_at", cutoffDate, conn, cancellationToken);
52+
await CleanupTable("known_endpoints", "last_seen", cutoffDate, conn, cancellationToken);
5353
}
5454

5555
async Task CleanupTable(string tableName, string dateColumn, DateTime cutoffDate, NpgsqlConnection conn, CancellationToken cancellationToken)
@@ -59,10 +59,11 @@ async Task CleanupTable(string tableName, string dateColumn, DateTime cutoffDate
5959

6060
while (!cancellationToken.IsCancellationRequested)
6161
{
62-
// Delete in batches
62+
// Delete in batches - skip if another process is already cleaning this table
6363
var sql = $@"
6464
DELETE FROM {tableName}
65-
WHERE ctid IN (
65+
WHERE pg_try_advisory_xact_lock(hashtext('{tableName}'))
66+
AND ctid IN (
6667
SELECT ctid FROM {tableName}
6768
WHERE {dateColumn} < @cutoff
6869
LIMIT {batchSize}

src/ServiceControl.Audit.Persistence.PostgreSQL/UnitOfWork/PostgreSQLAuditIngestionUnitOfWork.cs

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,14 @@ public Task RecordKnownEndpoint(KnownEndpoint knownEndpoint, CancellationToken c
129129
// Insert KnownEndpoint into known_endpoints table
130130
var cmd = batch.CreateBatchCommand();
131131
cmd.CommandText = @"
132-
INSERT INTO known_endpoints (
133-
id, name, host_id, host, last_seen
132+
133+
INSERT INTO known_endpoints_insert (
134+
endpoint_id, name, host_id, host, last_seen
134135
) VALUES (
135-
@id, @name, @host_id, @host, @last_seen
136-
)
137-
ON CONFLICT (id) DO UPDATE SET
138-
last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);";
136+
@endpoint_id, @name, @host_id, @host, @last_seen
137+
);";
139138

140-
cmd.Parameters.AddWithValue("id", knownEndpoint.Id);
139+
cmd.Parameters.AddWithValue("endpoint_id", knownEndpoint.Id);
141140
cmd.Parameters.AddWithValue("name", knownEndpoint.Name);
142141
cmd.Parameters.AddWithValue("host_id", knownEndpoint.HostId);
143142
cmd.Parameters.AddWithValue("host", knownEndpoint.Host);
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
namespace ServiceControl.Audit.Persistence.PostgreSQL;
2+
3+
using System;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Microsoft.Extensions.Hosting;
7+
using Microsoft.Extensions.Logging;
8+
using Npgsql;
9+
10+
class UpdateKnownEndpointTable(
11+
ILogger<RetentionCleanupService> logger,
12+
PostgreSQLConnectionFactory connectionFactory) : BackgroundService
13+
{
14+
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
15+
{
16+
logger.LogInformation($"{nameof(UpdateKnownEndpointTable)} started.");
17+
18+
while (!stoppingToken.IsCancellationRequested)
19+
{
20+
try
21+
{
22+
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
23+
24+
await UpdateTable(stoppingToken);
25+
}
26+
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
27+
{
28+
logger.LogInformation($"{nameof(UpdateKnownEndpointTable)} stopped.");
29+
break;
30+
}
31+
catch (Exception ex)
32+
{
33+
logger.LogError(ex, "Error during update known_endpoints table.");
34+
}
35+
}
36+
}
37+
38+
async Task UpdateTable(CancellationToken stoppingToken)
39+
{
40+
await using var conn = await connectionFactory.OpenConnection(stoppingToken);
41+
42+
var sql = @"
43+
DO $$
44+
BEGIN
45+
IF pg_try_advisory_xact_lock(hashtext('known_endpoints_sync')) THEN
46+
INSERT INTO known_endpoints (id, name, host_id, host, last_seen)
47+
SELECT DISTINCT ON (endpoint_id) endpoint_id, name, host_id, host, last_seen
48+
FROM known_endpoints_insert
49+
ORDER BY endpoint_id, last_seen DESC
50+
ON CONFLICT (id) DO UPDATE SET
51+
last_seen = GREATEST(known_endpoints.last_seen, EXCLUDED.last_seen);
52+
53+
DELETE FROM known_endpoints_insert;
54+
END IF;
55+
END $$;
56+
";
57+
58+
await using var cmd = new NpgsqlCommand(sql, conn);
59+
await cmd.ExecuteNonQueryAsync(stoppingToken);
60+
}
61+
}

0 commit comments

Comments
 (0)