Skip to content

Commit 32e931a

Browse files
RCORE-2006 Reuse realm file for sync schema migrations (#7487)
* Use the same realm file when performing a schema migration * touch ups * Add support to clear PendingBootstrapStore while in a write transaction * Fix AdditiveDiscovered schema mode documentation and test * Fix tests * Delete private tables during schema migration
1 parent 0fcbf94 commit 32e931a

23 files changed

+423
-141
lines changed

src/realm/group.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -798,27 +798,27 @@ void Group::recycle_table_accessor(Table* to_be_recycled)
798798
g_table_recycler_1.push_back(to_be_recycled);
799799
}
800800

801-
void Group::remove_table(StringData name)
801+
void Group::remove_table(StringData name, bool ignore_backlinks)
802802
{
803803
check_attached();
804804
size_t table_ndx = m_table_names.find_first(name);
805805
if (table_ndx == not_found)
806806
throw NoSuchTable();
807807
auto key = ndx2key(table_ndx);
808-
remove_table(table_ndx, key); // Throws
808+
remove_table(table_ndx, key, ignore_backlinks); // Throws
809809
}
810810

811811

812-
void Group::remove_table(TableKey key)
812+
void Group::remove_table(TableKey key, bool ignore_backlinks)
813813
{
814814
check_attached();
815815

816816
size_t table_ndx = key2ndx_checked(key);
817-
remove_table(table_ndx, key);
817+
remove_table(table_ndx, key, ignore_backlinks);
818818
}
819819

820820

821-
void Group::remove_table(size_t table_ndx, TableKey key)
821+
void Group::remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks)
822822
{
823823
if (!m_is_writable)
824824
throw LogicError(ErrorCodes::ReadOnlyDB, "Database not writable");
@@ -832,7 +832,7 @@ void Group::remove_table(size_t table_ndx, TableKey key)
832832
// tables. Such a behaviour is deemed too obscure, and we shall therefore
833833
// require that a removed table does not contain foreign origin backlink
834834
// columns.
835-
if (table->is_cross_table_link_target())
835+
if (!ignore_backlinks && table->is_cross_table_link_target())
836836
throw CrossTableLinkTarget(table->get_name());
837837

838838
{

src/realm/group.hpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,8 +321,10 @@ class Group : public ArrayParent {
321321
TableRef get_or_add_table_with_primary_key(StringData name, DataType pk_type, StringData pk_name,
322322
bool nullable = false, Table::Type table_type = Table::Type::TopLevel);
323323

324-
void remove_table(TableKey key);
325-
void remove_table(StringData name);
324+
// Use 'ignore_backlinks' with caution. ignore_backlinks=true will leave things in an invalid state
325+
// if the target table (or column) is not removed as well.
326+
void remove_table(TableKey key, bool ignore_backlinks = false);
327+
void remove_table(StringData name, bool ignore_backlinks = false);
326328

327329
void rename_table(TableKey key, StringData new_name, bool require_unique_name = true);
328330
void rename_table(StringData name, StringData new_name, bool require_unique_name = true);
@@ -631,7 +633,7 @@ class Group : public ArrayParent {
631633
void attach_shared(ref_type new_top_ref, size_t new_file_size, bool writable, VersionID version);
632634

633635
void create_empty_group();
634-
void remove_table(size_t table_ndx, TableKey key);
636+
void remove_table(size_t table_ndx, TableKey key, bool ignore_backlinks);
635637

636638
void reset_free_space_tracking();
637639

src/realm/object-store/object_store.cpp

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -867,8 +867,7 @@ static const char* schema_mode_to_string(SchemaMode mode)
867867
void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema_version, Schema& target_schema,
868868
uint64_t target_schema_version, SchemaMode mode,
869869
std::vector<SchemaChange> const& changes, bool handle_automatically_backlinks,
870-
std::function<void()> migration_function,
871-
bool set_schema_version_on_version_decrease)
870+
std::function<void()> migration_function)
872871
{
873872
using namespace std::chrono;
874873
auto t1 = steady_clock::now();
@@ -889,16 +888,11 @@ void ObjectStore::apply_schema_changes(Transaction& transaction, uint64_t schema
889888
create_metadata_tables(transaction);
890889

891890
if (mode == SchemaMode::AdditiveDiscovered || mode == SchemaMode::AdditiveExplicit) {
892-
bool set_schema = (schema_version < target_schema_version || schema_version == ObjectStore::NotVersioned ||
893-
set_schema_version_on_version_decrease);
894-
895891
// With sync v2.x, indexes are no longer synced, so there's no reason to avoid creating them.
896892
bool update_indexes = true;
897893
apply_additive_changes(transaction, changes, update_indexes);
898894

899-
if (set_schema)
900-
set_schema_version(transaction, target_schema_version);
901-
895+
set_schema_version(transaction, target_schema_version);
902896
set_schema_keys(transaction, target_schema);
903897
return;
904898
}

src/realm/object-store/object_store.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ class ObjectStore {
8686
static void apply_schema_changes(Transaction& group, uint64_t schema_version, Schema& target_schema,
8787
uint64_t target_schema_version, SchemaMode mode,
8888
std::vector<SchemaChange> const& changes, bool handle_automatically_backlinks,
89-
std::function<void()> migration_function = {},
90-
bool save_schema_version_on_version_decrease = false);
89+
std::function<void()> migration_function = {});
9190

9291
static void apply_additive_changes(Group&, std::vector<SchemaChange> const&, bool update_indexes);
9392

src/realm/object-store/schema.hpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,6 @@ enum class SchemaMode : uint8_t {
8787
// The only changes allowed are to add new tables, add columns to
8888
// existing tables, and to add or remove indexes from existing
8989
// columns. Extra tables not present in the schema are ignored.
90-
// Indexes are only added to or removed from existing columns if the
91-
// schema version is greater than the existing one (and unlike other
92-
// modes, the schema version is allowed to be less than the existing
93-
// one).
9490
// The migration function is not used.
9591
// This should be used when including discovered user classes.
9692
// Previously called Additive.

src/realm/object-store/shared_realm.cpp

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,6 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig
497497

498498
schema.copy_keys_from(actual_schema, m_config.schema_subset_mode);
499499

500-
bool save_schema_version_on_version_decrease = false;
501-
#if REALM_ENABLE_SYNC
502-
if (m_config.sync_config && m_config.sync_config->flx_sync_requested)
503-
save_schema_version_on_version_decrease = true;
504-
#endif
505-
506500
uint64_t old_schema_version = m_schema_version;
507501
bool additive = m_config.schema_mode == SchemaMode::AdditiveDiscovered ||
508502
m_config.schema_mode == SchemaMode::AdditiveExplicit ||
@@ -532,12 +526,11 @@ void Realm::update_schema(Schema schema, uint64_t version, MigrationFunction mig
532526

533527
ObjectStore::apply_schema_changes(transaction(), version, m_schema, m_schema_version, m_config.schema_mode,
534528
required_changes, m_config.automatically_handle_backlinks_in_migrations,
535-
wrapper, save_schema_version_on_version_decrease);
529+
wrapper);
536530
}
537531
else {
538532
ObjectStore::apply_schema_changes(transaction(), m_schema_version, schema, version, m_config.schema_mode,
539-
required_changes, m_config.automatically_handle_backlinks_in_migrations,
540-
nullptr, save_schema_version_on_version_decrease);
533+
required_changes, m_config.automatically_handle_backlinks_in_migrations);
541534
REALM_ASSERT_DEBUG(additive ||
542535
(required_changes = ObjectStore::schema_from_group(read_group()).compare(schema)).empty());
543536
}

src/realm/object-store/sync/async_open_task.cpp

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
#include <realm/sync/subscriptions.hpp>
2222
#include <realm/sync/noinst/sync_schema_migration.hpp>
2323
#include <realm/object-store/impl/realm_coordinator.hpp>
24-
#include <realm/object-store/sync/sync_manager.hpp>
2524
#include <realm/object-store/sync/sync_session.hpp>
2625
#include <realm/object-store/thread_safe_reference.hpp>
2726

@@ -191,20 +190,10 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
191190
return;
192191
}
193192

194-
// Sync schema migrations require setting a subscription initializer callback to bootstrap the data. The
195-
// subscriptions in the current realm file may not be compatible with the new schema so cannot rely on them.
196-
auto config = coordinator->get_config();
197-
if (!config.sync_config->subscription_initializer) {
198-
status = Status(ErrorCodes::SyncSchemaMigrationError,
199-
"Sync schema migrations must provide a subscription initializer callback in the sync config");
200-
async_open_complete(std::move(callback), coordinator, status);
201-
return;
202-
}
203-
204193
// Migrate the schema.
205194
// * First upload the changes at the old schema version
206-
// * Then, delete the realm, reopen it, and bootstrap at new schema version
207-
// The lifetime of the task is extended until bootstrap completes.
195+
// * Then, pause the session, delete all tables, re-initialize the metadata, and finally restart the session.
196+
// The lifetime of the task is extended until the bootstrap completes.
208197
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
209198
session->wait_for_upload_completion([callback = std::move(callback), coordinator, session, self,
210199
this](Status status) mutable {
@@ -219,38 +208,11 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
219208
return;
220209
}
221210

222-
auto future = SyncSession::Internal::pause_async(*session);
223-
// Wait until the SessionWrapper is done using the DBRef.
224-
std::move(future).get_async([callback = std::move(callback), coordinator, self, this](Status status) mutable {
225-
{
226-
util::CheckedLockGuard lock(m_mutex);
227-
if (!m_session)
228-
return; // Swallow all events if the task has been cancelled.
229-
}
230-
231-
if (!status.is_ok()) {
232-
self->async_open_complete(std::move(callback), coordinator, status);
233-
return;
234-
}
235-
236-
// Delete the realm file and reopen it.
237-
try {
238-
util::CheckedLockGuard lock(m_mutex);
239-
auto config = coordinator->get_config();
240-
m_session = nullptr;
241-
coordinator->close();
242-
coordinator = nullptr;
243-
util::File::remove(config.path);
244-
coordinator = _impl::RealmCoordinator::get_coordinator(config);
245-
m_session = coordinator->sync_session();
246-
}
247-
catch (...) {
248-
async_open_complete(std::move(callback), coordinator, exception_to_status());
249-
return;
250-
}
251-
211+
auto migration_completed_callback = [callback = std::move(callback), coordinator = std::move(coordinator),
212+
self](Status status) mutable {
252213
self->wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
253-
});
214+
};
215+
SyncSession::Internal::migrate_schema(*session, std::move(migration_completed_callback));
254216
});
255217
}
256218

src/realm/object-store/sync/sync_session.cpp

Lines changed: 64 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,6 @@ SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, co
371371
, m_migration_store{sync::MigrationStore::create(m_db)}
372372
, m_client(client)
373373
, m_sync_manager(sync_manager)
374-
, m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read()))
375374
{
376375
REALM_ASSERT(m_config.sync_config);
377376
// we don't want the following configs enabled during a client reset
@@ -633,12 +632,12 @@ void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
633632
revive_if_needed();
634633
}
635634

636-
util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
635+
util::Future<void> SyncSession::pause_async()
637636
{
638637
{
639-
util::CheckedUniqueLock lock(session.m_state_mutex);
638+
util::CheckedUniqueLock lock(m_state_mutex);
640639
// Nothing to wait for if the session is already paused or inactive.
641-
if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) {
640+
if (m_state == SyncSession::State::Paused || m_state == SyncSession::State::Inactive) {
642641
return util::Future<void>::make_ready();
643642
}
644643
}
@@ -647,8 +646,8 @@ util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
647646
// must have been destroyed upon return. This allows the caller to follow up with a call to
648647
// sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works
649648
// so long as this SyncSession object remains in the `paused` state after the invocation of shutdown().
650-
session.pause();
651-
return session.m_client.notify_session_terminated();
649+
pause();
650+
return m_client.notify_session_terminated();
652651
}
653652

654653
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
@@ -658,7 +657,7 @@ void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::Sessi
658657

659658
util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
660659
{
661-
return SyncSession::Internal::pause_async(session);
660+
return session.pause_async();
662661
}
663662

664663
// This method should only be called from within the error handler callback registered upon the underlying
@@ -1401,10 +1400,10 @@ void SyncSession::update_subscription_store(bool flx_sync_requested, std::option
14011400
// waiters
14021401
auto subscription_store = std::move(m_flx_subscription_store);
14031402
lock.unlock();
1404-
subscription_store->terminate();
14051403
auto tr = m_db->start_write();
1404+
subscription_store->reset(*tr);
14061405
history.set_write_validator_factory(nullptr);
1407-
tr->rollback();
1406+
tr->commit();
14081407
}
14091408
return;
14101409
}
@@ -1670,3 +1669,59 @@ util::Future<std::string> SyncSession::send_test_command(std::string body)
16701669

16711670
return m_session->send_test_command(std::move(body));
16721671
}
1672+
1673+
void SyncSession::migrate_schema(util::UniqueFunction<void(Status)>&& callback)
1674+
{
1675+
util::CheckedUniqueLock lock(m_state_mutex);
1676+
// If the schema migration is already in progress, just wait to complete.
1677+
if (m_schema_migration_in_progress) {
1678+
add_completion_callback(std::move(callback), ProgressDirection::download);
1679+
return;
1680+
}
1681+
m_schema_migration_in_progress = true;
1682+
1683+
// Perform the migration:
1684+
// 1. Pause the sync session
1685+
// 2. Once the sync client releases the realm file:
1686+
// a. Delete all tables (private and public)
1687+
// b. Reset the subscription store
1688+
// d. Empty the sync history and adjust cursors
1689+
// e. Reset file ident (the server flags the old ident as in the case of a client reset)
1690+
// 3. Resume the session (the client asks for a new file ident)
1691+
// See `sync_schema_migration::perform_schema_migration` for more details.
1692+
1693+
CompletionCallbacks callbacks;
1694+
std::swap(m_completion_callbacks, callbacks);
1695+
auto guard = util::make_scope_exit([&]() noexcept {
1696+
util::CheckedUniqueLock lock(m_state_mutex);
1697+
if (m_completion_callbacks.empty())
1698+
std::swap(callbacks, m_completion_callbacks);
1699+
else
1700+
m_completion_callbacks.merge(std::move(callbacks));
1701+
});
1702+
m_state_mutex.unlock(lock);
1703+
1704+
auto future = pause_async();
1705+
std::move(future).get_async(
1706+
[callback = std::move(callback), weak_session = weak_from_this()](Status status) mutable {
1707+
if (!status.is_ok())
1708+
return callback(status);
1709+
1710+
auto session = weak_session.lock();
1711+
if (!session) {
1712+
status = Status(ErrorCodes::InvalidSession, "Sync session was destroyed during schema migration");
1713+
return callback(status);
1714+
}
1715+
sync_schema_migration::perform_schema_migration(*session->m_db);
1716+
{
1717+
util::CheckedUniqueLock lock(session->m_state_mutex);
1718+
session->m_previous_schema_version.reset();
1719+
session->m_schema_migration_in_progress = false;
1720+
session->m_subscription_store_base.reset();
1721+
session->m_flx_subscription_store.reset();
1722+
}
1723+
session->update_subscription_store(true, {});
1724+
session->wait_for_download_completion(std::move(callback));
1725+
session->resume();
1726+
});
1727+
}

src/realm/object-store/sync/sync_session.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,10 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
321321
return session.m_db;
322322
}
323323

324-
static util::Future<void> pause_async(SyncSession& session);
324+
static void migrate_schema(SyncSession& session, util::UniqueFunction<void(Status)>&& callback)
325+
{
326+
session.migrate_schema(std::move(callback));
327+
}
325328
};
326329

327330
// Expose some internal functionality to testing code.
@@ -434,6 +437,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
434437
REQUIRES(!m_connection_state_mutex);
435438
void become_paused(util::CheckedUniqueLock) RELEASE(m_state_mutex) REQUIRES(!m_connection_state_mutex);
436439
void become_waiting_for_access_token() REQUIRES(m_state_mutex);
440+
util::Future<void> pause_async() REQUIRES(!m_state_mutex, !m_connection_state_mutex);
437441

438442
// do restart session restarts the session without freeing any of the waiters
439443
void do_restart_session(util::CheckedUniqueLock)
@@ -454,6 +458,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
454458

455459
util::Future<std::string> send_test_command(std::string body) REQUIRES(!m_state_mutex);
456460

461+
void migrate_schema(util::UniqueFunction<void(Status)>&& callback)
462+
REQUIRES(!m_state_mutex, !m_config_mutex, !m_connection_state_mutex);
463+
457464
std::function<TransactionCallback> m_sync_transact_callback GUARDED_BY(m_state_mutex);
458465

459466
template <typename Field>
@@ -525,6 +532,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
525532

526533
// Set if ProtocolError::schema_version_changed error is received from the server.
527534
std::optional<uint64_t> m_previous_schema_version GUARDED_BY(m_state_mutex);
535+
bool m_schema_migration_in_progress GUARDED_BY(m_state_mutex) = false;
528536
};
529537

530538
} // namespace realm

0 commit comments

Comments
 (0)