Skip to content

Commit c280bdb

Browse files
authored
RCORE-2099 Restore progress notifier behavior when sync session is already caught up (#7681)
1 parent 1434990 commit c280bdb

File tree

7 files changed

+940
-677
lines changed

7 files changed

+940
-677
lines changed

CHANGELOG.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
* None.
66

77
### Fixed
8-
* <How do the end-user experience this issue? what was the impact?> ([#????](https://github.com/realm/realm-core/issues/????), since v?.?.?)
9-
* None.
8+
* A non-streaming progress notifier would not immediately call its callback after registration. Instead you would have to wait for a download message to be received to get your first update - if you were already caught up when you registered the notifier you could end up waiting a long time for the server to deliver a download that would call/expire your notifier ([#7627](https://github.com/realm/realm-core/issues/7627), since v14.6.0).
109

1110
### Breaking changes
1211
* None.

src/realm/object-store/sync/sync_session.cpp

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -821,10 +821,10 @@ void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status erro
821821

822822
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
823823
uint64_t uploadable, uint64_t snapshot_version, double download_estimate,
824-
double upload_estimate)
824+
double upload_estimate, int64_t query_version)
825825
{
826826
m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate,
827-
upload_estimate);
827+
upload_estimate, query_version);
828828
}
829829

830830
static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
@@ -962,10 +962,10 @@ void SyncSession::create_sync_session()
962962
m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
963963
uint_fast64_t uploaded, uint_fast64_t uploadable,
964964
uint_fast64_t snapshot_version, double download_estimate,
965-
double upload_estimate) {
965+
double upload_estimate, int64_t query_version) {
966966
if (auto self = weak_self.lock()) {
967967
self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version,
968-
download_estimate, upload_estimate);
968+
download_estimate, upload_estimate, query_version);
969969
}
970970
});
971971

@@ -1267,7 +1267,11 @@ void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)
12671267
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
12681268
ProgressDirection direction, bool is_streaming)
12691269
{
1270-
return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
1270+
int64_t pending_query_version = 0;
1271+
if (auto sub_store = get_flx_subscription_store()) {
1272+
pending_query_version = sub_store->get_version_info().latest;
1273+
}
1274+
return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming, pending_query_version);
12711275
}
12721276

12731277
void SyncSession::unregister_progress_notifier(uint64_t token)
@@ -1519,22 +1523,23 @@ void SyncSession::did_drop_external_reference()
15191523
}
15201524

15211525
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1522-
NotifierType direction, bool is_streaming)
1526+
NotifierType direction, bool is_streaming,
1527+
int64_t pending_query_version)
15231528
{
15241529
util::UniqueFunction<void()> invocation;
15251530
uint64_t token_value = 0;
15261531
{
15271532
std::lock_guard<std::mutex> lock(m_mutex);
15281533
token_value = m_progress_notifier_token++;
15291534
NotifierPackage package{std::move(notifier), m_local_transaction_version, is_streaming,
1530-
direction == NotifierType::download};
1535+
direction == NotifierType::download, pending_query_version};
15311536
if (!m_current_progress) {
15321537
// Simply register the package, since we have no data yet.
15331538
m_packages.emplace(token_value, std::move(package));
15341539
return token_value;
15351540
}
15361541
bool skip_registration = false;
1537-
invocation = package.create_invocation(*m_current_progress, skip_registration, true);
1542+
invocation = package.create_invocation(*m_current_progress, skip_registration);
15381543
if (skip_registration) {
15391544
token_value = 0;
15401545
}
@@ -1553,13 +1558,14 @@ void SyncProgressNotifier::unregister_callback(uint64_t token)
15531558
}
15541559

15551560
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1556-
uint64_t snapshot_version, double download_estimate, double upload_estimate)
1561+
uint64_t snapshot_version, double download_estimate, double upload_estimate,
1562+
int64_t query_version)
15571563
{
15581564
std::vector<util::UniqueFunction<void()>> invocations;
15591565
{
15601566
std::lock_guard<std::mutex> lock(m_mutex);
1561-
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded,
1562-
upload_estimate, download_estimate, snapshot_version};
1567+
m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded,
1568+
upload_estimate, download_estimate, snapshot_version, query_version};
15631569

15641570
for (auto it = m_packages.begin(); it != m_packages.end();) {
15651571
bool should_delete = false;
@@ -1579,49 +1585,50 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
15791585
}
15801586

15811587
util::UniqueFunction<void()>
1582-
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired,
1583-
bool initial_registration)
1588+
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
15841589
{
1585-
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
1590+
uint64_t transfered = is_download ? current_progress.downloaded : current_progress.uploaded;
15861591
uint64_t transferable = is_download ? current_progress.downloadable : current_progress.uploadable;
1587-
double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;
1588-
1589-
// If the sync client has not yet processed all of the local
1590-
// transactions then the uploadable data is incorrect and we should
1591-
// not invoke the callback
1592-
if (!is_download && snapshot_version > current_progress.snapshot_version)
1593-
return [] {};
1594-
1595-
// for download only invoke the callback on registration if is in active data transfer,
1596-
// otherwise delay notifying until an update with the new transfer signaled
1597-
if (is_download && !started_notifying && progress_estimate >= 1) {
1598-
if (initial_registration) {
1599-
initial_transferred = transferred;
1600-
return [] {};
1601-
}
1602-
else if (initial_transferred == transferred)
1592+
double estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;
1593+
1594+
if (!is_streaming) {
1595+
// If the sync client has not yet processed all of the local
1596+
// transactions then the uploadable data is incorrect and we should
1597+
// not invoke the callback
1598+
if (!is_download && snapshot_version > current_progress.snapshot_version)
16031599
return [] {};
1604-
}
16051600

1606-
started_notifying = true;
1601+
// If this is a non-streaming download progress update and this notifier was
1602+
// created for a later query version (e.g. we're currently downloading
1603+
// subscription set version zero, but subscription set version 1 existed
1604+
// when the notifier was registered), then we want to skip this callback.
1605+
if (is_download && current_progress.query_version < pending_query_version) {
1606+
return [] {};
1607+
}
16071608

1608-
// only capture and adjust transferable bytes for upload non-streaming to provide
1609-
// the progress of upload for the callback registered right after the commit
1610-
if (!is_streaming && !is_download) {
1609+
// The initial download size we get from the server is the uncompacted
1610+
// size, and so the download may complete before we actually receive
1611+
// that much data. When that happens, transferrable will drop and we
1612+
// need to use the new value instead of the captured one.
16111613
if (!captured_transferable || *captured_transferable > transferable)
16121614
captured_transferable = transferable;
16131615
transferable = *captured_transferable;
1614-
}
16151616

1616-
// A notifier is expired for upload if at least as many bytes have been transferred
1617-
// as were originally considered transferable based on local committed version
1618-
// on callback registration, or when simply 1.0 progress is reached for download
1619-
// since the amount of bytes is not precisely known until the end
1620-
if (!is_streaming)
1621-
is_expired = is_download ? progress_estimate >= 1 : transferred >= transferable;
1617+
// Since we can adjust the transferrable downwards the estimate for uploads
1618+
// won't be correct since the sync client's view of the estimate is based on
1619+
// the total number of uploadable bytes available rather than the number of
1620+
// bytes this NotifierPackage was waiting to upload.
1621+
if (!is_download) {
1622+
estimate = transferable > 0 ? std::min(transfered / double(transferable), 1.0) : 0.0;
1623+
}
1624+
}
16221625

1626+
// A notifier is expired if at least as many bytes have been transferred
1627+
// as were originally considered transferrable.
1628+
is_expired =
1629+
!is_streaming && (transfered >= transferable && (!is_download || !pending_query_version || estimate >= 1.0));
16231630
return [=, notifier = notifier] {
1624-
notifier(transferred, transferable, progress_estimate);
1631+
notifier(transfered, transferable, estimate);
16251632
};
16261633
}
16271634

src/realm/object-store/sync/sync_session.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,13 @@ class SyncProgressNotifier {
5454
using ProgressNotifierCallback = void(uint64_t transferred_bytes, uint64_t transferrable_bytes,
5555
double progress_estimate);
5656

57-
uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming);
57+
uint64_t register_callback(std::function<ProgressNotifierCallback>, NotifierType direction, bool is_streaming,
58+
int64_t pending_query_version);
5859
void unregister_callback(uint64_t);
5960

6061
void set_local_version(uint64_t);
6162
void update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
62-
uint64_t snapshot_version, double download_estimate = 1.0, double upload_estimate = 1.0);
63+
uint64_t snapshot_version, double download_estimate, double upload_estimate, int64_t query_version);
6364

6465
private:
6566
mutable std::mutex m_mutex;
@@ -73,6 +74,7 @@ class SyncProgressNotifier {
7374
double upload_estimate;
7475
double download_estimate;
7576
uint64_t snapshot_version;
77+
int64_t query_version;
7678
};
7779

7880
// A PODS encapsulating some information for progress notifier callbacks a binding
@@ -82,11 +84,9 @@ class SyncProgressNotifier {
8284
uint64_t snapshot_version;
8385
bool is_streaming;
8486
bool is_download;
85-
bool started_notifying = false;
86-
uint64_t initial_transferred = 0;
87+
int64_t pending_query_version = 0;
8788
std::optional<uint64_t> captured_transferable;
88-
util::UniqueFunction<void()> create_invocation(const Progress&, bool& is_expired,
89-
bool initial_registration = false);
89+
util::UniqueFunction<void()> create_invocation(const Progress&, bool& is_expired);
9090
};
9191

9292
// A counter used as a token to identify progress notifier callbacks registered on this session.
@@ -174,7 +174,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
174174
// Note that bindings should dispatch the callback onto a separate thread or queue
175175
// in order to avoid blocking the sync client.
176176
uint64_t register_progress_notifier(std::function<ProgressNotifierCallback>&&, ProgressDirection,
177-
bool is_streaming);
177+
bool is_streaming) REQUIRES(!m_state_mutex);
178178

179179
// Unregister a previously registered notifier. If the token is invalid,
180180
// this method does nothing.
@@ -422,7 +422,7 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
422422
void cancel_pending_waits(util::CheckedUniqueLock, Status) RELEASE(m_state_mutex);
423423
enum class ShouldBackup { yes, no };
424424
void update_error_and_mark_file_for_deletion(SyncError&, ShouldBackup) REQUIRES(m_state_mutex, !m_config_mutex);
425-
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double);
425+
void handle_progress_update(uint64_t, uint64_t, uint64_t, uint64_t, uint64_t, double, double, int64_t);
426426
void handle_new_flx_sync_query(int64_t version);
427427

428428
void nonsync_transact_notify(VersionID::version_type) REQUIRES(!m_state_mutex);

src/realm/sync/client.cpp

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -980,11 +980,11 @@ void SessionImpl::process_pending_flx_bootstrap()
980980
std::chrono::duration_cast<std::chrono::milliseconds>(duration).count(),
981981
pending_batch.remaining_changesets);
982982
}
983-
on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
984983

985984
REALM_ASSERT_3(query_version, !=, -1);
986985
on_flx_sync_progress(query_version, DownloadBatchState::LastInBatch);
987986

987+
on_changesets_integrated(new_version.realm_version, progress, changesets_processed > 0);
988988
auto action = call_debug_hook(SyncClientHookEvent::BootstrapProcessed, progress, query_version,
989989
DownloadBatchState::LastInBatch, changesets_processed);
990990
// NoAction/EarlyReturn are both valid no-op actions to take here.
@@ -1958,14 +1958,15 @@ void SessionWrapper::report_progress(bool is_download, bool only_if_new_uploadab
19581958
ss << std::fixed << std::setprecision(4) << d;
19591959
return ss.str();
19601960
};
1961-
m_sess->logger.debug("Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
1962-
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7",
1963-
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
1964-
to_str(upload_estimate), p.snapshot);
1961+
m_sess->logger.debug(
1962+
"Progress handler called, downloaded = %1, downloadable = %2, estimate = %3, "
1963+
"uploaded = %4, uploadable = %5, estimate = %6, snapshot version = %7, query_version = %8",
1964+
p.downloaded, p.downloadable, to_str(download_estimate), p.uploaded, p.uploadable,
1965+
to_str(upload_estimate), p.snapshot, m_flx_active_version);
19651966
}
19661967

19671968
m_progress_handler(p.downloaded, p.downloadable, p.uploaded, p.uploadable, p.snapshot, download_estimate,
1968-
upload_estimate);
1969+
upload_estimate, m_flx_last_seen_version);
19691970
}
19701971

19711972
util::Future<std::string> SessionWrapper::send_test_command(std::string body)

src/realm/sync/client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class Session {
162162
using ProgressHandler = void(std::uint_fast64_t downloaded_bytes, std::uint_fast64_t downloadable_bytes,
163163
std::uint_fast64_t uploaded_bytes, std::uint_fast64_t uploadable_bytes,
164164
std::uint_fast64_t snapshot_version, double download_estimate,
165-
double upload_estimate);
165+
double upload_estimate, int64_t query_version);
166166
using WaitOperCompletionHandler = util::UniqueFunction<void(Status)>;
167167
using SSLVerifyCallback = bool(const std::string& server_address, port_type server_port, const char* pem_data,
168168
size_t pem_size, int preverify_ok, int depth);

0 commit comments

Comments
 (0)