Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions db/comdb2.h
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,9 @@ typedef struct dbtable {
uint32_t numcols;
char **columns;
char **shardnames;
struct timepart_retro *sharding_arg;
struct dbtable *(*sharding_func)(struct timepart_retro *, unsigned long long, const char *, int);

} dbtable;

struct dbview {
Expand Down
2 changes: 2 additions & 0 deletions db/db_tunables.c
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,8 @@ int gbl_incoherent_clnt_wait = 10;
int gbl_new_leader_duration = 3;
extern int gbl_transaction_grace_period;
extern int gbl_partition_sc_reorder;
extern int gbl_retro_tpt;
extern int gbl_retro_tpt_verbose;
extern int gbl_dohsql_joins;
extern int gbl_altersc_latency;
extern int gbl_altersc_delay_usec;
Expand Down
6 changes: 6 additions & 0 deletions db/db_tunables.h
Original file line number Diff line number Diff line change
Expand Up @@ -2514,6 +2514,12 @@ REGISTER_TUNABLE("transaction_grace_period",

REGISTER_TUNABLE("partition_sc_reorder", "If the schema change is serialized for a partition, run current shard last",
TUNABLE_BOOLEAN, &gbl_partition_sc_reorder, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("partition_retroactively",
"Disable/enable time partition syntax to retroactively partition an existing table (Default: ON)",
TUNABLE_BOOLEAN, &gbl_retro_tpt, 0, NULL, NULL, NULL, NULL);
REGISTER_TUNABLE("partition_retroactively_verbose",
"Disable/enable data routing debugging for retroactively time partitioning (Default: OFF)",
TUNABLE_BOOLEAN, &gbl_retro_tpt_verbose, 0, NULL, NULL, NULL, NULL);

REGISTER_TUNABLE("dohsql_joins", "Enable to support joins in parallel sql execution (default: on)", TUNABLE_BOOLEAN,
&gbl_dohsql_joins, 0, NULL, NULL, NULL, NULL);
Expand Down
140 changes: 134 additions & 6 deletions db/osqlcomm.c
Original file line number Diff line number Diff line change
Expand Up @@ -6031,6 +6031,7 @@ static int start_schema_change_tran_wrapper(const char *tblname,
{
struct schema_change_type *sc = arg->s;
struct ireq *iq = sc->iq;
struct errstat err = {0};
int rc;

/* we need to use the actual table name here; it might match partition name
Expand All @@ -6046,8 +6047,9 @@ static int start_schema_change_tran_wrapper(const char *tblname,
sc->fix_tp_badvers = 1;
}

if (((sc->partition.type == PARTITION_ADD_TIMED ||
sc->partition.type == PARTITION_ADD_MANUAL) && (arg->pos & FIRST_SHARD)) ||
if (((sc->partition.type == PARTITION_ADD_TIMED || sc->partition.type == PARTITION_ADD_TIMED_RETRO ||
sc->partition.type == PARTITION_ADD_MANUAL) &&
(arg->pos & FIRST_SHARD)) ||
(sc->partition.type == PARTITION_REMOVE && (arg->pos & LAST_SHARD))) {
sc->publish = partition_publish;
sc->unpublish = partition_unpublish;
Expand All @@ -6062,6 +6064,16 @@ static int start_schema_change_tran_wrapper(const char *tblname,
if (arg->lockless)
views_unlock();

if (arg->retros) {
/* the retros keep the current shard as the last one
* each earlier shard is shifted left by 1, matching limits
*/
if (arg->indx > 0)
arg->retros->ss[arg->indx - 1] = sc;
else
arg->retros->ss[arg->retros->n - 1] = sc;
}

rc = start_schema_change_tran(iq, sc->tran);

/* user suspended sc */
Expand All @@ -6081,7 +6093,7 @@ static int start_schema_change_tran_wrapper(const char *tblname,
} else {
iq->sc->sc_next = iq->sc_pending;
iq->sc_pending = iq->sc;
if (arg->pos & LAST_SHARD) {
if (!arg->clonelast && (arg->pos & LAST_SHARD)) {
/* last shard was done */
iq->osql_flags |= OSQL_FLAGS_SCDONE;
} else {
Expand All @@ -6097,6 +6109,17 @@ static int start_schema_change_tran_wrapper(const char *tblname,
}
}

if (!rc && sc->kind == SC_ADDTABLE && sc->partition.type == PARTITION_ADD_TIMED_RETRO) {
/* as we will start moving data from exiting table to new shards,
* we need to add .new tags for the new shards to support live updates
*/
rc = populate_db_with_alt_schema(thedb, sc->newdb, sc->newcsc2, &err);
if (rc) {
logmsg(LOGMSG_ERROR, "%s: populate_db_with_alt_schema failed with rc %d %s\n", __func__, rc, err.errstr);
return VIEW_ERR_SC;
}
}

if (arg->lockless) {
*pview = timepart_reaquire_view(arg->part_name);
if (!pview) {
Expand Down Expand Up @@ -6324,20 +6347,118 @@ static int _process_partitioned_table_merge(struct ireq *iq)
return rc;
}

static int _process_partitioning_retro(timepart_sc_arg_t *arg)
{
struct schema_change_type *sc = arg->s;
struct errstat err = {0};
int rc = 0;

/* determine retroactive time boundaries for the shards */
int len = sizeof(struct timepart_retro) +
sc->partition.u.tpt.retention * (sizeof(int) + sizeof(int *) + sizeof(timepart_retro_ctr_t));
arg->retros = malloc(len);
if (!arg->retros) {
logmsg(LOGMSG_ERROR, "%s malloc %d\n", __func__, len);
rc = ERR_SC;
goto err;
}
timepart_retro_t *retros = arg->retros;

bzero(retros, len);
retros->limits = (int *)(((char *)retros) + sizeof(struct timepart_retro));
retros->ss = (struct schema_change_type **)((char *)retros + sizeof(struct timepart_retro) +
sc->partition.u.tpt.retention * sizeof(int));
retros->cs = (timepart_retro_ctr_t *)((char *)retros + sizeof(struct timepart_retro) +
sc->partition.u.tpt.retention * (sizeof(int) + sizeof(int *)));
for (int ii = 0; ii < sc->partition.u.tpt.retention; ii++) {
pthread_mutex_init(&retros->cs[ii].mtx, 0);
}
rc = timepart_populate_timelimits(sc->newpartition, retros, &err);
if (rc) {
timepart_free_view(sc->newpartition);
logmsg(LOGMSG_ERROR, "Failed to get time limits rc %d \"%s\"\n", err.errval, err.errstr);
sc_errf(sc, "Failed to get time limits rc %d \"%s\"\n", err.errval, err.errstr);
rc = ERR_SC;
goto err;
}
timepart_view_t *newpartition = sc->newpartition;
sc->newpartition = NULL;
/* start by creating the empty shards */
sc->timepartition_version = arg->s->db->tableversion + 1; /* next version */
sc->kind = SC_ADDTABLE;
sc->nothrevent = 1; /* serial */
arg->start = 1;
arg->pos = 0;
arg->clonelast = 1; /* we need a clone of the sc to do the actual alter */
rc = timepart_foreach_shard_lockless(newpartition, start_schema_change_tran_wrapper, arg);
if (rc) {
logmsg(LOGMSG_ERROR,
"Failed to add shards for existing table %s while "
"partitioning rc %d\n",
sc->tablename, rc);
sc_errf(sc,
"Failed to add shards for existing table %s while "
"partitioning rc %d",
sc->tablename, rc);
rc = ERR_SC;
goto err;
}
retros->ss[sc->partition.u.tpt.retention - 1] = arg->s;

/* alter existing shard */
arg->indx = 0;
arg->pos = FIRST_SHARD | LAST_SHARD;
arg->clonelast = 0;
sc = arg->s;
sc->kind = SC_ALTERTABLE;
struct dbtable *db = get_dbtable_by_name(arg->part_name);
db->sharding_arg = retros;
db->sharding_func = timepart_retro_route;
sc->newpartition = newpartition;
sc->force_rebuild = 1;
for (int jj = 0; jj < retros->n; jj++) {
logmsg(LOGMSG_USER, "PARTITION %s shard %d time %u name %s\n", arg->part_name, jj, retros->limits[jj],
(jj < (retros->n - 1)) ? retros->ss[jj]->tablename : arg->part_name);
}
/* existing table name matches the partition name */
rc = start_schema_change_tran_wrapper(arg->part_name, NULL, arg);
if (rc) {
logmsg(LOGMSG_ERROR,
"Failed to alter existing table %s while "
"partitioning rc %d\n",
sc->tablename, rc);
sc_errf(sc,
"Failed to alter existing table %s while "
"partitioning rc %d",
sc->tablename, rc);
rc = ERR_SC;
goto err;
}
err:
return 0;
}

static struct schema_change_type* _create_logical_cron_systable(const char *tblname);

static int _process_single_table_sc_partitioning(struct ireq *iq)
{
struct schema_change_type *sc = iq->sc;
int rc;
int retro_partition = gbl_retro_tpt && sc->kind == SC_ALTERTABLE && sc->partition.type == PARTITION_ADD_TIMED_RETRO;

if (sc->partition.type == PARTITION_ADD_TIMED_RETRO && !gbl_retro_tpt) {
logmsg(LOGMSG_ERROR, "Retroactively partition disabled %s\n", sc->tablename);
sc_errf(sc, "Retroactively partition disabled %s\n", sc->tablename);
return ERR_SC;
}

if (sc->partition.type == PARTITION_REMOVE) {
logmsg(LOGMSG_ERROR, "Partition %s does not exist\n", sc->tablename);
sc_errf(sc, "Partition %s does not exist\n", sc->tablename);
return ERR_SC;
}

assert(sc->partition.type == PARTITION_ADD_TIMED ||
assert(sc->partition.type == PARTITION_ADD_TIMED || sc->partition.type == PARTITION_ADD_TIMED_RETRO ||
sc->partition.type == PARTITION_ADD_MANUAL);

/* create a new time partition object */
Expand All @@ -6361,7 +6482,7 @@ static int _process_single_table_sc_partitioning(struct ireq *iq)
}

/* create shards for the partition */
rc = timepart_populate_shards(sc->newpartition, &err);
rc = timepart_populate_shards(sc->newpartition, retro_partition, &err);
if (rc) {
assert(err.errval != VIEW_NOERR);

Expand All @@ -6382,6 +6503,13 @@ static int _process_single_table_sc_partitioning(struct ireq *iq)
return ERR_SC;
arg.lockless = 0; /* the partition does not exist */

if (retro_partition) {
/* we want to retroactively populate the shards with existing data */
rc = _process_partitioning_retro(&arg);
free(arg.part_name);
return rc;
}

/* is this an alter? preserve existing table as first shard */
if (sc->kind != SC_ADDTABLE) {
/* we need to create a light rename for first shard,
Expand Down Expand Up @@ -6418,7 +6546,7 @@ static int _process_single_table_sc_partitioning(struct ireq *iq)
rc = timepart_foreach_shard_lockless(
sc->newpartition, start_schema_change_tran_wrapper, &arg);

if (!rc&& sc->partition.type == PARTITION_ADD_MANUAL) {
if (!rc && sc->partition.type == PARTITION_ADD_MANUAL) {
if (!get_dbtable_by_name(LOGICAL_CRON_SYSTABLE)){
struct schema_change_type *lcsc = _create_logical_cron_systable(LOGICAL_CRON_SYSTABLE);
if (!lcsc)
Expand Down
2 changes: 2 additions & 0 deletions db/sql.h
Original file line number Diff line number Diff line change
Expand Up @@ -1355,6 +1355,8 @@ struct connection_info {
extern int gbl_master_swing_osql_verbose;
/* for testing: sleep in osql_sock_restart when master swings */
extern int gbl_master_swing_sock_restart_sleep;
/* allow altering existing tables to retroactively partition data for tpt */
extern int gbl_retro_tpt;

#define is_sqlite_stat(x) (strncmp((x), "sqlite_stat", sizeof("sqlite_stat") - 1) == 0)
#define is_stat1(x) (strcmp((x), "sqlite_stat1") == 0)
Expand Down
Loading