Skip to content

Commit f522692

Browse files
author
Kirill Burtsev
authored
RCORE-2013 Allow non-streaming download callback on subsequent sync batches (#7561)
* Allow non-streaming download callback on subsequent sync batches and also on initial flx sync after query version 0 * Remove solved fixme and workaround in test for query version 0 * Resolve, refactor and consolidate check for SyncProgressNotifier after the fix * Delay download progress notification also until progress for new data transfer is signaled with fresh progress value and transferred bytes
1 parent 979a4d2 commit f522692

File tree

7 files changed

+472
-398
lines changed

7 files changed

+472
-398
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* Deleting the active user left the active user unset rather than selecting another logged-in user as the active user like logging out and removing users did. ([PR #7300](https://github.com/realm/realm-core/pull/7300)).
1414
* Fix compilation errors when using command-line `swift build` ([#7587](https://github.com/realm/realm-core/pull/7587), since v14.5.1).
1515
* Fixed crash when integrating removal of already removed dictionary key ([#7488](https://github.com/realm/realm-core/issues/7488), since v10.0.0).
16+
* Non-streaming download sync progress notification is fixed for flexible sync Realms where before it was sometimes stopping to emit values right after the registration of the callback (PR [#7561](https://github.com/realm/realm-core/issues/7561)).
1617

1718
### Breaking changes
1819
* The following things have been renamed or moved as part of moving all of the App Services functionality to the app namespace:
@@ -33,6 +34,7 @@
3334
* Sync user management has been removed from SyncManager. This functionality was already additionally available on App. ([PR #7300](https://github.com/realm/realm-core/pull/7300).
3435
* AuditConfig now has a base_file_path field which must be set by the SDK rather than inheriting it from the SyncManager. ([PR #7300](https://github.com/realm/realm-core/pull/7300).
3536
* App::switch_user() no longer returns a user. The return value was always exactly the passed-in user and any code which needs it can just use that. ([PR #7300](https://github.com/realm/realm-core/pull/7300).
37+
* Non-streaming download progress callback no longer stops reporting values immediately after the registration (if the progress update has happened earlier), but waits for the next batch of data to start syncing to report its progress, since the previous behaviour was not useful (PR [#7561](https://github.com/realm/realm-core/issues/7561)).
3638

3739
### Compatibility
3840
* Fileformat: Generates files with format v24. Reads and automatically upgrade from fileformat v10. If you want to upgrade from an earlier file format version you will have to use RealmCore v13.x.y or earlier.
@@ -298,7 +300,7 @@
298300
* Use `clonefile()` when possible in `File::copy()` on Apple platforms for faster copying. ([PR #7341](https://github.com/realm/realm-core/pull/7341)).
299301

300302
### Fixed
301-
* Fixed queries like `indexed_property == NONE {x}` which mistakenly matched on only x instead of not x. This only applies when an indexed property with equality (==, or IN) matches with `NONE` on a list of one item. If the constant list contained more than one value then it was working correctly. ([realm-js #7862](https://github.com/realm/realm-java/issues/7862), since v12.5.0)
303+
* Fixed queries like `indexed_property == NONE {x}` which mistakenly matched on only x instead of not x. This only applies when an indexed property with equality (==, or IN) matches with `NONE` on a list of one item. If the constant list contained more than one value then it was working correctly. ([realm-js #7862](https://github.com/realm/realm-java/issues/7862), since v12.5.0)
302304
* Uploading the changesets recovered during an automatic client reset recovery may lead to 'Bad server version' errors and a new client reset. ([#7279](https://github.com/realm/realm-core/issues/7279), since v13.24.1)
303305
* Fixed invalid data in error reason string when registering a subscription change notification after the subscription has already failed. ([#6839](https://github.com/realm/realm-core/issues/6839), since v11.8.0)
304306
* Fixed crash in fulltext index using prefix search with no matches ([#7309](https://github.com/realm/realm-core/issues/7309), since v13.18.0)

src/realm/object-store/sync/sync_session.cpp

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -291,10 +291,10 @@ static bool check_for_redirect_response(const app::AppError& error)
291291
return false;
292292
}
293293

294-
util::UniqueFunction<void(util::Optional<app::AppError>)>
294+
util::UniqueFunction<void(std::optional<app::AppError>)>
295295
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
296296
{
297-
return [session, restart_session](util::Optional<app::AppError> error) {
297+
return [session, restart_session](std::optional<app::AppError> error) {
298298
auto session_user = session->user();
299299
if (!session_user) {
300300
util::CheckedUniqueLock lock(session->m_state_mutex);
@@ -651,7 +651,7 @@ void SyncSession::handle_error(sync::SessionErrorInfo error)
651651
{
652652
enum class NextStateAfterError { none, inactive, error };
653653
auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
654-
util::Optional<ShouldBackup> delete_file;
654+
std::optional<ShouldBackup> delete_file;
655655
bool log_out_user = false;
656656
bool unrecognized_by_client = false;
657657

@@ -966,7 +966,7 @@ void SyncSession::create_sync_session()
966966
// Sets up the connection state listener. This callback is used for both reporting errors as well as changes to
967967
// the connection state.
968968
m_session->set_connection_state_change_listener(
969-
[weak_self](sync::ConnectionState state, util::Optional<sync::SessionErrorInfo> error) {
969+
[weak_self](sync::ConnectionState state, std::optional<sync::SessionErrorInfo> error) {
970970
using cs = sync::ConnectionState;
971971
ConnectionState new_state = [&] {
972972
switch (state) {
@@ -1520,15 +1520,15 @@ uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierC
15201520
{
15211521
std::lock_guard<std::mutex> lock(m_mutex);
15221522
token_value = m_progress_notifier_token++;
1523-
NotifierPackage package{std::move(notifier), util::none, m_local_transaction_version, is_streaming,
1523+
NotifierPackage package{std::move(notifier), m_local_transaction_version, is_streaming,
15241524
direction == NotifierType::download};
15251525
if (!m_current_progress) {
15261526
// Simply register the package, since we have no data yet.
15271527
m_packages.emplace(token_value, std::move(package));
15281528
return token_value;
15291529
}
15301530
bool skip_registration = false;
1531-
invocation = package.create_invocation(*m_current_progress, skip_registration);
1531+
invocation = package.create_invocation(*m_current_progress, skip_registration, true);
15321532
if (skip_registration) {
15331533
token_value = 0;
15341534
}
@@ -1573,10 +1573,11 @@ void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
15731573
}
15741574

15751575
util::UniqueFunction<void()>
1576-
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
1576+
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired,
1577+
bool initial_registration)
15771578
{
15781579
uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
1579-
uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
1580+
uint64_t transferable = is_download ? current_progress.downloadable : current_progress.uploadable;
15801581
double progress_estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;
15811582

15821583
// If the sync client has not yet processed all of the local
@@ -1585,21 +1586,36 @@ SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current
15851586
if (!is_download && snapshot_version > current_progress.snapshot_version)
15861587
return [] {};
15871588

1588-
if (!is_streaming) {
1589-
// The initial download size we get from the server is the uncompacted
1590-
// size, and so the download may complete before we actually receive
1591-
// that much data. When that happens, transferrable will drop and we
1592-
// need to use the new value instead of the captured one.
1593-
if (!captured_transferrable || *captured_transferrable > transferrable)
1594-
captured_transferrable = transferrable;
1595-
transferrable = *captured_transferrable;
1589+
// for download only invoke the callback on registration if is in active data transfer,
1590+
// otherwise delay notifying until an update with the new transfer signaled
1591+
if (is_download && !started_notifying && progress_estimate >= 1) {
1592+
if (initial_registration) {
1593+
initial_transferred = transferred;
1594+
return [] {};
1595+
}
1596+
else if (initial_transferred == transferred)
1597+
return [] {};
15961598
}
15971599

1598-
// A notifier is expired if at least as many bytes have been transferred
1599-
// as were originally considered transferrable.
1600-
is_expired = !is_streaming && transferred >= transferrable;
1600+
started_notifying = true;
1601+
1602+
// only capture and adjust transferable bytes for upload non-streaming to provide
1603+
// the progress of upload for the callback registered right after the commit
1604+
if (!is_streaming && !is_download) {
1605+
if (!captured_transferable || *captured_transferable > transferable)
1606+
captured_transferable = transferable;
1607+
transferable = *captured_transferable;
1608+
}
1609+
1610+
// A notifier is expired for upload if at least as many bytes have been transferred
1611+
// as were originally considered transferable based on local committed version
1612+
// on callback registration, or when simply 1.0 progress is reached for download
1613+
// since the amount of bytes is not precisely known until the end
1614+
if (!is_streaming)
1615+
is_expired = is_download ? progress_estimate >= 1 : transferred >= transferable;
1616+
16011617
return [=, notifier = notifier] {
1602-
notifier(transferred, transferrable, progress_estimate);
1618+
notifier(transferred, transferable, progress_estimate);
16031619
};
16041620
}
16051621

src/realm/object-store/sync/sync_session.hpp

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727

2828
#include <realm/util/checked_mutex.hpp>
2929
#include <realm/util/future.hpp>
30-
#include <realm/util/optional.hpp>
3130
#include <realm/version_id.hpp>
3231

3332
#include <mutex>
@@ -80,12 +79,14 @@ class SyncProgressNotifier {
8079
// can register upon this session.
8180
struct NotifierPackage {
8281
std::function<ProgressNotifierCallback> notifier;
83-
util::Optional<uint64_t> captured_transferrable;
8482
uint64_t snapshot_version;
8583
bool is_streaming;
8684
bool is_download;
87-
88-
util::UniqueFunction<void()> create_invocation(const Progress&, bool&);
85+
bool started_notifying = false;
86+
uint64_t initial_transferred = 0;
87+
std::optional<uint64_t> captured_transferable;
88+
util::UniqueFunction<void()> create_invocation(const Progress&, bool& is_expired,
89+
bool initial_registration = false);
8990
};
9091

9192
// A counter used as a token to identify progress notifier callbacks registered on this session.
@@ -96,8 +97,10 @@ class SyncProgressNotifier {
9697
// Will be `none` until we've received the initial notification from sync. Note that this
9798
// happens only once ever during the lifetime of a given `SyncSession`, since these values are
9899
// expected to semi-monotonically increase, and a lower-bounds estimate is still useful in the
99-
// event more up-to-date information isn't yet available.
100-
util::Optional<Progress> m_current_progress;
100+
// event more up-to-date information isn't yet available. FIXME: If we support transparent
101+
// client reset in the future, we might need to reset the progress state variables if the Realm
102+
// is rolled back.
103+
std::optional<Progress> m_current_progress;
101104

102105
std::unordered_map<uint64_t, NotifierPackage> m_packages;
103106
};
@@ -389,7 +392,9 @@ class SyncSession : public std::enable_shared_from_this<SyncSession> {
389392
}
390393
// }
391394

392-
static util::UniqueFunction<void(util::Optional<app::AppError>)>
395+
std::shared_ptr<SyncManager> sync_manager() const REQUIRES(!m_state_mutex);
396+
397+
static util::UniqueFunction<void(std::optional<app::AppError>)>
393398
handle_refresh(const std::shared_ptr<SyncSession>&, bool);
394399

395400
// Initialize or tear down the subscription store based on whether or not flx_sync_requested is true

src/realm/sync/client.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1131,6 +1131,11 @@ void SessionImpl::init_progress_handler()
11311131
m_wrapper.init_progress_handler();
11321132
}
11331133

1134+
void SessionImpl::enable_progress_notifications()
1135+
{
1136+
m_wrapper.m_reliable_download_progress = true;
1137+
}
1138+
11341139
void SessionImpl::notify_upload_progress()
11351140
{
11361141
if (m_state != State::Active)
@@ -1751,20 +1756,12 @@ inline void SessionWrapper::finalize_before_actualization() noexcept
17511756
inline void SessionWrapper::on_upload_progress(bool only_if_new_uploadable_data)
17521757
{
17531758
REALM_ASSERT(!m_finalized);
1754-
1755-
// don't set the flag in case of the progress change of local origin
1756-
// progress should be delayed until first DOWNLOAD message received
1757-
// since uploads are not allowed before that and can't progress
1758-
if (!only_if_new_uploadable_data)
1759-
m_reliable_download_progress = true;
1760-
17611759
report_progress(/* is_download = */ false, only_if_new_uploadable_data); // Throws
17621760
}
17631761

17641762
inline void SessionWrapper::on_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes)
17651763
{
17661764
REALM_ASSERT(!m_finalized);
1767-
m_reliable_download_progress = true;
17681765
m_bootstrap_store_bytes = bootstrap_store_bytes;
17691766
report_progress(/* is_download = */ true); // Throws
17701767
}

src/realm/sync/noinst/client_impl_base.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2367,6 +2367,9 @@ Status Session::receive_download_message(const DownloadMessage& message)
23672367
bool is_flx = m_conn.is_flx_sync_connection();
23682368
int64_t query_version = is_flx ? *message.query_version : 0;
23692369

2370+
if (!is_flx || query_version > 0)
2371+
enable_progress_notifications();
2372+
23702373
// If this is a PBS connection, then every download message is its own complete batch.
23712374
bool last_in_batch = is_flx ? *message.last_in_batch : true;
23722375
auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;

src/realm/sync/noinst/client_impl_base.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,7 @@ class ClientImpl::Session {
12001200
bool is_steady_state_download_message(DownloadBatchState batch_state, int64_t query_version);
12011201

12021202
void init_progress_handler();
1203+
void enable_progress_notifications();
12031204
void notify_upload_progress();
12041205
void update_download_estimate(double download_estimate);
12051206
void notify_download_progress(const std::optional<uint64_t>& bootstrap_store_bytes = {});

0 commit comments

Comments
 (0)