Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions bdb/bdb_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -1755,11 +1755,10 @@ int bdb_newsc_del_redo_genid(tran_type *t, const char *tablename, uint64_t genid

int bdb_newsc_del_all_redo_genids(tran_type *t, const char *tablename, int *bdberr);

int bdb_set_high_genid(tran_type *input_trans, const char *tablename,
unsigned long long genid, int *bdberr);
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
int stripe, unsigned long long genid,
int *bdberr);
int bdb_set_high_genid(tran_type *input_trans, const char *tablename, unsigned long long genid, int *bdberr,
const char *f, int l);
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
int *bdberr, const char *f, int l);
int bdb_clear_high_genid(tran_type *input_trans, const char *db_name,
int num_stripes, int *bdberr);
int bdb_get_high_genid(const char *db_name, int stripe,
Expand Down
17 changes: 12 additions & 5 deletions bdb/llmeta.c
Original file line number Diff line number Diff line change
Expand Up @@ -5150,16 +5150,23 @@ int bdb_clear_high_genid(

/* determines what stripe the genid is part of and calls
* bdb_set_high_genid_int */
int bdb_set_high_genid(tran_type *input_trans, const char *db_name,
unsigned long long genid, int *bdberr)
int bdb_set_high_genid(tran_type *input_trans, const char *db_name, unsigned long long genid, int *bdberr,
const char *f, int l)
{
#ifdef DEBUG_LLMETA
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name,
get_dtafile_from_genid(genid), genid, genid);
#endif
return bdb_set_high_genid_int(input_trans, db_name,
get_dtafile_from_genid(genid), genid, bdberr);
}

int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name,
int stripe, unsigned long long genid, int *bdberr)
int bdb_set_high_genid_stripe(tran_type *input_trans, const char *db_name, int stripe, unsigned long long genid,
int *bdberr, const char *f, int l)
{
#ifdef DEBUG_LLMETA
fprintf(stderr, "%s: (%s:%d) setting %s stripe %d to %llx (%llu)\n", __func__, f, l, db_name, stripe, genid, genid);
#endif
return bdb_set_high_genid_int(input_trans, db_name, stripe, genid, bdberr);
}

Expand Down Expand Up @@ -10510,7 +10517,7 @@ static int bdb_process_each_table_entry(bdb_state_type *bdb_state,
key_struct.dbname_len = strlen(key_struct.dbname) + 1 /* NULL byte */;

if (key_struct.dbname_len > LLMETA_TBLLEN) {
fprintf(stderr, "%s: db_name is too long\n", __func__);
logmsg(LOGMSG_ERROR, "%s: db_name is too long\n", __func__);
*bdberr = BDBERR_BADARGS;
return -1;
}
Expand Down
5 changes: 2 additions & 3 deletions db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -1768,7 +1768,6 @@ extern int gbl_reset_queue_cursor;
extern int gbl_readonly;
extern int gbl_readonly_sc;
extern int gbl_init_single_meta;
extern unsigned long long gbl_sc_genids[MAXDTASTRIPE];
extern int gbl_sc_usleep;
extern int gbl_sc_wrusleep;
extern int gbl_sc_last_writer_time;
Expand Down Expand Up @@ -3645,8 +3644,8 @@ const char *sc_tag_change_subtype_text(sc_tag_change_subtype);
int cmp_index_int(struct schema *oldix, struct schema *newix, char *descr,
size_t descrlen);
int get_dbtable_idx_by_name(const char *tablename);
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *prefix, int resume,
int temp, tran_type *tran);
int open_temp_db_resume(struct ireq *iq, struct dbtable *db, char *tablename, int resume);
int open_temp_newdb_resume(struct ireq *iq, struct dbtable *db, int resume);
int find_constraint(struct dbtable *db, constraint_t *ct);

/* END OF SCHEMACHANGE DECLARATIONS*/
Expand Down
58 changes: 57 additions & 1 deletion db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -6024,6 +6024,35 @@ static inline int is_write_request(int type)

void free_cached_idx(uint8_t **cached_idx);

int dbtable_get_highest_genid(struct dbtable *table, unsigned long long *genids)
{
int rc;
void *rec;
int i;
int orglen = MAXLRL;
int bdberr;

rec = alloca(orglen);

/* get max genid for each stripe */
for (i = 0; i < table->dtastripe; ++i) {
uint8_t ver;
int dtalen = orglen;

rc = bdb_find_newest_genid(table->handle, NULL, i, rec, &dtalen, dtalen, &genids[i], &ver, &bdberr);
if (rc == IX_FND)
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d found genid %llx (%lld)\n", __func__, table->tablename, i,
genids[i], genids[i]);
else if (rc == 1)
logmsg(LOGMSG_INFO, "%s: LOOKING FOR %s STRIPE %d empty stripe, genid will be 0\n", __func__,
table->tablename, i);
else if (rc < 0 || bdberr != BDBERR_NOERROR) {
return -1;
}
}
return 0;
}

int gbl_disable_tpsc_tblvers = 0;
static int start_schema_change_tran_wrapper(const char *tblname,
timepart_view_t **pview,
Expand Down Expand Up @@ -6116,8 +6145,18 @@ static int start_schema_change_tran_wrapper(const char *tblname,
rc = populate_db_with_alt_schema(thedb, sc->newdb, sc->newcsc2, &err);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
sc_errf(sc, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
return VIEW_ERR_SC;
}

/* we also need to retrieve the highest genid */
if (arg->indx > 0) {
rc = dbtable_get_highest_genid(sc->newdb, arg->retros->cs[arg->indx - 1].resume_genids);
if (rc) {
sc_errf(sc, "%s: failed to find newest genid for shard %s\n", __func__, sc->tablename);
return VIEW_ERR_SC;
}
}
}

if (arg->lockless) {
Expand Down Expand Up @@ -6352,6 +6391,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
struct schema_change_type *sc = arg->s;
struct errstat err = {0};
int rc = 0;
int ii;

/* determine retroactive time boundaries for the shards */
int len = sizeof(struct timepart_retro) +
Expand All @@ -6370,7 +6410,7 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
sc->partition.u.tpt.retention * sizeof(int));
retros->cs = (timepart_retro_ctr_t *)((char *)retros + sizeof(struct timepart_retro) +
sc->partition.u.tpt.retention * (sizeof(int) + sizeof(int *)));
for (int ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
pthread_mutex_init(&retros->cs[ii].mtx, 0);
}
rc = timepart_populate_timelimits(sc->newpartition, retros, &err);
Expand Down Expand Up @@ -6405,6 +6445,20 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
}
retros->ss[sc->partition.u.tpt.retention - 1] = arg->s;

/* set the maximum genid for each stripe */
for (int stripe = 0; stripe < sc->newdb->dtastripe; stripe++) {
for (ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
unsigned long long genid = retros->cs[ii].resume_genids[stripe];
if (genid > retros->resume_genids[stripe]) {
logmsg(LOGMSG_INFO,
"%s: increased stripe %d resume genid from %llx (%lld) to %llx (%lld) per shard %s\n", __func__,
stripe, retros->resume_genids[stripe], retros->resume_genids[stripe], genid, genid,
sc->tablename);
retros->resume_genids[stripe] = genid;
}
}
}

/* alter existing shard */
arg->indx = 0;
arg->pos = FIRST_SHARD | LAST_SHARD;
Expand All @@ -6416,6 +6470,8 @@ static int _process_partitioning_retro(timepart_sc_arg_t *arg)
db->sharding_func = timepart_retro_route;
sc->newpartition = newpartition;
sc->force_rebuild = 1;
/* run this async, this can be in upgrade thread */
sc->nothrevent = 0;
for (int jj = 0; jj < retros->n; jj++) {
logmsg(LOGMSG_USER, "PARTITION %s shard %d time %u name %s\n", arg->part_name, jj, retros->limits[jj],
(jj < (retros->n - 1)) ? retros->ss[jj]->tablename : arg->part_name);
Expand Down
2 changes: 2 additions & 0 deletions db/views.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,15 @@ enum views_trigger_op {
typedef struct timepart_retro_ctr {
pthread_mutex_t mtx;
int counter;
unsigned long long resume_genids[MAXDTASTRIPE];
} timepart_retro_ctr_t;

typedef struct timepart_retro {
int n;
int *limits;
struct schema_change_type **ss;
timepart_retro_ctr_t *cs;
unsigned long long resume_genids[MAXDTASTRIPE];
} timepart_retro_t;

enum shard_pos { /* keep them bits */
Expand Down
10 changes: 9 additions & 1 deletion schemachange/sc_add_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,22 @@ int add_table_to_environment(char *table, const char *csc2,
goto err;
}

if ((rc = get_db_handle(newdb, trans))) {
if (s && s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
/* this is adding a shard, we need to try to open an existing shard, which may have data */
if ((rc = open_temp_db_resume(iq, newdb, newdb->tablename, s->resume))) {
sc_errf(s, "Failed to open shard %s\n", newdb->tablename);
reqerrstr(iq, ERR_SC, "Failed to open shard %s\n", newdb->tablename);
}
} else if ((rc = get_db_handle(newdb, trans))) {
if (rc == BDBERR_EXCEEDED_BLOBS){
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
reqerrstr(iq, ERR_SC, "Maximum number of vutf8/blob fields exceeded\n");
} else if (rc == BDBERR_EXCEEDED_INDEXES){
sc_errf(s, "Maximum number of indexes exceeded\n");
reqerrstr(iq, ERR_SC, "Maximum number of indexes exceeded\n");
}
}
if (rc) {
rc = SC_BDB_ERROR;
goto err;
}
Expand Down
37 changes: 24 additions & 13 deletions schemachange/sc_alter_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
int datacopy_odh = 0;
int changed;
int i;
char new_prefix[32];
struct scinfo scinfo;
struct errstat err = {0};

Expand Down Expand Up @@ -522,18 +521,7 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
print_schemachange_info(s, db, newdb);

/*************** open tables ********************************************/

/* create temporary tables. to try to avoid strange issues always
* use a unqiue prefix. this avoids multiple histories for these
* new. files in our logs.
*
* since the prefix doesn't matter and bdb needs to be able to unappend
* it, we let bdb choose the prefix */
/* ignore failures, there shouln't be any and we'd just have a
* truncated prefix anyway */
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);

rc = open_temp_db_resume(iq, newdb, new_prefix, s->resume, 0, tran);
rc = open_temp_newdb_resume(iq, newdb, s->resume);
if (rc) {
if (rc == BDBERR_EXCEEDED_BLOBS) {
sc_errf(s, "Maximum number of vutf8/blob fields exceeded\n");
Expand Down Expand Up @@ -609,6 +597,29 @@ int do_alter_table(struct ireq *iq, struct schema_change_type *s,
return -1;
}

if (s->resume && s->partition.type == PARTITION_ADD_TIMED_RETRO) {
/* we have more shards here where we put data,
* update newdb->sc_genids with the max of all shard stripes
*/
assert(db->sharding_arg);
assert(db->sharding_func);
for (i = 0; i < newdb->dtastripe; i++) {
for (int shard = 0; shard < db->sharding_arg->n - 1; shard++) {
if (newdb->sc_genids[i] < db->sharding_arg->cs[shard].resume_genids[i]) {
logmsg(LOGMSG_INFO, "%s updating %s stripe %d sc_genid from %llx (%lld) to %llx (%lld) using %s\n",
__func__, s->tablename, i, newdb->sc_genids[i], newdb->sc_genids[i],
db->sharding_arg->cs[shard].resume_genids[i], db->sharding_arg->cs[shard].resume_genids[i],
db->sharding_arg->ss[shard]->tablename);
newdb->sc_genids[i] = db->sharding_arg->cs[shard].resume_genids[i];
}
}
}
}
for (i = 0; i < newdb->dtastripe; i++) {
logmsg(LOGMSG_INFO, "%s: %s stripe %d result resume genid %llx (%lld)\n", __func__, s->tablename, i,
newdb->sc_genids[i], newdb->sc_genids[i]);
}

Pthread_rwlock_wrlock(&db->sc_live_lk);
db->sc_from = s->db = db;
db->sc_to = s->newdb = newdb;
Expand Down
14 changes: 1 addition & 13 deletions schemachange/sc_fastinit_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)
struct dbtable *db;
struct dbtable *newdb;
int rc = 0;
int bdberr = 0;
int datacopy_odh = 0;
char new_prefix[32];
struct scinfo scinfo;
struct errstat err = {0};

Expand Down Expand Up @@ -103,22 +101,12 @@ int do_fastinit(struct ireq *iq, struct schema_change_type *s, tran_type *tran)

Pthread_mutex_unlock(&csc2_subsystem_mtx);

/* create temporary tables. to try to avoid strange issues always
* use a unqiue prefix. this avoids multiple histories for these
* new. files in our logs.
*
* since the prefix doesn't matter and bdb needs to be able to unappend
* it, we let bdb choose the prefix */
/* ignore failures, there shouln't be any and we'd just have a
* truncated prefix anyway */
bdb_get_new_prefix(new_prefix, sizeof(new_prefix), &bdberr);

int local_lock = 0;
if (!iq->sc_locked) {
local_lock = 1;
wrlock_schema_lk();
}
rc = open_temp_db_resume(iq, newdb, new_prefix, 0, 0, tran);
rc = open_temp_newdb_resume(iq, newdb, 0);
if (local_lock)
unlock_schema_lk();
if (rc) {
Expand Down
Loading
Loading