Skip to content

Commit b345adb

Browse files
committed
Fix resuming a retroactive time partitioning after a master swing.
Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
1 parent 45778b3 commit b345adb

File tree

15 files changed

+200
-107
lines changed

15 files changed

+200
-107
lines changed

bdb/bdb_api.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,11 +1755,10 @@ int bdb_newsc_del_redo_genid(tran_type *t, const char *tablename, uint64_t genid
17551755

17561756
int bdb_newsc_del_all_redo_genids(tran_type *t, const char *tablename, int *bdberr);
17571757

1758-
int bdb_set_high_genid(tran_type *input_trans, const char *tablename,
1759-
unsigned long long genid, int *bdberr);
1760-
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
1761-
int stripe, unsigned long long genid,
1762-
int *bdberr);
1758+
int bdb_set_high_genid(tran_type *input_trans, const char *tablename, unsigned long long genid, int *bdberr,
1759+
const char *f, int l);
1760+
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
1761+
int *bdberr, const char *f, int l);
17631762
int bdb_clear_high_genid(tran_type *input_trans, const char *db_name,
17641763
int num_stripes, int *bdberr);
17651764
int bdb_get_high_genid(const char *db_name, int stripe,

bdb/llmeta.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5150,16 +5150,23 @@ int bdb_clear_high_genid(
51505150

51515151
/* determines what stripe the genid is part of and calls
51525152
* bdb_set_high_genid_int */
5153-
int bdb_set_high_genid(tran_type *input_trans, const char *db_name,
5154-
unsigned long long genid, int *bdberr)
5153+
/* Record `genid` as the high genid of `db_name` for the stripe that the genid
 * itself encodes.
 *
 * The stripe is obtained with get_dtafile_from_genid(genid) and the work is
 * delegated to bdb_set_high_genid_int(); its return value is passed through
 * unchanged and `bdberr` is forwarded to it.
 *
 * `f` and `l` are printed as "(%s:%d)" in the trace below and are used only
 * when DEBUG_LLMETA is defined.
 * NOTE(review): presumably the caller's __FILE__ / __LINE__ -- confirm
 * against the call sites. */
int bdb_set_high_genid(tran_type *input_trans, const char *db_name, unsigned long long genid, int *bdberr,
5154+
const char *f, int l)
51555155
{
5156+
#ifdef DEBUG_LLMETA
5157+
/* debug-only trace naming the caller, the table and the derived stripe */
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name,
5158+
get_dtafile_from_genid(genid), genid, genid);
5159+
#endif
51565160
return bdb_set_high_genid_int(input_trans, db_name,
51575161
get_dtafile_from_genid(genid), genid, bdberr);
51585162
}
51595163

5160-
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
5161-
int stripe, unsigned long long genid, int *bdberr)
5164+
/* Record `genid` as the high genid of `db_name` for an explicitly given
 * `stripe` (unlike bdb_set_high_genid(), the stripe is not derived from the
 * genid).
 *
 * Delegates to bdb_set_high_genid_int() and returns its result; `bdberr` is
 * forwarded to it.
 *
 * `f` and `l` are printed as "(%s:%d)" in the trace below and are used only
 * when DEBUG_LLMETA is defined.
 * NOTE(review): presumably the caller's __FILE__ / __LINE__ -- confirm
 * against the call sites. */
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
5165+
int *bdberr, const char *f, int l)
51625166
{
5167+
#ifdef DEBUG_LLMETA
5168+
/* debug-only trace naming the caller, the table and the stripe */
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name, stripe, genid, genid);
5169+
#endif
51635170
return bdb_set_high_genid_int(input_trans, db_name, stripe, genid, bdberr);
51645171
}
51655172

@@ -10510,7 +10517,7 @@ static int bdb_process_each_table_entry(bdb_state_type *bdb_state,
1051010517
key_struct.dbname_len = strlen(key_struct.dbname) + 1 /* NULL byte */;
1051110518

1051210519
if (key_struct.dbname_len > LLMETA_TBLLEN) {
10513-
fprintf(stderr, "%s: db_name is too long\n", __func__);
10520+
logmsg(LOGMSG_ERROR, "%s: db_name is too long\n", __func__);
1051410521
*bdberr = BDBERR_BADARGS;
1051510522
return -1;
1051610523
}

db/comdb2.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,7 +1768,6 @@ extern int gbl_reset_queue_cursor;
17681768
extern int gbl_readonly;
17691769
extern int gbl_readonly_sc;
17701770
extern int gbl_init_single_meta;
1771-
extern unsigned long long gbl_sc_genids[MAXDTASTRIPE];
17721771
extern int gbl_sc_usleep;
17731772
extern int gbl_sc_wrusleep;
17741773
extern int gbl_sc_last_writer_time;
@@ -3645,8 +3644,8 @@ const char *sc_tag_change_subtype_text(sc_tag_change_subtype);
36453644
int cmp_index_int(struct schema *oldix, struct schema *newix, char *descr,
36463645
size_t descrlen);
36473646
int get_dbtable_idx_by_name(const char *tablename);
3648-
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *prefix, int resume,
3649-
int temp, tran_type *tran);
3647+
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *tablename, int resume);
3648+
int open_temp_newdb_resume(struct ireq *iq, struct dbtable *db, int resume);
36503649
int find_constraint(struct dbtable *db, constraint_t *ct);
36513650

36523651
/* END OF SCHEMACHANGE DECLARATIONS*/

db/osqlcomm.c

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6024,6 +6024,35 @@ static inline int is_write_request(int type)
60246024

60256025
void free_cached_idx(uint8_t **cached_idx);
60266026

6027+
/* Look up the newest genid of every data stripe of `table`.
 *
 * For each stripe i in [0, table->dtastripe) this calls
 * bdb_find_newest_genid() with &genids[i] as the output slot, so `genids`
 * must have room for at least table->dtastripe entries.
 *
 * Returns 0 after all stripes were visited, or -1 as soon as a lookup
 * reports rc < 0 or a bdberr other than BDBERR_NOERROR. */
int dbtable_get_highest_genid(struct dbtable *table, unsigned long long *genids)
6028+
{
6029+
int rc;
6030+
void *rec;
6031+
int i;
6032+
int orglen = MAXLRL;
6033+
/* NOTE(review): bdberr is not initialised here but is read after the call
 * below; this relies on bdb_find_newest_genid() always setting it --
 * confirm. */
int bdberr;
6034+
6035+
/* scratch record buffer of MAXLRL bytes, taken from the stack and reused
 * for every stripe; the record contents are not examined here */
rec = alloca(orglen);
6036+
6037+
/* get max genid for each stripe */
6038+
for (i = 0; i < table->dtastripe; ++i) {
6039+
uint8_t ver;
6040+
/* dtalen is passed both as the in/out length (&dtalen) and as the
 * buffer size, so it is reset to the full buffer size per stripe */
int dtalen = orglen;
6041+
6042+
rc = bdb_find_newest_genid(table->handle, NULL, i, rec, &dtalen, dtalen, &genids[i], &ver, &bdberr);
6043+
/* NOTE(review): the genid arguments are unsigned long long but are
 * printed with %lld in the log calls below. */
if (rc == IX_FND)
6044+
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d found genid %llx (%lld)\n", __func__, table->tablename, i,
6045+
genids[i], genids[i]);
6046+
/* NOTE(review): genids[i] is not assigned in this function for the
 * empty-stripe case; the "will be 0" in the message holds only if the
 * callee or the caller zeroes the slot -- confirm. */
else if (rc == 1)
6047+
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d empty stripe, genid will be 0\n", __func__,
6048+
table->tablename, i);
6049+
/* NOTE(review): any other positive rc with bdberr == BDBERR_NOERROR
 * falls through silently and the loop continues. */
else if (rc < 0 || bdberr != BDBERR_NOERROR) {
6050+
return -1;
6051+
}
6052+
}
6053+
return 0;
6054+
}
6055+
60276056
int gbl_disable_tpsc_tblvers = 0;
60286057
static int start_schema_change_tran_wrapper(const char *tblname,
60296058
timepart_view_t **pview,
@@ -6116,8 +6145,18 @@ static int start_schema_change_tran_wrapper(const char *tblname,
61166145
rc = populate_db_with_alt_schema(thedb, sc->newdb, sc->newcsc2, &err);
61176146
if (rc) {
61186147
logmsg(LOGMSG_ERROR, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
6148+
sc_errf(sc, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
61196149
return VIEW_ERR_SC;
61206150
}
6151+
6152+
/* we also need to retrieve the highest genid */
6153+
if (arg->indx > 0) {
6154+
rc = dbtable_get_highest_genid(sc->newdb, arg->retros->cs[arg->indx - 1].resume_genids);
6155+
if (rc) {
6156+
sc_errf(sc, "%s: failed to find newest genid for shard %s\n", __func__, sc->tablename);
6157+
return VIEW_ERR_SC;
6158+
}
6159+
}
61216160
}
61226161

61236162
if (arg->lockless) {
@@ -6352,6 +6391,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
63526391
struct schema_change_type *sc = arg->s;
63536392
struct errstat err = {0};
63546393
int rc = 0;
6394+
int ii;
63556395

63566396
/* determine retroactive time boundaries for the shards */
63576397
int len = sizeof(struct timepart_retro) +
@@ -6370,7 +6410,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
63706410
sc->partition.u.tpt.retention * sizeof(int));
63716411
retros->cs = (timepart_retro_ctr_t *)((char *)retros + sizeof(struct timepart_retro) +
63726412
sc->partition.u.tpt.retention * (sizeof(int) + sizeof(int *)));
6373-
for (int ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
6413+
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
63746414
pthread_mutex_init(&retros->cs[ii].mtx, 0);
63756415
}
63766416
rc = timepart_populate_timelimits(sc->newpartition, retros, &err);
@@ -6405,6 +6445,20 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
64056445
}
64066446
retros->ss[sc->partition.u.tpt.retention - 1] = arg->s;
64076447

6448+
/* set the maximum genid for each stripe */
6449+
for (int stripe = 0; stripe < sc->newdb->dtastripe; stripe++) {
6450+
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
6451+
unsigned long long genid = retros->cs[ii].resume_genids[stripe];
6452+
if (genid > retros->resume_genids[stripe]) {
6453+
logmsg(LOGMSG_INFO,
6454+
"%s: increased stripe %d resume genid from %llx (%lld) to %llx (%lld) per shard %s\n", __func__,
6455+
stripe, retros->resume_genids[stripe], retros->resume_genids[stripe], genid, genid,
6456+
sc->tablename);
6457+
retros->resume_genids[stripe] = genid;
6458+
}
6459+
}
6460+
}
6461+
64086462
/* alter existing shard */
64096463
arg->indx = 0;
64106464
arg->pos = FIRST_SHARD | LAST_SHARD;
@@ -6416,6 +6470,8 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
64166470
db->sharding_func = timepart_retro_route;
64176471
sc->newpartition = newpartition;
64186472
sc->force_rebuild = 1;
6473+
/* run this async, this can be in upgrade thread */
6474+
sc->nothrevent = 0;
64196475
for (int jj = 0; jj < retros->n; jj++) {
64206476
logmsg(LOGMSG_USER, "PARTITION %s shard %d time %u name %s\n", arg->part_name, jj, retros->limits[jj],
64216477
(jj < (retros->n - 1)) ? retros->ss[jj]->tablename : arg->part_name);

db/views.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,15 @@ enum views_trigger_op {
7777
/* Per-shard bookkeeping entry; timepart_retro.cs points at an array of
 * these. */
typedef struct timepart_retro_ctr {
7878
/* initialised with pthread_mutex_init() when the array is set up;
 * NOTE(review): presumably guards `counter` -- confirm against users */
pthread_mutex_t mtx;
7979
/* NOTE(review): meaning not visible in this diff */
int counter;
80+
/* newest genid found in this shard, one slot per data stripe; filled by
 * dbtable_get_highest_genid() */
unsigned long long resume_genids[MAXDTASTRIPE];
8081
} timepart_retro_ctr_t;
8182

8283
/* State shared by the shards of a retroactive time partition while it is
 * being created. */
typedef struct timepart_retro {
8384
/* number of shards; used as the bound when iterating limits[] and ss[] */
int n;
8485
/* per-shard time limit (logged as "time %u") */
int *limits;
8586
/* per-shard schema change descriptors */
struct schema_change_type **ss;
8687
/* per-shard bookkeeping entries (mutex, counter, resume genids) */
timepart_retro_ctr_t *cs;
88+
/* per data stripe, the largest value seen across cs[..].resume_genids */
unsigned long long resume_genids[MAXDTASTRIPE];
8789
} timepart_retro_t;
8890

8991
enum shard_pos { /* keep them bits */

schemachange/sc_add_table.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,22 @@ int add_table_to_environment(char *table, const char *csc2,
128128
goto err;
129129
}
130130

131-
if ((rc = get_db_handle(newdb, trans))) {
131+
if (s && s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
132+
/* this is adding a shard, we need to try to open an existing shard, which may have data */
133+
if ((rc = open_temp_db_resume(iq, newdb, newdb->tablename, s->resume))) {
134+
sc_errf(s, "Failed to open shard %s\n", newdb->tablename);
135+
reqerrstr(iq, ERR_SC, "Failed to open shard %s\n", newdb->tablename);
136+
}
137+
} else if ((rc = get_db_handle(newdb, trans))) {
132138
if (rc == BDBERR_EXCEEDED_BLOBS){
133139
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
134140
reqerrstr(iq, ERR_SC, "Maximum number of vutf8/blob fields exceeded\n");
135141
} else if (rc == BDBERR_EXCEEDED_INDEXES){
136142
sc_errf(s, "Maximum number of indexes exceeded\n");
137143
reqerrstr(iq, ERR_SC, "Maximum number of indexes exceeded\n");
138144
}
145+
}
146+
if (rc) {
139147
rc = SC_BDB_ERROR;
140148
goto err;
141149
}

schemachange/sc_alter_table.c

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
399399
int datacopy_odh = 0;
400400
int changed;
401401
int i;
402-
char new_prefix[32];
403402
struct scinfo scinfo;
404403
struct errstat err = {0};
405404

@@ -522,18 +521,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
522521
print_schemachange_info(s, db, newdb);
523522

524523
/*************** open tables ********************************************/
525-
526-
/* create temporary tables. to try to avoid strange issues always
527-
* use a unqiue prefix. this avoids multiple histories for these
528-
* new. files in our logs.
529-
*
530-
* since the prefix doesn't matter and bdb needs to be able to unappend
531-
* it, we let bdb choose the prefix */
532-
/* ignore failures, there shouln't be any and we'd just have a
533-
* truncated prefix anyway */
534-
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);
535-
536-
rc = open_temp_db_resume(iq, newdb, new_prefix, s->resume, 0, tran);
524+
rc = open_temp_newdb_resume(iq, newdb, s->resume);
537525
if (rc) {
538526
if (rc == BDBERR_EXCEEDED_BLOBS) {
539527
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
@@ -609,6 +597,29 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
609597
return -1;
610598
}
611599

600+
if (s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
601+
/* we have more shards here where we put data,
602+
* update newdb->sc_genids with the max of all shard stripes
603+
*/
604+
assert(db->sharding_arg);
605+
assert(db->sharding_func);
606+
for (i = 0; i < newdb->dtastripe; i++) {
607+
for (int shard = 0; shard < db->sharding_arg->n - 1; shard++) {
608+
if (newdb->sc_genids[i] < db->sharding_arg->cs[shard].resume_genids[i]) {
609+
logmsg(LOGMSG_INFO, "%s updating %s stripe %d sc_genid from %llx (%lld) to %llx (%lld) using %s\n",
610+
__func__, s->tablename, i, newdb->sc_genids[i], newdb->sc_genids[i],
611+
db->sharding_arg->cs[shard].resume_genids[i], db->sharding_arg->cs[shard].resume_genids[i],
612+
db->sharding_arg->ss[shard]->tablename);
613+
newdb->sc_genids[i] = db->sharding_arg->cs[shard].resume_genids[i];
614+
}
615+
}
616+
}
617+
}
618+
for (i = 0; i < newdb->dtastripe; i++) {
619+
logmsg(LOGMSG_INFO, "%s: %s stripe %d result resume genid %llx (%lld)\n", __func__, s->tablename, i,
620+
newdb->sc_genids[i], newdb->sc_genids[i]);
621+
}
622+
612623
Pthread_rwlock_wrlock(&db->sc_live_lk);
613624
db->sc_from = s->db = db;
614625
db->sc_to = s->newdb = newdb;

schemachange/sc_fastinit_table.c

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)
5656
struct dbtable *db;
5757
struct dbtable *newdb;
5858
int rc = 0;
59-
int bdberr = 0;
6059
int datacopy_odh = 0;
61-
char new_prefix[32];
6260
struct scinfo scinfo;
6361
struct errstat err = {0};
6462

@@ -103,22 +101,12 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)
103101

104102
Pthread_mutex_unlock(&csc2_subsystem_mtx);
105103

106-
/* create temporary tables. to try to avoid strange issues always
107-
* use a unqiue prefix. this avoids multiple histories for these
108-
* new. files in our logs.
109-
*
110-
* since the prefix doesn't matter and bdb needs to be able to unappend
111-
* it, we let bdb choose the prefix */
112-
/* ignore failures, there shouln't be any and we'd just have a
113-
* truncated prefix anyway */
114-
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);
115-
116104
int local_lock = 0;
117105
if (!iq->sc_locked) {
118106
local_lock = 1;
119107
wrlock_schema_lk();
120108
}
121-
rc = open_temp_db_resume(iq, newdb, new_prefix, 0, 0, tran);
109+
rc = open_temp_newdb_resume(iq, newdb, 0);
122110
if (local_lock)
123111
unlock_schema_lk();
124112
if (rc) {

0 commit comments

Comments
 (0)