Skip to content

Commit b6452b9

Browse files
RCORE-2135 Make AsyncOpenTask wait until all pending subscriptions finish bootstrapping (#7723)
* Make AsyncOpenTask to wait until all pending subscriptions finish bootstrapping * Don't use subscription machinery for pbs * Update changelog * Fix test due to scheduling
1 parent f3928e3 commit b6452b9

File tree

5 files changed

+98
-55
lines changed

5 files changed

+98
-55
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
### Fixed
88
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
99
* Valgrind could report a branch on an uninitialized read when opening something that is not an encrypted Realm file as an encrypted Realm file ([PR #7789](https://github.com/realm/realm-core/pull/7789), since v14.10.0).
10+
* Opening an FLX realm asynchronously may not wait to download all data ([#7720](https://github.com/realm/realm-core/issues/7720), since FLX sync was introduced).
1011

1112
### Breaking changes
1213
* None.
@@ -468,7 +469,7 @@
468469
* Fixed equality queries on a Mixed property with an index possibly returning the wrong result if values of different types happened to have the same StringIndex hash. ([6407](https://github.com/realm/realm-core/issues/6407) since v11.0.0-beta.5).
469470
* If you have more than 8388606 links pointing to one specific object, the program will crash. ([#6577](https://github.com/realm/realm-core/issues/6577), since v6.0.0)
470471
* Query for NULL value in Dictionary<Mixed> would give wrong results ([6748])(https://github.com/realm/realm-core/issues/6748), since v10.0.0)
471-
* A Realm generated on a non-apple ARM 64 device and copied to another platform (and vice-versa) were non-portable due to a sorting order difference. This impacts strings or binaries that have their first difference at a non-ascii character. These items may not be found in a set, or in an indexed column if the strings had a long common prefix (> 200 characters). ([PR #6670](https://github.com/realm/realm-core/pull/6670), since 2.0.0-rc7 for indexes, and since since the introduction of sets in v10.2.0)
472+
* A Realm generated on a non-apple ARM 64 device and copied to another platform (and vice-versa) were non-portable due to a sorting order difference. This impacts strings or binaries that have their first difference at a non-ascii character. These items may not be found in a set, or in an indexed column if the strings had a long common prefix (> 200 characters). ([PR #6670](https://github.com/realm/realm-core/pull/6670), since 2.0.0-rc7 for indexes, and since the introduction of sets in v10.2.0)
472473

473474
### Breaking changes
474475
* Support for upgrading from Realm files produced by RealmCore v5.23.9 or earlier is no longer supported.

src/realm/object-store/impl/realm_coordinator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>
386386
const auto subscription_version = current_subscription.version();
387387
// in case we are hitting this check while during a normal open, we need to take in
388388
// consideration if the db was created during this call. Since this may be the first time
389-
// we are actually creating a realm. For async open this does not apply, infact db_created
389+
// we are actually creating a realm. For async open this does not apply, in fact db_created
390390
// will always be false.
391391
if (!first_time_open)
392392
first_time_open = db_created;

src/realm/object-store/sync/async_open_task.cpp

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ void AsyncOpenTask::start(AsyncOpenCallback callback)
5959
return;
6060
}
6161

62-
self->migrate_schema_or_complete(std::move(callback), coordinator, status);
62+
self->migrate_schema_or_complete(std::move(callback), coordinator);
6363
});
6464
session->revive_if_needed();
6565
}
@@ -108,15 +108,21 @@ void AsyncOpenTask::unregister_download_progress_notifier(uint64_t token)
108108
m_session->unregister_progress_notifier(token);
109109
}
110110

111-
void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callback,
112-
std::shared_ptr<_impl::RealmCoordinator> coordinator,
113-
bool rerun_on_launch)
111+
void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
112+
std::shared_ptr<_impl::RealmCoordinator> coordinator,
113+
Status status)
114114
{
115-
// Attaching the subscription initializer to the latest subscription that was committed.
116-
// This is going to be enough, for waiting that the subscription committed by init_subscription_initializer has
117-
// been completed (either if it is the first time that the file is created or if rerun on launch was set to true).
118-
// If the same Realm file is already opened, there is the possibility that this code may wait on a subscription
119-
// that was not committed by init_subscription_initializer.
115+
if (!status.is_ok()) {
116+
async_open_complete(std::move(callback), coordinator, status);
117+
return;
118+
}
119+
120+
auto config = coordinator->get_config();
121+
// FlX sync is not used so there is nothing to bootstrap.
122+
if (!config.sync_config || !config.sync_config->flx_sync_requested) {
123+
async_open_complete(std::move(callback), coordinator, status);
124+
return;
125+
}
120126

121127
SharedRealm shared_realm;
122128
try {
@@ -126,13 +132,13 @@ void AsyncOpenTask::attach_to_subscription_initializer(AsyncOpenCallback&& callb
126132
async_open_complete(std::move(callback), coordinator, exception_to_status());
127133
return;
128134
}
129-
const auto init_subscription = shared_realm->get_latest_subscription_set();
130-
const auto sub_state = init_subscription.state();
135+
const auto subscription_set = shared_realm->get_latest_subscription_set();
136+
const auto sub_state = subscription_set.state();
131137

132-
if ((sub_state != sync::SubscriptionSet::State::Complete) || (m_db_first_open && rerun_on_launch)) {
138+
if (sub_state != sync::SubscriptionSet::State::Complete) {
133139
// We need to wait until subscription initializer completes
134140
std::shared_ptr<AsyncOpenTask> self(shared_from_this());
135-
init_subscription.get_state_change_notification(sync::SubscriptionSet::State::Complete)
141+
subscription_set.get_state_change_notification(sync::SubscriptionSet::State::Complete)
136142
.get_async([self, coordinator, callback = std::move(callback)](
137143
StatusWith<realm::sync::SubscriptionSet::State> state) mutable {
138144
self->async_open_complete(std::move(callback), coordinator, state.get_status());
@@ -172,7 +178,7 @@ void AsyncOpenTask::async_open_complete(AsyncOpenCallback&& callback,
172178
}
173179

174180
void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
175-
std::shared_ptr<_impl::RealmCoordinator> coordinator, Status status)
181+
std::shared_ptr<_impl::RealmCoordinator> coordinator)
176182
{
177183
util::CheckedUniqueLock lock(m_mutex);
178184
if (!m_session)
@@ -186,7 +192,7 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
186192
}();
187193

188194
if (!pending_migration) {
189-
wait_for_bootstrap_or_complete(std::move(callback), coordinator, status);
195+
wait_for_bootstrap_or_complete(std::move(callback), coordinator, Status::OK());
190196
return;
191197
}
192198

@@ -216,19 +222,4 @@ void AsyncOpenTask::migrate_schema_or_complete(AsyncOpenCallback&& callback,
216222
});
217223
}
218224

219-
void AsyncOpenTask::wait_for_bootstrap_or_complete(AsyncOpenCallback&& callback,
220-
std::shared_ptr<_impl::RealmCoordinator> coordinator,
221-
Status status)
222-
{
223-
auto config = coordinator->get_config();
224-
if (config.sync_config && config.sync_config->flx_sync_requested &&
225-
config.sync_config->subscription_initializer && status.is_ok()) {
226-
const bool rerun_on_launch = config.sync_config->rerun_init_subscription_on_open;
227-
attach_to_subscription_initializer(std::move(callback), coordinator, rerun_on_launch);
228-
}
229-
else {
230-
async_open_complete(std::move(callback), coordinator, status);
231-
}
232-
}
233-
234225
} // namespace realm

src/realm/object-store/sync/async_open_task.hpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,7 @@ class AsyncOpenTask : public std::enable_shared_from_this<AsyncOpenTask> {
6868

6969
void async_open_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
7070
REQUIRES(!m_mutex);
71-
void attach_to_subscription_initializer(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, bool)
72-
REQUIRES(!m_mutex);
73-
void migrate_schema_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
74-
REQUIRES(!m_mutex);
71+
void migrate_schema_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>) REQUIRES(!m_mutex);
7572
void wait_for_bootstrap_or_complete(AsyncOpenCallback&&, std::shared_ptr<_impl::RealmCoordinator>, Status)
7673
REQUIRES(!m_mutex);
7774

test/object-store/sync/flx_sync.cpp

Lines changed: 73 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1380,9 +1380,10 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
13801380
.get_state_change_notification(sync::SubscriptionSet::State::Complete)
13811381
.get();
13821382
CHECK(result == sync::SubscriptionSet::State::Complete);
1383-
wait_for_advance(*realm);
1383+
realm->sync_session()->shutdown_and_wait();
13841384
realm->close();
13851385
}
1386+
_impl::RealmCoordinator::assert_no_open_realms();
13861387
{
13871388
// ensure that an additional schema change after the successful reset is also accepted by the server
13881389
changed_schema[0].persisted_properties.push_back(
@@ -1393,23 +1394,19 @@ TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
13931394
{"str_field_2", PropertyType::String | PropertyType::Nullable},
13941395
}});
13951396
config_local.schema = changed_schema;
1396-
async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
1397-
REQUIRE(ref);
1398-
REQUIRE_FALSE(error);
1399-
auto realm = Realm::get_shared_realm(std::move(ref));
1400-
auto table = realm->read_group().get_table("class_AddedClassSecond");
1401-
ColKey new_col = table->get_column_key("str_field_2");
1402-
REQUIRE(new_col);
1403-
auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
1404-
new_subs.insert_or_assign(Query(table).equal(new_col, "hello"));
1405-
auto subs = new_subs.commit();
1406-
realm->begin_transaction();
1407-
table->create_object_with_primary_key(Mixed{ObjectId::gen()}, {{new_col, "hello"}});
1408-
realm->commit_transaction();
1409-
subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
1410-
wait_for_advance(*realm);
1411-
REQUIRE(table->size() == 1);
1412-
});
1397+
auto realm = Realm::get_shared_realm(config_local);
1398+
auto table = realm->read_group().get_table("class_AddedClassSecond");
1399+
ColKey new_col = table->get_column_key("str_field_2");
1400+
REQUIRE(new_col);
1401+
auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
1402+
new_subs.insert_or_assign(Query(table).equal(new_col, "hello"));
1403+
auto subs = new_subs.commit();
1404+
realm->begin_transaction();
1405+
table->create_object_with_primary_key(Mixed{ObjectId::gen()}, {{new_col, "hello"}});
1406+
realm->commit_transaction();
1407+
subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
1408+
wait_for_advance(*realm);
1409+
REQUIRE(table->size() == 1);
14131410
}
14141411
}
14151412

@@ -4332,12 +4329,13 @@ TEST_CASE("flx: open realm + register subscription callback while bootstrapping"
43324329
"[sync][flx][bootstrap][async open][baas]") {
43334330
FLXSyncTestHarness harness("flx_bootstrap_and_subscribe");
43344331
auto foo_obj_id = ObjectId::gen();
4332+
int64_t foo_obj_queryable_int = 5;
43354333
harness.load_initial_data([&](SharedRealm realm) {
43364334
CppContext c(realm);
43374335
Object::create(c, realm, "TopLevel",
43384336
std::any(AnyDict{{"_id", foo_obj_id},
43394337
{"queryable_str_field", "foo"s},
4340-
{"queryable_int_field", static_cast<int64_t>(5)},
4338+
{"queryable_int_field", foo_obj_queryable_int},
43414339
{"non_queryable_field", "created as initial data seed"s}}));
43424340
});
43434341
SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
@@ -4594,6 +4592,62 @@ TEST_CASE("flx: open realm + register subscription callback while bootstrapping"
45944592
REQUIRE(r1_active <= 2);
45954593
}
45964594
}
4595+
4596+
SECTION("Wait to bootstrap all pending subscriptions even when subscription_initializer is not used") {
4597+
// Client 1
4598+
{
4599+
auto realm = Realm::get_shared_realm(config);
4600+
// Create subscription (version = 1) and bootstrap data.
4601+
subscribe_to_all_and_bootstrap(*realm);
4602+
realm->sync_session()->shutdown_and_wait();
4603+
4604+
// Create a new subscription (version = 2) while the session is closed.
4605+
// The new subscription does not match the object bootstrapped at version 1.
4606+
auto mutable_subscription = realm->get_latest_subscription_set().make_mutable_copy();
4607+
mutable_subscription.clear();
4608+
auto table = realm->read_group().get_table("class_TopLevel");
4609+
auto queryable_int_field = table->get_column_key("queryable_int_field");
4610+
mutable_subscription.insert_or_assign(
4611+
Query(table).not_equal(queryable_int_field, foo_obj_queryable_int));
4612+
mutable_subscription.commit();
4613+
4614+
realm->close();
4615+
}
4616+
4617+
_impl::RealmCoordinator::assert_no_open_realms();
4618+
4619+
// Client 2 uploads data matching Client 1's subscription at version 1
4620+
harness.load_initial_data([&](SharedRealm realm) {
4621+
CppContext c(realm);
4622+
Object::create(c, realm, "TopLevel",
4623+
std::any(AnyDict{{"_id", ObjectId::gen()},
4624+
{"queryable_str_field", "bar"s},
4625+
{"queryable_int_field", 2 * foo_obj_queryable_int},
4626+
{"non_queryable_field", "some data"s}}));
4627+
});
4628+
4629+
// Client 1 opens the realm asynchronously and expects the task to complete
4630+
// when the subscription at version 2 finishes bootstrapping.
4631+
auto async_open = Realm::get_synchronized_realm(config);
4632+
auto open_task_pf = util::make_promise_future<bool>();
4633+
int64_t latest_version, active_version;
4634+
auto open_realm_completed_callback =
4635+
[promise_holder = util::CopyablePromiseHolder(std::move(open_task_pf.promise)), &latest_version,
4636+
&active_version](ThreadSafeReference ref, std::exception_ptr err) mutable {
4637+
REQUIRE_FALSE(err);
4638+
auto realm = Realm::get_shared_realm(std::move(ref));
4639+
REQUIRE(realm);
4640+
latest_version = realm->get_latest_subscription_set().version();
4641+
active_version = realm->get_active_subscription_set().version();
4642+
promise_holder.get_promise().emplace_value(true);
4643+
};
4644+
async_open->start(open_realm_completed_callback);
4645+
CHECK(open_task_pf.future.get());
4646+
4647+
// Check subscription at version 2 is marked Complete.
4648+
CHECK(active_version == 2);
4649+
CHECK(latest_version == active_version);
4650+
}
45974651
}
45984652
}
45994653
TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset][async open][baas]") {

0 commit comments

Comments
 (0)