Skip to content

Commit faee4f1

Browse files
authored
Delete conflict timestamp handling (#320)
1 parent 064127a commit faee4f1

File tree

8 files changed

+258
-62
lines changed

8 files changed

+258
-62
lines changed

include/spock_conflict.h

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -43,41 +43,38 @@ extern int spock_conflict_resolver;
4343
extern int spock_conflict_log_level;
4444
extern bool spock_save_resolutions;
4545

46-
/* Avoid conflicts with PG's conflict.h via pgstat.h */
47-
#ifndef CONFLICT_H
4846
/*
49-
* From include/replication/conflict.h in PostgreSQL
50-
* Once we only support >= PG17, we may just include it.
51-
*
52-
* Conflict types that could occur while applying remote changes.
47+
* We want to eventually match native PostgreSQL conflict types,
48+
* so use same ordering and similar naming.
49+
* We add one additional conflict type, CT_DELETE_LATE.
5350
*/
5451
typedef enum
5552
{
56-
/* The row to be inserted violates unique constraint */
57-
CT_INSERT_EXISTS,
53+
/* The row to be inserted violates unique constraint */
54+
SPOCK_CT_INSERT_EXISTS,
55+
56+
/* The row to be updated was modified by a different origin */
57+
SPOCK_CT_UPDATE_ORIGIN_DIFFERS,
5858

59-
/* The row to be updated was modified by a different origin */
60-
CT_UPDATE_ORIGIN_DIFFERS,
59+
/* The updated row value violates unique constraint */
60+
SPOCK_CT_UPDATE_EXISTS,
6161

62-
/* The updated row value violates unique constraint */
63-
CT_UPDATE_EXISTS,
62+
/* The row to be updated is missing */
63+
SPOCK_CT_UPDATE_MISSING,
6464

65-
/* The row to be updated is missing */
66-
CT_UPDATE_MISSING,
65+
/* The row to be deleted was modified by a different origin */
66+
SPOCK_CT_DELETE_ORIGIN_DIFFERS,
6767

68-
/* The row to be deleted was modified by a different origin */
69-
CT_DELETE_ORIGIN_DIFFERS,
68+
/* The row to be deleted is missing */
69+
SPOCK_CT_DELETE_MISSING,
7070

71-
/* The row to be deleted is missing */
72-
CT_DELETE_MISSING
71+
/*
72+
* Unique to Spock, delete timestamp is earlier than an existing row.
73+
* Use a higher number so we don't conflict with PostgreSQL in the future.
74+
*/
75+
SPOCK_CT_DELETE_LATE = 101
7376

74-
/*
75-
* Other conflicts, such as exclusion constraint violations, involve more
76-
* complex rules than simple equality checks. These conflicts are left for
77-
* future improvements.
78-
*/
79-
} ConflictType;
80-
#endif
77+
} SpockConflictType;
8178

8279
extern int spock_conflict_resolver;
8380
extern int spock_conflict_log_level;
@@ -92,7 +89,7 @@ extern bool try_resolve_conflict(Relation rel, HeapTuple localtuple,
9289
RepOriginId local_origin, TimestampTz local_ts,
9390
SpockConflictResolution *resolution);
9491

95-
extern void spock_report_conflict(ConflictType conflict_type,
92+
extern void spock_report_conflict(SpockConflictType conflict_type,
9693
SpockRelation *rel,
9794
HeapTuple localtuple,
9895
SpockTupleData *oldkey,
@@ -105,7 +102,7 @@ extern void spock_report_conflict(ConflictType conflict_type,
105102
TimestampTz local_tuple_timestamp,
106103
Oid conflict_idx_id);
107104

108-
extern void spock_conflict_log_table(ConflictType conflict_type,
105+
extern void spock_conflict_log_table(SpockConflictType conflict_type,
109106
SpockRelation *rel,
110107
HeapTuple localtuple,
111108
SpockTupleData *oldkey,

include/spock_output_plugin.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,18 @@ typedef struct SpockOutputSlotGroup
9393
TimestampTz last_commit_ts;
9494
} SpockOutputSlotGroup;
9595

96-
/*
97-
* Custom WAL messages
98-
*/
9996
extern bool spock_replication_repair_mode;
10097

10198
#define SPOCK_REPAIR_MODE_ON 1 /* Suppress subsequent DML/DDL */
10299
#define SPOCK_REPAIR_MODE_OFF 2 /* Resume regular replication */
103100
#define SPOCK_SYNC_EVENT_MSG 3 /* Sync event message */
104101

102+
extern int spock_output_delay;
103+
104+
/*
105+
* Custom WAL messages
106+
*/
107+
105108
typedef struct SpockWalMessageSimple
106109
{
107110
int32 mtype;

src/spock.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1172,6 +1172,19 @@ _PG_init(void)
11721172
0,
11731173
NULL, NULL, NULL);
11741174

1175+
DefineCustomIntVariable("spock.output_delay",
1176+
"For testing conflicts, delay in output plugin in ms",
1177+
"For testing conflicts, delay in output plugin in milliseconds",
1178+
&spock_output_delay,
1179+
0,
1180+
0,
1181+
60000,
1182+
PGC_SIGHUP,
1183+
0,
1184+
NULL,
1185+
NULL,
1186+
NULL);
1187+
11751188
if (IsBinaryUpgrade)
11761189
return;
11771190

src/spock_apply_heap.c

Lines changed: 45 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,7 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
749749
* See if we need to log any conflict to the server log and spock.resolutions
750750
* Calling this does not necessarily mean that there is a conflict
751751
*/
752-
spock_report_conflict(is_insert ? CT_INSERT_EXISTS : CT_UPDATE_EXISTS,
752+
spock_report_conflict(is_insert ? SPOCK_CT_INSERT_EXISTS : SPOCK_CT_UPDATE_EXISTS,
753753
rel, TTS_TUP(localslot), oldtup,
754754
remotetuple, applytuple, resolution,
755755
xmin, local_origin_found, local_origin,
@@ -1065,7 +1065,7 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
10651065
}
10661066
else
10671067
{
1068-
/* CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */
1068+
/* SPOCK_CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */
10691069
SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index];
10701070

10711071
/*
@@ -1164,7 +1164,15 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
11641164
*/
11651165
if (found)
11661166
{
1167-
UserContext ucxt;
1167+
UserContext ucxt;
1168+
TransactionId xmin;
1169+
TimestampTz local_ts;
1170+
RepOriginId local_origin;
1171+
bool local_origin_found;
1172+
HeapTuple applytuple;
1173+
SpockConflictResolution resolution;
1174+
1175+
11681176
SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index];
11691177

11701178
/*
@@ -1177,14 +1185,40 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
11771185
exception_log->local_tuple = heap_copytuple(local_tuple);
11781186
MemoryContextSwitchTo(oldctx);
11791187

1180-
/* Make sure that any user-supplied code runs as the table owner. */
1181-
SwitchToUntrustedUser(rel->rel->rd_rel->relowner, &ucxt);
1188+
local_origin_found = get_tuple_origin(rel, local_tuple,
1189+
&(local_tuple->t_self), &xmin,
1190+
&local_origin, &local_ts);
11821191

1183-
/* Delete the tuple found */
1184-
EvalPlanQualSetSlot(&epqstate, remoteslot);
1185-
ExecSimpleRelationDelete(edata->targetRelInfo, estate, &epqstate,
1186-
localslot);
1187-
RestoreUserContext(&ucxt);
1192+
/*
1193+
* Check if the local tuple was inserted/updated after this DELETE.
1194+
*/
1195+
if (!try_resolve_conflict(rel->rel, TTS_TUP(localslot),
1196+
NULL, /* remotetuple */
1197+
&applytuple, local_origin,
1198+
local_ts, &resolution))
1199+
{
1200+
/* Current DELETE happened before current tuple */
1201+
spock_report_conflict(SPOCK_CT_DELETE_LATE,
1202+
rel, TTS_TUP(localslot), oldtup,
1203+
NULL, /* remotetuple */
1204+
local_tuple, SpockResolution_Skip,
1205+
xmin, local_origin_found, local_origin,
1206+
local_ts, edata->targetRel->idxoid
1207+
);
1208+
}
1209+
else
1210+
{
1211+
/* DELETE happened after (usual case) */
1212+
1213+
/* Make sure that any user-supplied code runs as the table owner. */
1214+
SwitchToUntrustedUser(rel->rel->rd_rel->relowner, &ucxt);
1215+
1216+
/* Delete the tuple found */
1217+
EvalPlanQualSetSlot(&epqstate, remoteslot);
1218+
ExecSimpleRelationDelete(edata->targetRelInfo, estate, &epqstate,
1219+
localslot);
1220+
RestoreUserContext(&ucxt);
1221+
}
11881222
}
11891223
else
11901224
{
@@ -1198,7 +1232,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
11981232
exception_log->local_tuple = NULL;
11991233
remotetuple = heap_form_tuple(RelationGetDescr(rel->rel),
12001234
oldtup->values, oldtup->nulls);
1201-
spock_report_conflict(CT_DELETE_MISSING,
1235+
spock_report_conflict(SPOCK_CT_DELETE_MISSING,
12021236
rel, NULL, oldtup,
12031237
remotetuple, NULL, SpockResolution_Skip,
12041238
InvalidTransactionId, false, InvalidRepOriginId,

src/spock_conflict.c

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,14 @@
5757

5858

5959
/* From src/backend/replication/logical/conflict.c */
60-
static const char *const ConflictTypeNames[] = {
61-
[CT_INSERT_EXISTS] = "insert_exists",
62-
[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
63-
[CT_UPDATE_EXISTS] = "update_exists",
64-
[CT_UPDATE_MISSING] = "update_missing",
65-
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
66-
[CT_DELETE_MISSING] = "delete_missing"
60+
static const char *const SpockConflictTypeNames[] = {
61+
[SPOCK_CT_INSERT_EXISTS] = "insert_exists",
62+
[SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
63+
[SPOCK_CT_UPDATE_EXISTS] = "update_exists",
64+
[SPOCK_CT_UPDATE_MISSING] = "update_missing",
65+
[SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
66+
[SPOCK_CT_DELETE_MISSING] = "delete_missing",
67+
[SPOCK_CT_DELETE_LATE] = "delete_late"
6768
};
6869

6970

@@ -328,7 +329,7 @@ conflict_resolution_to_string(SpockConflictResolution resolution)
328329
* we still try to free the big chunks as we go.
329330
*/
330331
void
331-
spock_report_conflict(ConflictType conflict_type,
332+
spock_report_conflict(SpockConflictType conflict_type,
332333
SpockRelation *rel,
333334
HeapTuple localtuple,
334335
SpockTupleData *oldkey,
@@ -350,7 +351,7 @@ spock_report_conflict(ConflictType conflict_type,
350351

351352

352353
/* Ignore update-update conflict for same origin */
353-
if (conflict_type == CT_UPDATE_EXISTS)
354+
if (conflict_type == SPOCK_CT_UPDATE_EXISTS)
354355
{
355356
/*
356357
* If updating a row that came from the same origin, do not report it
@@ -365,7 +366,7 @@ spock_report_conflict(ConflictType conflict_type,
365366
return;
366367

367368
/* Differing origin */
368-
conflict_type = CT_UPDATE_ORIGIN_DIFFERS;
369+
conflict_type = SPOCK_CT_UPDATE_ORIGIN_DIFFERS;
369370
}
370371

371372
/* Count statistics */
@@ -415,17 +416,17 @@ spock_report_conflict(ConflictType conflict_type,
415416
* log_error_verbosity=verbose because we don't necessarily have all that
416417
* info enabled.
417418
*
418-
* Handling for CT_DELETE_ORIGIN_DIFFERS will be added separately.
419+
* Handling for SPOCK_CT_DELETE_ORIGIN_DIFFERS will be added separately.
419420
*/
420421
switch (conflict_type)
421422
{
422-
case CT_INSERT_EXISTS:
423-
case CT_UPDATE_EXISTS:
424-
case CT_UPDATE_ORIGIN_DIFFERS:
423+
case SPOCK_CT_INSERT_EXISTS:
424+
case SPOCK_CT_UPDATE_EXISTS:
425+
case SPOCK_CT_UPDATE_ORIGIN_DIFFERS:
425426
ereport(spock_conflict_log_level,
426427
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
427428
errmsg("CONFLICT: remote %s on relation %s (local index %s). Resolution: %s.",
428-
ConflictTypeNames[conflict_type],
429+
SpockConflictTypeNames[conflict_type],
429430
qualrelname, idxname,
430431
conflict_resolution_to_string(resolution)),
431432
errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
@@ -438,12 +439,11 @@ spock_report_conflict(ConflictType conflict_type,
438439
(uint32) (replorigin_session_origin_lsn << 32),
439440
(uint32) replorigin_session_origin_lsn)));
440441
break;
441-
case CT_UPDATE_MISSING:
442-
case CT_DELETE_MISSING:
442+
case SPOCK_CT_UPDATE_MISSING:
443443
ereport(spock_conflict_log_level,
444444
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
445445
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.",
446-
ConflictTypeNames[conflict_type],
446+
SpockConflictTypeNames[conflict_type],
447447
qualrelname, idxname,
448448
conflict_resolution_to_string(resolution)),
449449
errdetail("remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
@@ -453,7 +453,33 @@ spock_report_conflict(ConflictType conflict_type,
453453
(uint32) (replorigin_session_origin_lsn << 32),
454454
(uint32) replorigin_session_origin_lsn)));
455455
break;
456-
case CT_DELETE_ORIGIN_DIFFERS:
456+
case SPOCK_CT_DELETE_MISSING:
457+
ereport(spock_conflict_log_level,
458+
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
459+
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.",
460+
SpockConflictTypeNames[conflict_type],
461+
qualrelname, idxname,
462+
conflict_resolution_to_string(resolution)),
463+
errdetail("tuple for remote delete in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
464+
replorigin_session_origin,
465+
timestamptz_to_str(replorigin_session_origin_timestamp),
466+
(uint32) (replorigin_session_origin_lsn << 32),
467+
(uint32) replorigin_session_origin_lsn)));
468+
break;
469+
case SPOCK_CT_DELETE_LATE:
470+
ereport(spock_conflict_log_level,
471+
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
472+
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (newer tuple found). Resolution: %s.",
473+
SpockConflictTypeNames[conflict_type],
474+
qualrelname, idxname,
475+
conflict_resolution_to_string(resolution)),
476+
errdetail("remote delete in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
477+
replorigin_session_origin,
478+
timestamptz_to_str(replorigin_session_origin_timestamp),
479+
(uint32) (replorigin_session_origin_lsn << 32),
480+
(uint32) replorigin_session_origin_lsn)));
481+
break;
482+
case SPOCK_CT_DELETE_ORIGIN_DIFFERS:
457483
/* keep compiler happy; handling will be added separately */
458484
break;
459485
}
@@ -482,7 +508,7 @@ spock_report_conflict(ConflictType conflict_type,
482508
* we still try to free the big chunks as we go.
483509
*/
484510
void
485-
spock_conflict_log_table(ConflictType conflict_type,
511+
spock_conflict_log_table(SpockConflictType conflict_type,
486512
SpockRelation *rel,
487513
HeapTuple localtuple,
488514
SpockTupleData *oldkey,
@@ -543,7 +569,7 @@ spock_conflict_log_table(ConflictType conflict_type,
543569
nulls[4] = true;
544570

545571
/* conflict type */
546-
values[5] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
572+
values[5] = CStringGetTextDatum(SpockConflictTypeNames[conflict_type]);
547573
/* conflict_resolution */
548574
values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution));
549575
/* local_origin */

src/spock_output_plugin.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848

4949
/* Global variables */
5050
bool spock_replication_repair_mode = false;
51+
int spock_output_delay = 0;
5152

5253
/* Local functions */
5354
static inline void set_repair_mode(bool is_enabled);
@@ -585,6 +586,10 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
585586
LWLockRelease(slot_group->lock);
586587
}
587588

589+
/* Sleep if set for testing */
590+
if (spock_output_delay)
591+
pg_usleep(1000 * spock_output_delay);
592+
588593
old_ctx = MemoryContextSwitchTo(data->context);
589594

590595
VALGRIND_DO_ADDED_LEAK_CHECK;

0 commit comments

Comments
 (0)