Skip to content

Commit 6e1f4f4

Browse files
author
Antonin Houska
committed
Fixed processing of the squeeze.max_xlock_time parameter.
1. So far, the value of the parameter was not passed to the worker at all. 2. The worker coding was such that the parameter was only checked during logical decoding, but not when applying the data changes.
1 parent 3f6c271 commit 6e1f4f4

File tree

4 files changed

+57
-9
lines changed

4 files changed

+57
-9
lines changed

concurrent.c

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ extern PGDLLIMPORT int wal_segment_size;
2525

2626
static void apply_concurrent_changes(DecodingOutputState *dstate,
2727
Relation relation, ScanKey key,
28-
int nkeys, IndexInsertState *iistate);
28+
int nkeys, IndexInsertState *iistate,
29+
struct timeval *must_complete);
2930
static bool processing_time_elapsed(struct timeval *utmost);
3031

3132
static void plugin_startup(LogicalDecodingContext *ctx,
@@ -64,6 +65,16 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
6465
bool done;
6566

6667
dstate = (DecodingOutputState *) ctx->output_writer_private;
68+
69+
/*
70+
* If some changes could not be applied due to time constraint, make sure
71+
* the tuplestore is empty before we insert new tuples into it.
72+
*/
73+
if (dstate->nchanges > 0)
74+
apply_concurrent_changes(dstate, rel_dst, ident_key,
75+
ident_key_nentries, iistate, NULL);
76+
Assert(dstate->nchanges == 0);
77+
6778
done = false;
6879
while (!done)
6980
{
@@ -87,7 +98,11 @@ process_concurrent_changes(LogicalDecodingContext *ctx,
8798
* non-trivial.
8899
*/
89100
apply_concurrent_changes(dstate, rel_dst, ident_key,
90-
ident_key_nentries, iistate);
101+
ident_key_nentries, iistate, must_complete);
102+
103+
if (processing_time_elapsed(must_complete))
104+
/* Like above. */
105+
return false;
91106
}
92107

93108
return true;
@@ -189,7 +204,8 @@ decode_concurrent_changes(LogicalDecodingContext *ctx,
189204
*/
190205
static void
191206
apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
192-
ScanKey key, int nkeys, IndexInsertState *iistate)
207+
ScanKey key, int nkeys, IndexInsertState *iistate,
208+
struct timeval *must_complete)
193209
{
194210
TupleTableSlot *slot;
195211
#if PG_VERSION_NUM >= 120000
@@ -240,6 +256,9 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
240256
bool isnull[1];
241257
Datum values[1];
242258

259+
Assert(dstate->nchanges > 0);
260+
dstate->nchanges--;
261+
243262
/* Get the change from the single-column tuple. */
244263
#if PG_VERSION_NUM >= 120000
245264
tup_change = ExecFetchSlotHeapTuple(dstate->tsslot, false, &shouldFree);
@@ -483,10 +502,22 @@ apply_concurrent_changes(DecodingOutputState *dstate, Relation relation,
483502
Assert(shouldFree);
484503
pfree(tup_change);
485504
#endif
505+
506+
/*
507+
* If there is a limit on the time of completion, check it
508+
* now. However, make sure the loop does not break if tup_old was set
509+
* in the previous iteration. In such a case we could not resume the
510+
* processing in the next call.
511+
*/
512+
if (must_complete && tup_old == NULL &&
513+
processing_time_elapsed(must_complete))
514+
/* The next call will process the remaining changes. */
515+
break;
486516
}
487517

488-
tuplestore_clear(dstate->tstore);
489-
dstate->nchanges = 0;
518+
/* If we could not apply all the changes, the next call will do. */
519+
if (dstate->nchanges == 0)
520+
tuplestore_clear(dstate->tstore);
490521

491522
PopActiveSnapshot();
492523

pg_squeeze.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3193,6 +3193,10 @@ perform_final_merge(Oid relid_src, Oid *indexes_src, int nindexes,
31933193
cat_state, rel_dst, ident_key,
31943194
ident_key_nentries, iistate,
31953195
AccessExclusiveLock, NULL);
3196+
3197+
/* No time constraint, all changes must have been processed. */
3198+
Assert(((DecodingOutputState *)
3199+
ctx->output_writer_private)->nchanges == 0);
31963200
}
31973201

31983202
return success;

pg_squeeze.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
#include "utils/resowner.h"
3434
#include "utils/snapmgr.h"
3535

36+
extern int squeeze_max_xlock_time;
37+
3638
typedef enum
3739
{
3840
PG_SQUEEZE_CHANGE_INSERT,

worker.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ typedef struct WorkerTask
9292
NameData relname;
9393
NameData indname; /* clustering index */
9494
NameData tbspname; /* destination tablespace */
95+
int max_xlock_time;
9596

9697
/*
9798
* index destination tablespaces.
@@ -166,12 +167,14 @@ typedef struct TaskDetails
166167
ArrayType *ind_tbsps;
167168
bool last_try;
168169
bool skip_analyze;
170+
int max_xlock_time;
169171
} TaskDetails;
170172

171173
static void init_task_details(TaskDetails *task, int32 task_id,
172174
Name relschema, Name relname, Name cl_index,
173175
Name rel_tbsp, ArrayType *ind_tbsps,
174-
bool last_try, bool skip_analyze);
176+
bool last_try, bool skip_analyze,
177+
int max_xlock_time);
175178
static void squeeze_handle_error_app(ErrorData *edata, TaskDetails *td);
176179
static void release_task(void);
177180

@@ -502,6 +505,7 @@ squeeze_table_new(PG_FUNCTION_ARGS)
502505
memcpy(task->ind_tbsps, ind_tbsps, VARSIZE(ind_tbsps));
503506
else
504507
SET_VARSIZE(task->ind_tbsps, 0);
508+
task->max_xlock_time = squeeze_max_xlock_time;
505509

506510
task_id = task->id;
507511
LWLockRelease(task->lock);
@@ -1046,7 +1050,7 @@ process_tasks(MemoryContext task_cxt)
10461050
}
10471051

10481052
init_task_details(tasks, 0, relschema, relname, cl_index, rel_tbsp,
1049-
ind_tbsps, false, false);
1053+
ind_tbsps, false, false, task->max_xlock_time);
10501054
MemoryContextSwitchTo(oldcxt);
10511055

10521056
/* No other worker should pick this task. */
@@ -1176,7 +1180,10 @@ LIMIT %d", TASK_BATCH_SIZE);
11761180

11771181
init_task_details(&tasks[i], task_id, relschema, relname,
11781182
cl_index, rel_tbsp, ind_tbsps, last_try,
1179-
skip_analyze);
1183+
skip_analyze,
1184+
/* XXX Should max_xlock_time be added to
1185+
* squeeze.tables ? */
1186+
0);
11801187

11811188
}
11821189
MemoryContextSwitchTo(oldcxt);
@@ -1293,6 +1300,8 @@ LIMIT %d", TASK_BATCH_SIZE);
12931300
if (strlen(NameStr(td->rel_tbsp)) > 0)
12941301
rel_tbsp = &td->rel_tbsp;
12951302

1303+
squeeze_max_xlock_time = td->max_xlock_time;
1304+
12961305
/* Perform the actual work. */
12971306
SetCurrentStatementStartTimestamp();
12981307
StartTransactionCommand();
@@ -1529,7 +1538,8 @@ run_command(char *command, int rc)
15291538
static void
15301539
init_task_details(TaskDetails *task, int32 task_id, Name relschema,
15311540
Name relname, Name cl_index, Name rel_tbsp,
1532-
ArrayType *ind_tbsps, bool last_try, bool skip_analyze)
1541+
ArrayType *ind_tbsps, bool last_try, bool skip_analyze,
1542+
int max_xlock_time)
15331543
{
15341544
memset(task, 0, sizeof(TaskDetails));
15351545
task->id = task_id;
@@ -1542,6 +1552,7 @@ init_task_details(TaskDetails *task, int32 task_id, Name relschema,
15421552
task->ind_tbsps = ind_tbsps;
15431553
task->last_try = last_try;
15441554
task->skip_analyze = skip_analyze;
1555+
task->max_xlock_time = max_xlock_time;
15451556
}
15461557

15471558
#define ACTIVE_WORKERS_RES_ATTRS 7

0 commit comments

Comments
 (0)