Skip to content

Commit 3f80af8

Browse files
authored
Add neon.logical_replication_max_logicalsnapdir_size
This GUC will drop replication slots if the size of the pg_logical/snapshots directory (not including temp snapshot files) becomes larger than the specified size. Keeping the size of this directory smaller will help with basebackup size from the pageserver. Part-of: #8619 Signed-off-by: Tristan Partin <[email protected]>
1 parent a61d81b commit 3f80af8

File tree

1 file changed

+114
-33
lines changed

1 file changed

+114
-33
lines changed

pgxn/neon/logical_replication_monitor.c

Lines changed: 114 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
#include <dirent.h>
12
#include <limits.h>
23
#include <string.h>
3-
#include <dirent.h>
44
#include <signal.h>
5+
#include <sys/stat.h>
56

67
#include "postgres.h"
78

@@ -21,17 +22,35 @@
2122

2223
static int logical_replication_max_snap_files = 300;
2324

25+
/*
26+
* According to Chi (shyzh), the pageserver _should_ be good with 10 MB worth of
27+
* snapshot files. Let's use 8 MB since 8 is a power of 2.
28+
*/
29+
static int logical_replication_max_logicalsnapdir_size = 8000;
30+
31+
/*
32+
* A primitive description of a logical snapshot file including the LSN of the
33+
* file and its size.
34+
*/
35+
typedef struct SnapDesc {
36+
XLogRecPtr lsn;
37+
off_t sz;
38+
} SnapDesc;
39+
2440
PGDLLEXPORT void LogicalSlotsMonitorMain(Datum main_arg);
2541

42+
/*
43+
* Sorts an array of snapshot descriptors by their LSN.
44+
*/
2645
static int
27-
LsnDescComparator(const void *a, const void *b)
46+
SnapDescComparator(const void *a, const void *b)
2847
{
29-
XLogRecPtr lsn1 = *((const XLogRecPtr *) a);
30-
XLogRecPtr lsn2 = *((const XLogRecPtr *) b);
48+
const SnapDesc *desc1 = a;
49+
const SnapDesc *desc2 = b;
3150

32-
if (lsn1 < lsn2)
51+
if (desc1->lsn < desc2->lsn)
3352
return 1;
34-
else if (lsn1 == lsn2)
53+
else if (desc1->lsn == desc2->lsn)
3554
return 0;
3655
else
3756
return -1;
@@ -43,28 +62,39 @@ LsnDescComparator(const void *a, const void *b)
4362
* slots having lower restart_lsn should be dropped.
4463
*/
4564
static XLogRecPtr
46-
get_num_snap_files_lsn_threshold(void)
65+
get_snapshots_cutoff_lsn(void)
4766
{
67+
/* PG 18 has a constant defined for this, PG_LOGICAL_SNAPSHOTS_DIR */
68+
#define SNAPDIR "pg_logical/snapshots"
69+
4870
DIR *dirdesc;
71+
int dirdesc_fd;
4972
struct dirent *de;
50-
char *snap_path = "pg_logical/snapshots/";
51-
int lsns_allocated = 1024;
52-
int lsns_num = 0;
53-
XLogRecPtr *lsns;
54-
XLogRecPtr cutoff;
73+
size_t snapshot_index = 0;
74+
SnapDesc *snapshot_descriptors;
75+
size_t descriptors_allocated = 1024;
76+
XLogRecPtr cutoff = 0;
77+
off_t logicalsnapdir_size = 0;
78+
const int logical_replication_max_logicalsnapdir_size_bytes = logical_replication_max_logicalsnapdir_size * 1000;
5579

56-
if (logical_replication_max_snap_files < 0)
80+
if (logical_replication_max_snap_files < 0 && logical_replication_max_logicalsnapdir_size < 0)
5781
return 0;
5882

59-
lsns = palloc(sizeof(XLogRecPtr) * lsns_allocated);
83+
snapshot_descriptors = palloc(sizeof(*snapshot_descriptors) * descriptors_allocated);
84+
85+
dirdesc = AllocateDir(SNAPDIR);
86+
dirdesc_fd = dirfd(dirdesc);
87+
if (dirdesc_fd == -1)
88+
ereport(ERROR, errmsg("failed to get a file descriptor for " SNAPDIR ": %m"));
6089

6190
/* find all .snap files and get their lsns */
62-
dirdesc = AllocateDir(snap_path);
63-
while ((de = ReadDir(dirdesc, snap_path)) != NULL)
91+
while ((de = ReadDir(dirdesc, SNAPDIR)) != NULL)
6492
{
65-
XLogRecPtr lsn;
6693
uint32 hi;
6794
uint32 lo;
95+
struct stat st;
96+
XLogRecPtr lsn;
97+
SnapDesc *desc;
6898

6999
if (strcmp(de->d_name, ".") == 0 ||
70100
strcmp(de->d_name, "..") == 0)
@@ -79,28 +109,69 @@ get_num_snap_files_lsn_threshold(void)
79109

80110
lsn = ((uint64) hi) << 32 | lo;
81111
elog(DEBUG5, "found snap file %X/%X", LSN_FORMAT_ARGS(lsn));
82-
if (lsns_allocated == lsns_num)
112+
113+
if (fstatat(dirdesc_fd, de->d_name, &st, 0) == -1)
114+
ereport(ERROR, errmsg("failed to get the size of " SNAPDIR "/%s: %m", de->d_name));
115+
116+
if (descriptors_allocated == snapshot_index)
83117
{
84-
lsns_allocated *= 2;
85-
lsns = repalloc(lsns, sizeof(XLogRecPtr) * lsns_allocated);
118+
descriptors_allocated *= 2;
119+
snapshot_descriptors = repalloc(snapshot_descriptors, sizeof(*snapshot_descriptors) * descriptors_allocated);
86120
}
87-
lsns[lsns_num++] = lsn;
121+
122+
desc = &snapshot_descriptors[snapshot_index++];
123+
desc->lsn = lsn;
124+
desc->sz = st.st_size;
88125
}
89-
/* sort by lsn desc */
90-
qsort(lsns, lsns_num, sizeof(XLogRecPtr), LsnDescComparator);
91-
/* and take cutoff at logical_replication_max_snap_files */
92-
if (logical_replication_max_snap_files > lsns_num)
93-
cutoff = 0;
94-
/* have less files than cutoff */
95-
else
126+
127+
qsort(snapshot_descriptors, snapshot_index, sizeof(*snapshot_descriptors), SnapDescComparator);
128+
129+
/* Are there more snapshot files than specified? */
130+
if (logical_replication_max_snap_files <= snapshot_index)
96131
{
97-
cutoff = lsns[logical_replication_max_snap_files - 1];
98-
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %d .snap files, limit is %d",
99-
LSN_FORMAT_ARGS(cutoff), lsns_num, logical_replication_max_snap_files);
132+
cutoff = snapshot_descriptors[logical_replication_max_snap_files - 1].lsn;
133+
elog(LOG,
134+
"ls_monitor: dropping logical slots with restart_lsn lower %X/%X, found %zu snapshot files, limit is %d",
135+
LSN_FORMAT_ARGS(cutoff), snapshot_index, logical_replication_max_snap_files);
100136
}
101-
pfree(lsns);
137+
138+
/* Is the size of the logical snapshots directory larger than specified?
139+
*
140+
* It's possible we could hit both thresholds, so remove any extra files
141+
* first, and then truncate based on size of the remaining files.
142+
*/
143+
if (logicalsnapdir_size > logical_replication_max_logicalsnapdir_size_bytes)
144+
{
145+
/* Unfortunately, iterating the directory does not guarantee any order
146+
* so we can't cache an index in the preceding loop.
147+
*/
148+
149+
off_t sz;
150+
const XLogRecPtr original = cutoff;
151+
152+
sz = snapshot_descriptors[0].sz;
153+
for (size_t i = 1; i < logical_replication_max_snap_files; ++i)
154+
{
155+
if (sz > logical_replication_max_logicalsnapdir_size_bytes)
156+
{
157+
cutoff = snapshot_descriptors[i - 1].lsn;
158+
break;
159+
}
160+
161+
sz += snapshot_descriptors[i].sz;
162+
}
163+
164+
if (cutoff != original)
165+
elog(LOG, "ls_monitor: dropping logical slots with restart_lsn lower than %X/%X, " SNAPDIR " is larger than %d KB",
166+
LSN_FORMAT_ARGS(cutoff), logical_replication_max_logicalsnapdir_size);
167+
}
168+
169+
pfree(snapshot_descriptors);
102170
FreeDir(dirdesc);
171+
103172
return cutoff;
173+
174+
#undef SNAPDIR
104175
}
105176

106177
void
@@ -118,6 +189,16 @@ InitLogicalReplicationMonitor(void)
118189
0,
119190
NULL, NULL, NULL);
120191

192+
DefineCustomIntVariable(
193+
"neon.logical_replication_max_logicalsnapdir_size",
194+
"Maximum allowed size of the pg_logical/snapshots directory (KB). When exceeded, slots are dropped until the limit is met. -1 disables the limit.",
195+
NULL,
196+
&logical_replication_max_logicalsnapdir_size,
197+
8000, -1, INT_MAX,
198+
PGC_SIGHUP,
199+
GUC_UNIT_KB,
200+
NULL, NULL, NULL);
201+
121202
memset(&bgw, 0, sizeof(bgw));
122203
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
123204
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
@@ -162,7 +243,7 @@ LogicalSlotsMonitorMain(Datum main_arg)
162243
* If there are too many .snap files, just drop all logical slots to
163244
* prevent aux files bloat.
164245
*/
165-
cutoff_lsn = get_num_snap_files_lsn_threshold();
246+
cutoff_lsn = get_snapshots_cutoff_lsn();
166247
if (cutoff_lsn > 0)
167248
{
168249
for (int i = 0; i < max_replication_slots; i++)

0 commit comments

Comments
 (0)