Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions include/spock_conflict.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,41 +43,38 @@ extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
extern bool spock_save_resolutions;

/* Avoid conflicts with PG's conflict.h via pgstat.h */
#ifndef CONFLICT_H
/*
* From include/replication/conflict.h in PostgreSQL
* Once we only support >= PG17, we may just include it.
*
* Conflict types that could occur while applying remote changes.
* We want to eventually match native PostgreSQL conflict types,
* so use same ordering and similar naming.
* We add one additional conflict type, CT_DELETE_LATE.
*/
typedef enum
{
/* The row to be inserted violates unique constraint */
CT_INSERT_EXISTS,
/* The row to be inserted violates unique constraint */
SPOCK_CT_INSERT_EXISTS,

/* The row to be updated was modified by a different origin */
SPOCK_CT_UPDATE_ORIGIN_DIFFERS,

/* The row to be updated was modified by a different origin */
CT_UPDATE_ORIGIN_DIFFERS,
/* The updated row value violates unique constraint */
SPOCK_CT_UPDATE_EXISTS,

/* The updated row value violates unique constraint */
CT_UPDATE_EXISTS,
/* The row to be updated is missing */
SPOCK_CT_UPDATE_MISSING,

/* The row to be updated is missing */
CT_UPDATE_MISSING,
/* The row to be deleted was modified by a different origin */
SPOCK_CT_DELETE_ORIGIN_DIFFERS,

/* The row to be deleted was modified by a different origin */
CT_DELETE_ORIGIN_DIFFERS,
/* The row to be deleted is missing */
SPOCK_CT_DELETE_MISSING,

/* The row to be deleted is missing */
CT_DELETE_MISSING
/*
* Unique to Spock, delete timestamp is earlier than an existing row.
* Use a higher number so we don't conflict with PostgreSQL in the future.
*/
SPOCK_CT_DELETE_LATE = 101

/*
* Other conflicts, such as exclusion constraint violations, involve more
* complex rules than simple equality checks. These conflicts are left for
* future improvements.
*/
} ConflictType;
#endif
} SpockConflictType;

extern int spock_conflict_resolver;
extern int spock_conflict_log_level;
Expand All @@ -92,7 +89,7 @@ extern bool try_resolve_conflict(Relation rel, HeapTuple localtuple,
RepOriginId local_origin, TimestampTz local_ts,
SpockConflictResolution *resolution);

extern void spock_report_conflict(ConflictType conflict_type,
extern void spock_report_conflict(SpockConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand All @@ -105,7 +102,7 @@ extern void spock_report_conflict(ConflictType conflict_type,
TimestampTz local_tuple_timestamp,
Oid conflict_idx_id);

extern void spock_conflict_log_table(ConflictType conflict_type,
extern void spock_conflict_log_table(SpockConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand Down
9 changes: 6 additions & 3 deletions include/spock_output_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,18 @@ typedef struct SpockOutputSlotGroup
TimestampTz last_commit_ts;
} SpockOutputSlotGroup;

/*
* Custom WAL messages
*/
extern bool spock_replication_repair_mode;

#define SPOCK_REPAIR_MODE_ON 1 /* Suppress subsequent DML/DDL */
#define SPOCK_REPAIR_MODE_OFF 2 /* Resume regular replication */
#define SPOCK_SYNC_EVENT_MSG 3 /* Sync event message */

extern int spock_output_delay;

/*
* Custom WAL messages
*/

typedef struct SpockWalMessageSimple
{
int32 mtype;
Expand Down
13 changes: 13 additions & 0 deletions src/spock.c
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,19 @@ _PG_init(void)
0,
NULL, NULL, NULL);

DefineCustomIntVariable("spock.output_delay",
"For testing conflicts, delay in output plugin in ms",
"For testing conflicts, delay in output plugin in milliseconds",
&spock_output_delay,
0,
0,
60000,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);

if (IsBinaryUpgrade)
return;

Expand Down
56 changes: 45 additions & 11 deletions src/spock_apply_heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
* See if we need to log any conflict to the server log and spock.resolutions
* Calling this does not necessarily mean that there is a conflict
*/
spock_report_conflict(is_insert ? CT_INSERT_EXISTS : CT_UPDATE_EXISTS,
spock_report_conflict(is_insert ? SPOCK_CT_INSERT_EXISTS : SPOCK_CT_UPDATE_EXISTS,
rel, TTS_TUP(localslot), oldtup,
remotetuple, applytuple, resolution,
xmin, local_origin_found, local_origin,
Expand Down Expand Up @@ -1065,7 +1065,7 @@ spock_apply_heap_update(SpockRelation *rel, SpockTupleData *oldtup,
}
else
{
/* CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */
/* SPOCK_CT_UPDATE_MISSING case gets logged in exception_log, not resolutions */
SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index];

/*
Expand Down Expand Up @@ -1164,7 +1164,15 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
*/
if (found)
{
UserContext ucxt;
UserContext ucxt;
TransactionId xmin;
TimestampTz local_ts;
RepOriginId local_origin;
bool local_origin_found;
HeapTuple applytuple;
SpockConflictResolution resolution;


SpockExceptionLog *exception_log = &exception_log_ptr[my_exception_log_index];

/*
Expand All @@ -1177,14 +1185,40 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
exception_log->local_tuple = heap_copytuple(local_tuple);
MemoryContextSwitchTo(oldctx);

/* Make sure that any user-supplied code runs as the table owner. */
SwitchToUntrustedUser(rel->rel->rd_rel->relowner, &ucxt);
local_origin_found = get_tuple_origin(rel, local_tuple,
&(local_tuple->t_self), &xmin,
&local_origin, &local_ts);

/* Delete the tuple found */
EvalPlanQualSetSlot(&epqstate, remoteslot);
ExecSimpleRelationDelete(edata->targetRelInfo, estate, &epqstate,
localslot);
RestoreUserContext(&ucxt);
/*
* Check if the local tuple was inserted/updated after this DELETE.
*/
if (!try_resolve_conflict(rel->rel, TTS_TUP(localslot),
NULL, /* remotetuple */
&applytuple, local_origin,
local_ts, &resolution))
{
/* Current DELETE happened before current tuple */
spock_report_conflict(SPOCK_CT_DELETE_LATE,
rel, TTS_TUP(localslot), oldtup,
NULL, /* remotetuple */
local_tuple, SpockResolution_Skip,
xmin, local_origin_found, local_origin,
local_ts, edata->targetRel->idxoid
);
}
else
{
/* DELETE happened after (usual case) */

/* Make sure that any user-supplied code runs as the table owner. */
SwitchToUntrustedUser(rel->rel->rd_rel->relowner, &ucxt);

/* Delete the tuple found */
EvalPlanQualSetSlot(&epqstate, remoteslot);
ExecSimpleRelationDelete(edata->targetRelInfo, estate, &epqstate,
localslot);
RestoreUserContext(&ucxt);
}
}
else
{
Expand All @@ -1198,7 +1232,7 @@ spock_apply_heap_delete(SpockRelation *rel, SpockTupleData *oldtup)
exception_log->local_tuple = NULL;
remotetuple = heap_form_tuple(RelationGetDescr(rel->rel),
oldtup->values, oldtup->nulls);
spock_report_conflict(CT_DELETE_MISSING,
spock_report_conflict(SPOCK_CT_DELETE_MISSING,
rel, NULL, oldtup,
remotetuple, NULL, SpockResolution_Skip,
InvalidTransactionId, false, InvalidRepOriginId,
Expand Down
68 changes: 47 additions & 21 deletions src/spock_conflict.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,14 @@


/* From src/backend/replication/logical/conflict.c */
static const char *const ConflictTypeNames[] = {
[CT_INSERT_EXISTS] = "insert_exists",
[CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
[CT_UPDATE_EXISTS] = "update_exists",
[CT_UPDATE_MISSING] = "update_missing",
[CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
[CT_DELETE_MISSING] = "delete_missing"
static const char *const SpockConflictTypeNames[] = {
[SPOCK_CT_INSERT_EXISTS] = "insert_exists",
[SPOCK_CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs",
[SPOCK_CT_UPDATE_EXISTS] = "update_exists",
[SPOCK_CT_UPDATE_MISSING] = "update_missing",
[SPOCK_CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
[SPOCK_CT_DELETE_MISSING] = "delete_missing",
[SPOCK_CT_DELETE_LATE] = "delete_late"
};


Expand Down Expand Up @@ -328,7 +329,7 @@ conflict_resolution_to_string(SpockConflictResolution resolution)
* we still try to free the big chunks as we go.
*/
void
spock_report_conflict(ConflictType conflict_type,
spock_report_conflict(SpockConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand All @@ -350,7 +351,7 @@ spock_report_conflict(ConflictType conflict_type,


/* Ignore update-update conflict for same origin */
if (conflict_type == CT_UPDATE_EXISTS)
if (conflict_type == SPOCK_CT_UPDATE_EXISTS)
{
/*
* If updating a row that came from the same origin, do not report it
Expand All @@ -365,7 +366,7 @@ spock_report_conflict(ConflictType conflict_type,
return;

/* Differing origin */
conflict_type = CT_UPDATE_ORIGIN_DIFFERS;
conflict_type = SPOCK_CT_UPDATE_ORIGIN_DIFFERS;
}

/* Count statistics */
Expand Down Expand Up @@ -415,17 +416,17 @@ spock_report_conflict(ConflictType conflict_type,
* log_error_verbosity=verbose because we don't necessarily have all that
* info enabled.
*
* Handling for CT_DELETE_ORIGIN_DIFFERS will be added separately.
* Handling for SPOCK_CT_DELETE_ORIGIN_DIFFERS will be added separately.
*/
switch (conflict_type)
{
case CT_INSERT_EXISTS:
case CT_UPDATE_EXISTS:
case CT_UPDATE_ORIGIN_DIFFERS:
case SPOCK_CT_INSERT_EXISTS:
case SPOCK_CT_UPDATE_EXISTS:
case SPOCK_CT_UPDATE_ORIGIN_DIFFERS:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s (local index %s). Resolution: %s.",
ConflictTypeNames[conflict_type],
SpockConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("existing local tuple {%s} xid=%u,origin=%d,timestamp=%s; remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
Expand All @@ -438,12 +439,11 @@ spock_report_conflict(ConflictType conflict_type,
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case CT_UPDATE_MISSING:
case CT_DELETE_MISSING:
case SPOCK_CT_UPDATE_MISSING:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.",
ConflictTypeNames[conflict_type],
SpockConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("remote tuple {%s} in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
Expand All @@ -453,7 +453,33 @@ spock_report_conflict(ConflictType conflict_type,
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case CT_DELETE_ORIGIN_DIFFERS:
case SPOCK_CT_DELETE_MISSING:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (tuple not found). Resolution: %s.",
SpockConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("tuple for remote delete in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
replorigin_session_origin,
timestamptz_to_str(replorigin_session_origin_timestamp),
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case SPOCK_CT_DELETE_LATE:
ereport(spock_conflict_log_level,
(errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION),
errmsg("CONFLICT: remote %s on relation %s replica identity index %s (newer tuple found). Resolution: %s.",
SpockConflictTypeNames[conflict_type],
qualrelname, idxname,
conflict_resolution_to_string(resolution)),
errdetail("remote delete in xact origin=%u,timestamp=%s,commit_lsn=%X/%X",
replorigin_session_origin,
timestamptz_to_str(replorigin_session_origin_timestamp),
(uint32) (replorigin_session_origin_lsn << 32),
(uint32) replorigin_session_origin_lsn)));
break;
case SPOCK_CT_DELETE_ORIGIN_DIFFERS:
/* keep compiler happy; handling will be added separately */
break;
}
Expand Down Expand Up @@ -482,7 +508,7 @@ spock_report_conflict(ConflictType conflict_type,
* we still try to free the big chunks as we go.
*/
void
spock_conflict_log_table(ConflictType conflict_type,
spock_conflict_log_table(SpockConflictType conflict_type,
SpockRelation *rel,
HeapTuple localtuple,
SpockTupleData *oldkey,
Expand Down Expand Up @@ -543,7 +569,7 @@ spock_conflict_log_table(ConflictType conflict_type,
nulls[4] = true;

/* conflict type */
values[5] = CStringGetTextDatum(ConflictTypeNames[conflict_type]);
values[5] = CStringGetTextDatum(SpockConflictTypeNames[conflict_type]);
/* conflict_resolution */
values[6] = CStringGetTextDatum(conflict_resolution_to_string(resolution));
/* local_origin */
Expand Down
5 changes: 5 additions & 0 deletions src/spock_output_plugin.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@

/* Global variables */
bool spock_replication_repair_mode = false;
int spock_output_delay = 0;

/* Local functions */
static inline void set_repair_mode(bool is_enabled);
Expand Down Expand Up @@ -585,6 +586,10 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
LWLockRelease(slot_group->lock);
}

/* Sleep if set for testing */
if (spock_output_delay)
pg_usleep(1000 * spock_output_delay);

old_ctx = MemoryContextSwitchTo(data->context);

VALGRIND_DO_ADDED_LEAK_CHECK;
Expand Down
Loading