Skip to content

Commit ddc49b2

Browse files
committed
fix: ensure consistent db_version in cloudsync_payload_apply for rows from the same transaction
Apply consecutive rows with the same db_version inside a transaction if no transaction has already been opened. The user may have already opened a transaction before applying the payload, and the `payload_apply_callback` may have already opened a savepoint. Nested savepoints work, but overlapping savepoints could alter the expected behavior. This savepoint ensures that the db_version value remains consistent for all rows with the same original db_version in the payload, even if the local db_version was greater that the db_version from the payload.
1 parent eeb6328 commit ddc49b2

File tree

1 file changed

+44
-11
lines changed

1 file changed

+44
-11
lines changed

src/cloudsync.c

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2151,19 +2151,12 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
21512151
buffer = (const char *)clone;
21522152
}
21532153

2154-
// apply payload inside a transaction
21552154
sqlite3 *db = sqlite3_context_db_handle(context);
2156-
int rc = sqlite3_exec(db, "SAVEPOINT cloudsync_payload_apply;", NULL, NULL, NULL);
2157-
if (rc != SQLITE_OK) {
2158-
dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to start a transaction (%s).", sqlite3_errmsg(db));
2159-
if (clone) cloudsync_memory_free(clone);
2160-
return -1;
2161-
}
21622155

21632156
// precompile the insert statement
21642157
sqlite3_stmt *vm = NULL;
21652158
const char *sql = "INSERT INTO cloudsync_changes(tbl, pk, col_name, col_value, col_version, db_version, site_id, cl, seq) VALUES (?,?,?,?,?,?,?,?,?);";
2166-
rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
2159+
int rc = sqlite3_prepare(db, sql, -1, &vm, NULL);
21672160
if (rc != SQLITE_OK) {
21682161
dbutils_context_result_error(context, "Error on cloudsync_payload_apply: error while compiling SQL statement (%s).", sqlite3_errmsg(db));
21692162
if (clone) cloudsync_memory_free(clone);
@@ -2173,6 +2166,8 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
21732166
// process buffer, one row at a time
21742167
uint16_t ncols = header.ncols;
21752168
uint32_t nrows = header.nrows;
2169+
int64_t last_payload_db_version = -1;
2170+
bool in_savepoint = false;
21762171
int dbversion = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_DBVERSION);
21772172
int seq = dbutils_settings_get_int_value(db, CLOUDSYNC_KEY_CHECK_SEQ);
21782173
cloudsync_pk_decode_bind_context decoded_context = {.vm = vm};
@@ -2184,10 +2179,44 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
21842179
pk_decode((char *)buffer, blen, ncols, &seek, cloudsync_pk_decode_bind_callback, &decoded_context);
21852180
// n is the pk_decode return value, I don't think I should assert here because in any case the next sqlite3_step would fail
21862181
// assert(n == ncols);
2187-
2182+
21882183
bool approved = true;
21892184
if (payload_apply_callback) approved = payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_WILL_APPLY, SQLITE_OK);
2185+
2186+
// Apply consecutive rows with the same db_version inside a transaction if no
2187+
// transaction has already been opened.
2188+
// The user may have already opened a transaction before applying the payload,
2189+
// and the `payload_apply_callback` may have already opened a savepoint.
2190+
// Nested savepoints work, but overlapping savepoints could alter the expected behavior.
2191+
// This savepoint ensures that the db_version value remains consistent for all
2192+
// rows with the same original db_version in the payload.
2193+
2194+
bool db_version_changed = (last_payload_db_version != decoded_context.db_version);
2195+
2196+
// Release existing savepoint if db_version changed
2197+
if (in_savepoint && db_version_changed) {
2198+
rc = sqlite3_exec(db, "RELEASE cloudsync_payload_apply;", NULL, NULL, NULL);
2199+
if (rc != SQLITE_OK) {
2200+
dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to release a savepoint (%s).", sqlite3_errmsg(db));
2201+
if (clone) cloudsync_memory_free(clone);
2202+
return -1;
2203+
}
2204+
in_savepoint = false;
2205+
}
21902206

2207+
// Start new savepoint if needed
2208+
bool in_transaction = sqlite3_get_autocommit(db) != true;
2209+
if (!in_transaction && db_version_changed) {
2210+
rc = sqlite3_exec(db, "SAVEPOINT cloudsync_payload_apply;", NULL, NULL, NULL);
2211+
if (rc != SQLITE_OK) {
2212+
dbutils_context_result_error(context, "Error on cloudsync_payload_apply: unable to start a transaction (%s).", sqlite3_errmsg(db));
2213+
if (clone) cloudsync_memory_free(clone);
2214+
return -1;
2215+
}
2216+
last_payload_db_version = decoded_context.db_version;
2217+
in_savepoint = true;
2218+
}
2219+
21912220
if (approved) {
21922221
rc = sqlite3_step(vm);
21932222
if (rc != SQLITE_DONE) {
@@ -2203,10 +2232,14 @@ int cloudsync_payload_apply (sqlite3_context *context, const char *payload, int
22032232
blen -= seek;
22042233
stmt_reset(vm);
22052234
}
2235+
2236+
if (in_savepoint) {
2237+
sql = "RELEASE cloudsync_payload_apply;";
2238+
int rc1 = sqlite3_exec(db, sql, NULL, NULL, NULL);
2239+
if (rc1 != SQLITE_OK) rc = rc1;
2240+
}
22062241

22072242
char *lasterr = (rc != SQLITE_OK && rc != SQLITE_DONE) ? cloudsync_string_dup(sqlite3_errmsg(db), false) : NULL;
2208-
sql = (lasterr) ? "ROLLBACK TO cloudsync_payload_apply;" : "RELEASE cloudsync_payload_apply;";
2209-
sqlite3_exec(db, sql, NULL, NULL, NULL);
22102243

22112244
if (payload_apply_callback) {
22122245
payload_apply_callback(&payload_apply_xdata, &decoded_context, db, data, CLOUDSYNC_PAYLOAD_APPLY_CLEANUP, rc);

0 commit comments

Comments
 (0)