Skip to content

Commit 8d28eb8

Browse files
committed
Fix resuming a retroactive time partitioning upon a master swing.
Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
1 parent 644f2f4 commit 8d28eb8

File tree

15 files changed

+199
-107
lines changed

15 files changed

+199
-107
lines changed

bdb/bdb_api.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,11 +1755,10 @@ int bdb_newsc_del_redo_genid(tran_type *t, const char *tablename, uint64_t genid
17551755

17561756
int bdb_newsc_del_all_redo_genids(tran_type *t, const char *tablename, int *bdberr);
17571757

1758-
int bdb_set_high_genid(tran_type *input_trans, const char *tablename,
1759-
unsigned long long genid, int *bdberr);
1760-
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
1761-
int stripe, unsigned long long genid,
1762-
int *bdberr);
1758+
int bdb_set_high_genid(tran_type *input_trans, const char *tablename, unsigned long long genid, int *bdberr,
1759+
const char *f, int l);
1760+
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
1761+
int *bdberr, const char *f, int l);
17631762
int bdb_clear_high_genid(tran_type *input_trans, const char *db_name,
17641763
int num_stripes, int *bdberr);
17651764
int bdb_get_high_genid(const char *db_name, int stripe,

bdb/llmeta.c

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5150,16 +5150,23 @@ int bdb_clear_high_genid(
51505150

51515151
/* determines what stripe the genid is part of and calls
51525152
* bdb_set_high_genid_int */
5153-
int bdb_set_high_genid(tran_type *input_trans, const char *db_name,
5154-
unsigned long long genid, int *bdberr)
5153+
int bdb_set_high_genid(tran_type *input_trans, const char *db_name, unsigned long long genid, int *bdberr,
5154+
const char *f, int l)
51555155
{
5156+
#ifdef DEBUG_LLMETA
5157+
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name,
5158+
get_dtafile_from_genid(genid), genid, genid);
5159+
#endif
51565160
return bdb_set_high_genid_int(input_trans, db_name,
51575161
get_dtafile_from_genid(genid), genid, bdberr);
51585162
}
51595163

5160-
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
5161-
int stripe, unsigned long long genid, int *bdberr)
5164+
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
5165+
int *bdberr, const char *f, int l)
51625166
{
5167+
#ifdef DEBUG_LLMETA
5168+
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name, stripe, genid, genid);
5169+
#endif
51635170
return bdb_set_high_genid_int(input_trans, db_name, stripe, genid, bdberr);
51645171
}
51655172

@@ -10510,7 +10517,7 @@ static int bdb_process_each_table_entry(bdb_state_type *bdb_state,
1051010517
key_struct.dbname_len = strlen(key_struct.dbname) + 1 /* NULL byte */;
1051110518

1051210519
if (key_struct.dbname_len > LLMETA_TBLLEN) {
10513-
fprintf(stderr, "%s: db_name is too long\n", __func__);
10520+
logmsg(LOGMSG_ERROR, "%s: db_name is too long\n", __func__);
1051410521
*bdberr = BDBERR_BADARGS;
1051510522
return -1;
1051610523
}

db/comdb2.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1767,7 +1767,6 @@ extern int gbl_reset_queue_cursor;
17671767
extern int gbl_readonly;
17681768
extern int gbl_readonly_sc;
17691769
extern int gbl_init_single_meta;
1770-
extern unsigned long long gbl_sc_genids[MAXDTASTRIPE];
17711770
extern int gbl_sc_usleep;
17721771
extern int gbl_sc_wrusleep;
17731772
extern int gbl_sc_last_writer_time;
@@ -3644,8 +3643,8 @@ const char *sc_tag_change_subtype_text(sc_tag_change_subtype);
36443643
int cmp_index_int(struct schema *oldix, struct schema *newix, char *descr,
36453644
size_t descrlen);
36463645
int get_dbtable_idx_by_name(const char *tablename);
3647-
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *prefix, int resume,
3648-
int temp, tran_type *tran);
3646+
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *tablename, int resume);
3647+
int open_temp_newdb_resume(struct ireq *iq, struct dbtable *db, int resume);
36493648
int find_constraint(struct dbtable *db, constraint_t *ct);
36503649

36513650
/* END OF SCHEMACHANGE DECLARATIONS*/

db/osqlcomm.c

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6024,6 +6024,35 @@ static inline int is_write_request(int type)
60246024

60256025
void free_cached_idx(uint8_t **cached_idx);
60266026

6027+
int dbtable_get_highest_genid(struct dbtable *table, unsigned long long *genids)
6028+
{
6029+
int rc;
6030+
void *rec;
6031+
int i;
6032+
int orglen = MAXLRL;
6033+
int bdberr;
6034+
6035+
rec = alloca(orglen);
6036+
6037+
/* get max genid for each stripe */
6038+
for (i = 0; i < table->dtastripe; ++i) {
6039+
uint8_t ver;
6040+
int dtalen = orglen;
6041+
6042+
rc = bdb_find_newest_genid(table->handle, NULL, i, rec, &dtalen, dtalen, &genids[i], &ver, &bdberr);
6043+
if (rc == IX_FND)
6044+
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d found genid %llx (%lld)\n", __func__, table->tablename, i,
6045+
genids[i], genids[i]);
6046+
else if (rc == 1)
6047+
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d empty stripe, genid will be 0\n", __func__,
6048+
table->tablename, i);
6049+
else if (rc < 0 || bdberr != BDBERR_NOERROR) {
6050+
return -1;
6051+
}
6052+
}
6053+
return 0;
6054+
}
6055+
60276056
int gbl_disable_tpsc_tblvers = 0;
60286057
static int start_schema_change_tran_wrapper(const char *tblname,
60296058
timepart_view_t **pview,
@@ -6116,8 +6145,18 @@ static int start_schema_change_tran_wrapper(const char *tblname,
61166145
rc = populate_db_with_alt_schema(thedb, sc->newdb, sc->newcsc2, &err);
61176146
if (rc) {
61186147
logmsg(LOGMSG_ERROR, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
6148+
sc_errf(sc, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
61196149
return VIEW_ERR_SC;
61206150
}
6151+
6152+
/* we also need to retrieve the highest genid */
6153+
if (arg->indx > 0) {
6154+
rc = dbtable_get_highest_genid(sc->newdb, arg->retros->cs[arg->indx - 1].resume_genids);
6155+
if (rc) {
6156+
sc_errf(sc, "%s: failed to find newest genid for shard %s\n", __func__, sc->tablename);
6157+
return VIEW_ERR_SC;
6158+
}
6159+
}
61216160
}
61226161

61236162
if (arg->lockless) {
@@ -6352,6 +6391,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
63526391
struct schema_change_type *sc = arg->s;
63536392
struct errstat err = {0};
63546393
int rc = 0;
6394+
int ii;
63556395

63566396
/* determine retroactive time boundaries for the shards */
63576397
int len = sizeof(struct timepart_retro) +
@@ -6370,7 +6410,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
63706410
sc->partition.u.tpt.retention * sizeof(int));
63716411
retros->cs = (timepart_retro_ctr_t *)((char *)retros + sizeof(struct timepart_retro) +
63726412
sc->partition.u.tpt.retention * (sizeof(int) + sizeof(int *)));
6373-
for (int ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
6413+
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
63746414
pthread_mutex_init(&retros->cs[ii].mtx, 0);
63756415
}
63766416
rc = timepart_populate_timelimits(sc->newpartition, retros, &err);
@@ -6405,6 +6445,20 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
64056445
}
64066446
retros->ss[sc->partition.u.tpt.retention - 1] = arg->s;
64076447

6448+
/* set the maximum genid for each stripe */
6449+
for (int stripe = 0; stripe < sc->newdb->dtastripe; stripe++) {
6450+
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
6451+
unsigned long long genid = retros->cs[ii].resume_genids[stripe];
6452+
if (genid > retros->resume_genids[stripe]) {
6453+
logmsg(LOGMSG_INFO,
6454+
"%s: increased stripe %d resume genid from %llx (%lld) to %llx (%lld) per shard %s\n", __func__,
6455+
stripe, retros->resume_genids[stripe], retros->resume_genids[stripe], genid, genid,
6456+
sc->tablename);
6457+
retros->resume_genids[stripe] = genid;
6458+
}
6459+
}
6460+
}
6461+
64086462
/* alter existing shard */
64096463
arg->indx = 0;
64106464
arg->pos = FIRST_SHARD | LAST_SHARD;
@@ -6416,6 +6470,8 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
64166470
db->sharding_func = timepart_retro_route;
64176471
sc->newpartition = newpartition;
64186472
sc->force_rebuild = 1;
6473+
/* run this async, this can be in upgrade thread */
6474+
sc->nothrevent = 0;
64196475
for (int jj = 0; jj < retros->n; jj++) {
64206476
logmsg(LOGMSG_USER, "PARTITION %s shard %d time %u name %s\n", arg->part_name, jj, retros->limits[jj],
64216477
(jj < (retros->n - 1)) ? retros->ss[jj]->tablename : arg->part_name);

db/views.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,15 @@ enum views_trigger_op {
7777
typedef struct timepart_retro_ctr {
7878
pthread_mutex_t mtx;
7979
int counter;
80+
unsigned long long resume_genids[MAXDTASTRIPE];
8081
} timepart_retro_ctr_t;
8182

8283
typedef struct timepart_retro {
8384
int n;
8485
int *limits;
8586
struct schema_change_type **ss;
8687
timepart_retro_ctr_t *cs;
88+
unsigned long long resume_genids[MAXDTASTRIPE];
8789
} timepart_retro_t;
8890

8991
enum shard_pos { /* keep them bits */

schemachange/sc_add_table.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,14 +129,22 @@ int add_table_to_environment(char *table, const char *csc2,
129129
goto err;
130130
}
131131

132-
if ((rc = get_db_handle(newdb, trans))) {
132+
if (s && s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
133+
/* this is adding a shard, we need to try to open an existing shard, which may have data */
134+
if ((rc = open_temp_db_resume(iq, newdb, newdb->tablename, s->resume))) {
135+
sc_errf(s, "Failed to open shard %s\n", newdb->tablename);
136+
reqerrstr(iq, ERR_SC, "Failed to open shard %s\n", newdb->tablename);
137+
}
138+
} else if ((rc = get_db_handle(newdb, trans))) {
133139
if (rc == BDBERR_EXCEEDED_BLOBS){
134140
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
135141
reqerrstr(iq, ERR_SC, "Maximum number of vutf8/blob fields exceeded\n");
136142
} else if (rc == BDBERR_EXCEEDED_INDEXES){
137143
sc_errf(s, "Maximum number of indexes exceeded\n");
138144
reqerrstr(iq, ERR_SC, "Maximum number of indexes exceeded\n");
139145
}
146+
}
147+
if (rc) {
140148
rc = SC_BDB_ERROR;
141149
goto err;
142150
}

schemachange/sc_alter_table.c

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,6 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
399399
int datacopy_odh = 0;
400400
int changed;
401401
int i;
402-
char new_prefix[32];
403402
struct scinfo scinfo;
404403
struct errstat err = {0};
405404

@@ -524,18 +523,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
524523
print_schemachange_info(s, db, newdb);
525524

526525
/*************** open tables ********************************************/
527-
528-
/* create temporary tables. to try to avoid strange issues always
529-
* use a unqiue prefix. this avoids multiple histories for these
530-
* new. files in our logs.
531-
*
532-
* since the prefix doesn't matter and bdb needs to be able to unappend
533-
* it, we let bdb choose the prefix */
534-
/* ignore failures, there shouln't be any and we'd just have a
535-
* truncated prefix anyway */
536-
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);
537-
538-
rc = open_temp_db_resume(iq, newdb, new_prefix, s->resume, 0, tran);
526+
rc = open_temp_newdb_resume(iq, newdb, s->resume);
539527
if (rc) {
540528
if (rc == BDBERR_EXCEEDED_BLOBS) {
541529
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
@@ -611,6 +599,29 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
611599
return -1;
612600
}
613601

602+
if (s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
603+
/* we have more shards here where we put data,
604+
* update newdb->sc_genids with the max of all shard stripes
605+
*/
606+
assert(db->sharding_arg);
607+
assert(db->sharding_func);
608+
for (i = 0; i < newdb->dtastripe; i++) {
609+
for (int shard = 0; shard < db->sharding_arg->n - 1; shard++) {
610+
if (newdb->sc_genids[i] < db->sharding_arg->cs[shard].resume_genids[i]) {
611+
logmsg(LOGMSG_INFO, "%s updating %s stripe %d sc_genid from %llx (%lld) to %llx (%lld) using %s\n",
612+
__func__, s->tablename, i, newdb->sc_genids[i], newdb->sc_genids[i],
613+
db->sharding_arg->cs[shard].resume_genids[i], db->sharding_arg->cs[shard].resume_genids[i],
614+
db->sharding_arg->ss[shard]->tablename);
615+
newdb->sc_genids[i] = db->sharding_arg->cs[shard].resume_genids[i];
616+
}
617+
}
618+
}
619+
}
620+
for (i = 0; i < newdb->dtastripe; i++) {
621+
logmsg(LOGMSG_INFO, "%s: %s stripe %d result resume genid %llx (%lld)\n", __func__, s->tablename, i,
622+
newdb->sc_genids[i], newdb->sc_genids[i]);
623+
}
624+
614625
Pthread_rwlock_wrlock(&db->sc_live_lk);
615626
db->sc_from = s->db = db;
616627
db->sc_to = s->newdb = newdb;

schemachange/sc_fastinit_table.c

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,7 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)
5656
struct dbtable *db;
5757
struct dbtable *newdb;
5858
int rc = 0;
59-
int bdberr = 0;
6059
int datacopy_odh = 0;
61-
char new_prefix[32];
6260
struct scinfo scinfo;
6361
struct errstat err = {0};
6462

@@ -104,22 +102,12 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)
104102

105103
Pthread_mutex_unlock(&csc2_subsystem_mtx);
106104

107-
/* create temporary tables. to try to avoid strange issues always
108-
* use a unqiue prefix. this avoids multiple histories for these
109-
* new. files in our logs.
110-
*
111-
* since the prefix doesn't matter and bdb needs to be able to unappend
112-
* it, we let bdb choose the prefix */
113-
/* ignore failures, there shouln't be any and we'd just have a
114-
* truncated prefix anyway */
115-
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);
116-
117105
int local_lock = 0;
118106
if (!iq->sc_locked) {
119107
local_lock = 1;
120108
wrlock_schema_lk();
121109
}
122-
rc = open_temp_db_resume(iq, newdb, new_prefix, 0, 0, tran);
110+
rc = open_temp_newdb_resume(iq, newdb, 0);
123111
if (local_lock)
124112
unlock_schema_lk();
125113
if (rc) {

0 commit comments

Comments
 (0)