Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions include/spock_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,11 @@ extern void spock_group_shmem_request(void);
extern void spock_group_shmem_startup(int napply_groups);

SpockGroupEntry *spock_group_attach(Oid dbid, Oid node_id, Oid remote_node_id);
void spock_group_detach(void);
extern void spock_group_detach(void);
extern bool spock_group_progress_update(const SpockApplyProgress *sap);
extern void spock_group_progress_update_ptr(SpockGroupEntry *entry,
const SpockApplyProgress *sap);
SpockApplyProgress *apply_worker_get_progress(void);
SpockGroupEntry *spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id);

/* Iterate all groups */
typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg);
void spock_group_foreach(SpockGroupIterCB cb, void *arg);
extern SpockApplyProgress *apply_worker_get_progress(void);

extern void spock_group_resource_dump(void);
extern void spock_checkpoint_hook(XLogRecPtr checkPointRedo, int flags);
Expand Down
41 changes: 20 additions & 21 deletions src/spock_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -394,46 +394,45 @@ spock_group_progress_update_ptr(SpockGroupEntry *e,
/*
* apply_worker_get_progress
*
* Return a pointer to the current apply worker's progress payload, or NULL
* Return a pointer to the snapshot of the current apply worker's progress.
*/
SpockApplyProgress *
apply_worker_get_progress(void)
{
static SpockApplyProgress sap;

Assert(MyApplyWorker != NULL);
Assert(MyApplyWorker->apply_group != NULL);

if (MyApplyWorker && MyApplyWorker->apply_group)
return &MyApplyWorker->apply_group->progress;

return NULL;
}
{
LWLockAcquire(SpockCtx->apply_group_master_lock, LW_SHARED);

/*
* spock_group_lookup
*
* Snapshot-read the progress payload for the specified group. Uses HASH_FIND
* to locate the entry.
*
* Returns entry if found, NULL otherwise.
*/
SpockGroupEntry *
spock_group_lookup(Oid dbid, Oid node_id, Oid remote_node_id)
{
SpockGroupKey key = make_key(dbid, node_id, remote_node_id);
SpockGroupEntry *e;
memcpy(&sap, &MyApplyWorker->apply_group->progress,
sizeof(SpockApplyProgress));
LWLockRelease(SpockCtx->apply_group_master_lock);
}
else
/*
* Should never happen. In production just send the worker into
* exception behaviour without crash.
*/
elog(ERROR, "apply worker has not been fully initialised yet");

e = (SpockGroupEntry *) hash_search(SpockGroupHash, &key, HASH_FIND, NULL);
return e; /* may be NULL */
return &sap;
}

/* Iterate all groups */
typedef void (*SpockGroupIterCB) (const SpockGroupEntry *e, void *arg);

/*
* spock_group_foreach
*
* Iterate all entries in the group hash and invoke 'cb(e, arg)' for each.
* Caller selects any gating needed for consistency (e.g., take the gate in
* SHARED before calling this if you want a coherent snapshot).
*/
void
static void
spock_group_foreach(SpockGroupIterCB cb, void *arg)
{
HASH_SEQ_STATUS it;
Expand Down