Skip to content

Commit 8ff1434

Browse files
committed
callbacks
Signed-off-by: Dorin Hogea <dhogea@bloomberg.net>
1 parent bea55e3 commit 8ff1434

File tree

1 file changed

+37
-10
lines changed

1 file changed

+37
-10
lines changed

schemachange/sc_callbacks.c

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,21 @@ unsigned long long get_genid_stripe_pointer(unsigned long long genid,
143143
return sc_genids[stripe];
144144
}
145145

146+
struct dbtable * _distribute_rows(struct dbtable *usedb, unsigned long long genid)
147+
{
148+
struct dbtable *table = usedb;
149+
150+
/* if we want to distribute the data, change the destination table here */
151+
if (usedb->sc_from && usedb->sc_from->sharding_func) {
152+
table = usedb->sc_from->sharding_func(usedb->sc_from->sharding_arg, genid);
153+
if (!table) {
154+
logmsg(LOGMSG_ERROR, "%s failed to get the sharding usedb for %s\n",
155+
__func__, usedb->tablename);
156+
}
157+
}
158+
return table;
159+
}
160+
146161
/* delete from new btree when genid is older than schemachange position
147162
*/
148163
int live_sc_post_del_record(struct ireq *iq, void *trans,
@@ -163,6 +178,13 @@ int live_sc_post_del_record(struct ireq *iq, void *trans,
163178
" behind cursor - DELETE\n", genid, sc_genids[stripe]);
164179
*/
165180

181+
/* if we want to distribute the data, change the destination table here */
182+
iq->usedb = _distribute_rows(usedb, genid);
183+
if (!iq->usedb) {
184+
iq->usedb = usedb;
185+
return -1;
186+
}
187+
166188
int rc = del_new_record(iq, trans, genid, del_keys, old_dta, oldblobs, 1);
167189
iq->usedb = usedb;
168190
if (rc != 0 && rc != RC_INTERNAL_RETRY) {
@@ -409,18 +431,15 @@ int live_sc_post_add_record(struct ireq *iq, void *trans,
409431
}
410432

411433
/* if we want to distribute the data, change the destination table here */
412-
if (usedb->sc_from && usedb->sc_from->sharding_func) {
413-
iq->usedb = usedb->sc_from->sharding_func(usedb->sc_from->sharding_arg, genid);
414-
if (!iq->usedb) {
415-
logmsg(LOGMSG_ERROR, "%s failed to get the sharding usedb for %s\n",
416-
__func__, usedb->tablename);
417-
rc = -1;
418-
goto done;
419-
}
420-
}
434+
iq->usedb = _distribute_rows(usedb, genid);
435+
if (!iq->usedb) {
436+
iq->usedb = usedb;
437+
rc = -1;
438+
goto done;
439+
}
421440

422441
rc = add_record(iq, trans, p_tagname_buf, p_tagname_buf_end, new_dta,
423-
((uint8_t*)new_dta) + usedb->sc_to->lrl, NULL, blobs, maxblobs,
442+
((uint8_t*)new_dta) + usedb->sc_to->lrl, NULL, blobs, maxblobs,
424443
&opfailcode, &ixfailnum, rrn, &genid, ins_keys,
425444
BLOCK2_ADDKL, // opcode
426445
0, // blkpos
@@ -478,6 +497,14 @@ int live_sc_post_upd_record(struct ireq *iq, void *trans,
478497
if (iq->debug) {
479498
reqpushprefixf(iq, "upd_new_record: ");
480499
}
500+
501+
/* if we want to distribute the data, change the destination table here */
502+
iq->usedb = _distribute_rows(usedb, oldgenid);
503+
if (!iq->usedb) {
504+
iq->usedb = usedb;
505+
return -1;
506+
}
507+
481508
rc = upd_new_record(iq, trans, oldgenid, old_dta, newgenid, new_dta,
482509
ins_keys, del_keys, od_len, updCols, blobs, deferredAdd,
483510
oldblobs, newblobs, 1);

0 commit comments

Comments
 (0)