Skip to content

Commit 98ff62f

Browse files
committed
Minor fixes and improvements for progress tracking
- Refactor apply_worker_get_progress() to return only the timestamp needed. Return value instead of pointer to entire struct. - Lock sooner in spock_group_resource_dump() because of getting the number of entries - Properly handle progress updates after COPY operations in replication set data sync and avoid potential memory leak - Address compiler warnings
1 parent 0698eae commit 98ff62f

File tree

5 files changed

+25
-16
lines changed

5 files changed

+25
-16
lines changed

include/spock_group.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ extern void spock_group_detach(void);
156156
extern bool spock_group_progress_update(const SpockApplyProgress *sap);
157157
extern void spock_group_progress_update_ptr(SpockGroupEntry *entry,
158158
const SpockApplyProgress *sap);
159-
extern SpockApplyProgress *apply_worker_get_progress(void);
159+
extern TimestampTz apply_worker_get_prev_remote_ts(void);
160160

161161
extern void spock_group_resource_dump(void);
162162
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);

src/spock_apply.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ wait_for_previous_transaction(void)
242242
* loop and process this transaction. Otherwise, wait for the
243243
* predecessor to commit.
244244
*/
245-
if (apply_worker_get_progress()->prev_remote_ts == required_commit_ts ||
245+
if (apply_worker_get_prev_remote_ts() == required_commit_ts ||
246246
required_commit_ts == 0)
247247
{
248248
break;
@@ -263,7 +263,7 @@ wait_for_previous_transaction(void)
263263
elog(DEBUG1, "SPOCK: slot-group '%s' WAIT for ts [current proccessed"
264264
", required] [" INT64_FORMAT ", " INT64_FORMAT "]",
265265
MySubscription->slot_name,
266-
apply_worker_get_progress()->prev_remote_ts,
266+
apply_worker_get_prev_remote_ts(),
267267
required_commit_ts);
268268

269269
/* Latch */

src/spock_executor.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,6 @@ spock_object_access(ObjectAccessType access,
247247
if (access == OAT_DROP)
248248
{
249249
ObjectAccessDrop *drop_arg = (ObjectAccessDrop *) arg;
250-
ObjectAddress object;
251250
DropBehavior behavior;
252251

253252
/* No need to check for internal deletions. */

src/spock_group.c

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ progress_update_struct(SpockApplyProgress *dest, const SpockApplyProgress *src)
308308
* Value of the received_lsn potentially can exceed remote_insert_lsn
309309
* because it is reported more frequently (by keepalive messages).
310310
*/
311-
Assert(!(dest->remote_commit_ts == 0 ^ dest->last_updated_ts == 0));
311+
Assert(!((dest->remote_commit_ts == 0) ^ (dest->last_updated_ts == 0)));
312312
Assert(dest->remote_commit_ts >= 0 && dest->last_updated_ts >= 0);
313313
}
314314

@@ -392,22 +392,23 @@ spock_group_progress_update_ptr(SpockGroupEntry *e,
392392
}
393393

394394
/*
395-
* apply_worker_get_progress
395+
* apply_worker_get_prev_remote_ts
396+
*
397+
* Get the previous remote timestamp for our apply worker
396398
*
397-
* Return a pointer to the snapshot of the current apply worker's progress.
398399
*/
399-
SpockApplyProgress *
400-
apply_worker_get_progress(void)
400+
TimestampTz
401+
apply_worker_get_prev_remote_ts(void)
401402
{
402-
static SpockApplyProgress sap;
403+
TimestampTz prev_remote_ts;
403404

404405
Assert(MyApplyWorker != NULL);
405406
Assert(MyApplyWorker->apply_group != NULL);
406407

407408
if (MyApplyWorker && MyApplyWorker->apply_group)
408409
{
409410
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);
410-
sap = MyApplyWorker->apply_group->progress;
411+
prev_remote_ts = MyApplyWorker->apply_group->progress.prev_remote_ts;
411412
LWLockRelease(SpockCtx->apply_group_master_lock);
412413
}
413414
else
@@ -417,7 +418,7 @@ apply_worker_get_progress(void)
417418
*/
418419
elog(ERROR, "apply worker has not been fully initialised yet");
419420

420-
return &sap;
421+
return prev_remote_ts;
421422
}
422423

423424
/* Iterate all groups */
@@ -507,12 +508,13 @@ spock_group_resource_dump(void)
507508
hdr.version = SPOCK_RES_VERSION;
508509
hdr.system_identifier = GetSystemIdentifier();
509510
hdr.flags = 0;
511+
512+
/* Acquire lock before reading hash table to ensure consistency */
513+
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);
510514
hdr.entry_count = hash_get_num_entries(SpockGroupHash);
511515

512516
write_buf(fd, &hdr, sizeof(hdr), SPOCK_RES_DUMPFILE "(header)");
513517

514-
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);
515-
516518
dctx.fd = fd;
517519
dctx.count = 0;
518520

@@ -662,7 +664,7 @@ spock_group_progress_update_list(List *lst)
662664

663665
elog(LOG, "SPOCK: adjust spock.progress %d->%d to "
664666
"remote_commit_ts='%s' "
665-
"remote_commit_lsn=%llX remote_insert_lsn=%llX",
667+
"remote_commit_lsn=%lX remote_insert_lsn=%lX",
666668
sap->key.remote_node_id, MySubscription->target->id,
667669
timestamptz_to_str(sap->remote_commit_ts),
668670
sap->remote_commit_lsn, sap->remote_insert_lsn);

src/spock_sync.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -958,6 +958,7 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn,
958958
PGconn *origin_conn;
959959
PGconn *target_conn;
960960
List *tables;
961+
List *progress_entries_list = NIL;
961962
ListCell *lc;
962963

963964
/* Connect to origin node. */
@@ -1021,12 +1022,19 @@ copy_replication_sets_data(SpockSubscription *sub, const char *origin_dsn,
10211022
CHECK_FOR_INTERRUPTS();
10221023
}
10231024

1024-
adjust_progress_info(origin_conn);
1025+
progress_entries_list = adjust_progress_info(origin_conn);
10251026

10261027
/* Finish the transactions and disconnect. */
10271028
finish_copy_origin_tx(origin_conn);
10281029
finish_copy_target_tx(target_conn);
10291030

1031+
/*
1032+
* Match handling in copy_tables_data().
1033+
* Update replication progress. We must do it after commit of the COPY.
1034+
* Call below will free progress_entries_list
1035+
*/
1036+
spock_group_progress_update_list(progress_entries_list);
1037+
10301038
return tables;
10311039
}
10321040

0 commit comments

Comments
 (0)