Skip to content

Commit c6b7d3d

Browse files
committed
Derive upload completion entirely from the state of the history
Rather than tracking a bunch of derived state in-memory, check for upload completion by checking if there are any unuploaded changesets. This is both multiprocess-compatible and is more precise than the old checks, which had some false-negatives.
1 parent c84fcb6 commit c6b7d3d

File tree

11 files changed

+132
-211
lines changed

11 files changed

+132
-211
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* Fixed a change of mode from Strong to All when removing links from an embedded object that links to a tombstone. This affects sync apps that use embedded objects which have a `Lst<Mixed>` that contains a link to another top level object which has been deleted by another sync client (creating a tombstone locally). In this particular case, the switch would cause any remaining link removals to recursively delete the destination object if there were no other links to it. ([#7828](https://github.com/realm/realm-core/issues/7828), since 14.0.0-beta.0)
1010
* Fixed removing backlinks from the wrong objects if the link came from a nested list, nested dictionary, top-level dictionary, or list of mixed, and the source table had more than 256 objects. This could manifest as `array_backlink.cpp:112: Assertion failed: int64_t(value >> 1) == key.value` when removing an object. ([#7594](https://github.com/realm/realm-core/issues/7594), since v11 for dictionaries)
1111
* Fixed the collapse/rejoin of clusters which contained nested collections with links. This could manifest as `array.cpp:319: Array::move() Assertion failed: begin <= end [2, 1]` when removing an object. ([#7839](https://github.com/realm/realm-core/issues/7839), since the introduction of nested collections in v14.0.0-beta.0)
12+
* wait_for_upload_completion() was inconsistent in how it handled commits which did not produce any changesets to upload. Previously it would sometimes complete immediately if all commits waiting to be uploaded were empty, and at other times it would wait for a server roundtrip. It will now always complete immediately. ([PR #7796](https://github.com/realm/realm-core/pull/7796)).
1213

1314
### Breaking changes
1415
* None.
@@ -20,6 +21,7 @@
2021

2122
### Internals
2223
* Fixed `Table::remove_object_recursive` which wouldn't recursively follow links through a single `Mixed` property. This feature is exposed publicly on `Table` but no SDK currently uses it, so this is considered internal. ([#7829](https://github.com/realm/realm-core/issues/7829), likely since the introduction of Mixed)
24+
* Upload completion is now tracked in a multiprocess-compatible manner ([PR #7796](https://github.com/realm/realm-core/pull/7796)).
2325

2426
----------------------------------------------
2527

src/realm/chunked_binary.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ bool ChunkedBinaryData::is_null() const
4545
return chunk.is_null();
4646
}
4747

48+
bool ChunkedBinaryData::empty() const
49+
{
50+
BinaryIterator copy = m_begin;
51+
BinaryData chunk = copy.get_next();
52+
return chunk.size() == 0;
53+
}
54+
4855
char ChunkedBinaryData::operator[](size_t index) const
4956
{
5057
BinaryIterator copy = m_begin;

src/realm/chunked_binary.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ class ChunkedBinaryData {
5454
/// the first chunk points to the nullptr.
5555
bool is_null() const;
5656

57+
/// Equivalent to `size() == 0`, but O(1) rather than O(N).
58+
bool empty() const;
59+
5760
/// FIXME: O(n)
5861
char operator[](size_t index) const;
5962

src/realm/sync/client.cpp

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,8 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
228228
std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
229229
std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
230230

231-
void on_upload_completion();
231+
version_type m_upload_completion_requested_version = -1;
232+
232233
void on_download_completion();
233234
void on_suspended(const SessionErrorInfo& error_info);
234235
void on_resumed();
@@ -238,7 +239,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
238239
void on_flx_sync_version_complete(int64_t version);
239240

240241
void init_progress_handler();
241-
void report_progress();
242+
void check_progress();
243+
void report_progress(ReportedProgress& p, DownloadableProgress downloadable);
244+
void report_upload_completion(version_type);
242245

243246
friend class SessionWrapperStack;
244247
friend class ClientImpl::Session;
@@ -736,23 +739,14 @@ void SessionImpl::initiate_integrate_changesets(std::uint_fast64_t downloadable_
736739
// Fake it for "dry run" mode
737740
client_version = m_last_version_available + 1;
738741
}
739-
on_changesets_integrated(client_version, progress, !changesets.empty()); // Throws
742+
on_changesets_integrated(client_version, progress); // Throws
740743
}
741744
catch (const IntegrationException& e) {
742745
on_integration_failure(e);
743746
}
744747
}
745748

746749

747-
void SessionImpl::on_upload_completion()
748-
{
749-
// Ignore the call if the session is not active
750-
if (m_state == State::Active) {
751-
m_wrapper.on_upload_completion(); // Throws
752-
}
753-
}
754-
755-
756750
void SessionImpl::on_download_completion()
757751
{
758752
// Ignore the call if the session is not active
@@ -842,7 +836,6 @@ bool SessionImpl::process_flx_bootstrap_message(const SyncProgress& progress, Do
842836
REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
843837

844838
if (batch_state == DownloadBatchState::MoreToCome) {
845-
notify_sync_progress();
846839
return true;
847840
}
848841

@@ -935,7 +928,7 @@ void SessionImpl::process_pending_flx_bootstrap()
935928
REALM_ASSERT_3(query_version, !=, -1);
936929
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
937930

938-
on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
931+
on_changesets_integrated(new_version.realm_version, progress);
939932
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
940933
DownloadBatchState::LastInBatch, changesets_processed);
941934
// NoAction/EarlyReturn are both valid no-op actions to take here.
@@ -1090,14 +1083,6 @@ void SessionImpl::enable_progress_notifications()
10901083
m_wrapper.m_reliable_download_progress = true;
10911084
}
10921085

1093-
void SessionImpl::notify_sync_progress()
1094-
{
1095-
if (m_state != State::Active)
1096-
return;
1097-
1098-
m_wrapper.report_progress();
1099-
}
1100-
11011086
util::Future<std::string> SessionImpl::send_test_command(std::string body)
11021087
{
11031088
if (m_state != State::Active) {
@@ -1301,7 +1286,7 @@ void SessionWrapper::on_commit(version_type new_version)
13011286
return; // Already finalized
13021287
SessionImpl& sess = *self->m_sess;
13031288
sess.recognize_sync_version(new_version); // Throws
1304-
self->report_progress(); // Throws
1289+
self->check_progress(); // Throws
13051290
});
13061291
}
13071292

@@ -1343,6 +1328,7 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple
13431328
return;
13441329
}
13451330
if (upload_completion) {
1331+
self->m_upload_completion_requested_version = self->m_db->get_version_of_latest_snapshot();
13461332
if (download_completion) {
13471333
// Wait for upload and download completion
13481334
self->m_sync_completion_handlers.push_back(std::move(handler)); // Throws
@@ -1358,7 +1344,7 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple
13581344
}
13591345
SessionImpl& sess = *self->m_sess;
13601346
if (upload_completion)
1361-
sess.request_upload_completion_notification(); // Throws
1347+
self->check_progress();
13621348
if (download_completion)
13631349
sess.request_download_completion_notification(); // Throws
13641350
}); // Throws
@@ -1484,7 +1470,7 @@ void SessionWrapper::actualize()
14841470
}
14851471

14861472
if (!m_client_reset_config)
1487-
report_progress(); // Throws
1473+
check_progress(); // Throws
14881474
}
14891475

14901476
void SessionWrapper::force_close()
@@ -1569,24 +1555,14 @@ inline void SessionWrapper::finalize_before_actualization() noexcept
15691555
m_db = nullptr;
15701556
}
15711557

1572-
void SessionWrapper::on_upload_completion()
1573-
{
1574-
REALM_ASSERT(!m_finalized);
1575-
while (!m_upload_completion_handlers.empty()) {
1576-
auto handler = std::move(m_upload_completion_handlers.back());
1577-
m_upload_completion_handlers.pop_back();
1578-
handler(Status::OK()); // Throws
1579-
}
1580-
while (!m_sync_completion_handlers.empty()) {
1581-
auto handler = std::move(m_sync_completion_handlers.back());
1582-
m_download_completion_handlers.push_back(std::move(handler)); // Throws
1583-
m_sync_completion_handlers.pop_back();
1584-
}
1585-
}
1586-
1587-
15881558
void SessionWrapper::on_download_completion()
15891559
{
1560+
// Ensure that progress handlers get called before completion handlers. The
1561+
// download completing performed a commit and will trigger progress
1562+
// notifications asynchronously, but they would arrive after the download
1563+
// completion without this.
1564+
check_progress();
1565+
15901566
while (!m_download_completion_handlers.empty()) {
15911567
auto handler = std::move(m_download_completion_handlers.back());
15921568
m_download_completion_handlers.pop_back();
@@ -1642,26 +1618,52 @@ void SessionWrapper::on_connection_state_changed(ConnectionState state,
16421618

16431619
void SessionWrapper::init_progress_handler()
16441620
{
1645-
ClientHistory::get_upload_download_bytes(m_db.get(), m_final_downloaded, m_final_uploaded);
1621+
ClientHistory::get_upload_download_state(m_db.get(), m_final_downloaded, m_final_uploaded);
16461622
}
16471623

1648-
void SessionWrapper::report_progress()
1624+
void SessionWrapper::check_progress()
16491625
{
16501626
REALM_ASSERT(!m_finalized);
16511627
REALM_ASSERT(m_sess);
16521628

1629+
if (!m_progress_handler && m_upload_completion_handlers.empty() && m_sync_completion_handlers.empty())
1630+
return;
1631+
1632+
version_type uploaded_version;
1633+
ReportedProgress p;
1634+
DownloadableProgress downloadable;
1635+
ClientHistory::get_upload_download_state(*m_db, p.downloaded, downloadable, p.uploaded, p.uploadable, p.snapshot,
1636+
uploaded_version);
1637+
1638+
report_progress(p, downloadable);
1639+
report_upload_completion(uploaded_version);
1640+
}
1641+
1642+
void SessionWrapper::report_upload_completion(version_type uploaded_version)
1643+
{
1644+
if (uploaded_version < m_upload_completion_requested_version)
1645+
return;
1646+
1647+
std::move(m_sync_completion_handlers.begin(), m_sync_completion_handlers.end(),
1648+
std::back_inserter(m_download_completion_handlers));
1649+
m_sync_completion_handlers.clear();
1650+
1651+
while (!m_upload_completion_handlers.empty()) {
1652+
auto handler = std::move(m_upload_completion_handlers.back());
1653+
m_upload_completion_handlers.pop_back();
1654+
handler(Status::OK()); // Throws
1655+
}
1656+
}
1657+
1658+
void SessionWrapper::report_progress(ReportedProgress& p, DownloadableProgress downloadable)
1659+
{
16531660
if (!m_progress_handler)
16541661
return;
16551662

16561663
// Ignore progress messages from before we first receive a DOWNLOAD message
16571664
if (!m_reliable_download_progress)
16581665
return;
16591666

1660-
ReportedProgress p;
1661-
DownloadableProgress downloadable;
1662-
ClientHistory::get_upload_download_bytes(m_db.get(), p.downloaded, downloadable, p.uploaded, p.uploadable,
1663-
p.snapshot);
1664-
16651667
auto calculate_progress = [](uint64_t transferred, uint64_t transferable, uint64_t final_transferred) {
16661668
REALM_ASSERT_DEBUG_EX(final_transferred <= transferred, final_transferred, transferred, transferable);
16671669
REALM_ASSERT_DEBUG_EX(transferred <= transferable, final_transferred, transferred, transferable);

src/realm/sync/noinst/client_history_impl.cpp

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ std::vector<ClientHistory::LocalChange> ClientHistory::get_local_changes(version
157157
// find_sync_history_entry() returns 0 to indicate not found and
158158
// otherwise adds 1 to the version, and then get_reciprocal_transform()
159159
// subtracts 1 from the version
160-
if (auto changeset = get_reciprocal_transform(version + 1, compressed); changeset.size()) {
160+
if (auto changeset = get_reciprocal_transform(version + 1, compressed); !changeset.empty()) {
161161
changesets.push_back({version, changeset});
162162
}
163163
}
@@ -344,7 +344,7 @@ void ClientHistory::set_sync_progress(const SyncProgress& progress, Downloadable
344344
ensure_updated(local_version); // Throws
345345
prepare_for_write(); // Throws
346346

347-
update_sync_progress(progress, downloadable_bytes, wt); // Throws
347+
update_sync_progress(progress, downloadable_bytes); // Throws
348348

349349
// Note: This transaction produces an empty changeset. Empty changesets are
350350
// not uploaded to the server.
@@ -489,17 +489,17 @@ void ClientHistory::integrate_server_changesets(
489489
// During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
490490
// synthetic server version that represents synthetic changesets generated from state on the server.
491491
if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
492-
update_sync_progress(progress, downloadable_bytes, transact); // Throws
492+
update_sync_progress(progress, downloadable_bytes); // Throws
493493
}
494494
// Always update progress for download messages from steady state.
495495
else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
496496
auto partial_progress = progress;
497497
partial_progress.download.server_version = last_changeset.remote_version;
498498
partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
499-
update_sync_progress(partial_progress, downloadable_bytes, transact); // Throws
499+
update_sync_progress(partial_progress, downloadable_bytes); // Throws
500500
}
501501
else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
502-
update_sync_progress(progress, downloadable_bytes, transact); // Throws
502+
update_sync_progress(progress, downloadable_bytes); // Throws
503503
}
504504
if (run_in_write_tr) {
505505
run_in_write_tr(transact, changesets_for_cb);
@@ -610,33 +610,65 @@ size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset
610610
}
611611

612612

613-
void ClientHistory::get_upload_download_bytes(DB* db, std::uint_fast64_t& downloaded_bytes,
613+
void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downloaded_bytes,
614614
DownloadableProgress& downloadable_bytes,
615615
std::uint_fast64_t& uploaded_bytes,
616616
std::uint_fast64_t& uploadable_bytes,
617-
std::uint_fast64_t& snapshot_version)
617+
std::uint_fast64_t& snapshot_version, version_type& uploaded_version)
618618
{
619-
TransactionRef rt = db->start_read(); // Throws
619+
TransactionRef rt = db.start_read(); // Throws
620620
version_type current_client_version = rt->get_version();
621621

622622
downloaded_bytes = 0;
623623
downloadable_bytes = uint64_t(0);
624624
uploaded_bytes = 0;
625625
uploadable_bytes = 0;
626626
snapshot_version = current_client_version;
627+
uploaded_version = 0;
627628

628629
using gf = _impl::GroupFriend;
629-
if (ref_type ref = gf::get_history_ref(*rt)) {
630-
Array root(db->get_alloc());
631-
root.init_from_ref(ref);
632-
downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
633-
downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
634-
uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
635-
uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
630+
ref_type ref = gf::get_history_ref(*rt);
631+
if (!ref)
632+
return;
633+
634+
Array root(db.get_alloc());
635+
root.init_from_ref(ref);
636+
downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
637+
downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
638+
uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
639+
uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
640+
641+
uploaded_version = root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int();
642+
if (uploaded_version == current_client_version)
643+
return;
644+
645+
BinaryColumn changesets(db.get_alloc());
646+
changesets.init_from_ref(root.get_as_ref(s_changesets_iip));
647+
IntegerBpTree origin_file_idents(db.get_alloc());
648+
origin_file_idents.init_from_ref(root.get_as_ref(s_origin_file_idents_iip));
649+
650+
// `base_version` is the oldest version we have history for. If this is
651+
// greater than uploaded_version, all of the versions in between the two had
652+
// empty changesets and did not need to be uploaded. If this is less than
653+
// uploaded_version, we have changesets which have been uploaded but the
654+
// server has not yet told us we can delete and we may need to use for merging.
655+
auto base_version = current_client_version - changesets.size();
656+
if (uploaded_version < base_version) {
657+
uploaded_version = base_version;
658+
}
659+
660+
auto count = size_t(current_client_version - uploaded_version);
661+
for (size_t i = changesets.size() - count; i < changesets.size(); ++i) {
662+
if (origin_file_idents.get(i) == 0) {
663+
size_t pos = 0;
664+
if (changesets.get_at(i, pos).size() != 0)
665+
break;
666+
}
667+
++uploaded_version;
636668
}
637669
}
638670

639-
void ClientHistory::get_upload_download_bytes(DB* db, std::uint_fast64_t& downloaded_bytes,
671+
void ClientHistory::get_upload_download_state(DB* db, std::uint_fast64_t& downloaded_bytes,
640672
std::uint_fast64_t& uploaded_bytes)
641673
{
642674
TransactionRef rt = db->start_read(); // Throws
@@ -711,7 +743,7 @@ auto ClientHistory::find_sync_history_entry(Arrays& arrays, version_type base_ve
711743
bool not_from_server = (origin_file_ident == 0);
712744
if (not_from_server) {
713745
ChunkedBinaryData chunked_changeset(arrays.changesets, offset + i);
714-
if (chunked_changeset.size() > 0) {
746+
if (!chunked_changeset.empty()) {
715747
entry.origin_file_ident = file_ident_type(origin_file_ident);
716748
entry.remote_version = last_integrated_server_version;
717749
entry.origin_timestamp = timestamp_type(arrays.origin_timestamps.get(offset + i));
@@ -845,8 +877,7 @@ void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
845877
}
846878

847879

848-
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
849-
TransactionRef)
880+
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
850881
{
851882
Array& root = m_arrays->root;
852883

0 commit comments

Comments
 (0)