Skip to content

Commit c84fcb6

Browse files
committed
Make wait_for_(upload|download)_complete_or_client_stopped() thin wrappers around async completion
1 parent 83ba52f commit c84fcb6

File tree

1 file changed

+33
-98
lines changed

1 file changed

+33
-98
lines changed

src/realm/sync/client.cpp

Lines changed: 33 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -228,13 +228,6 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener
228228
std::vector<WaitOperCompletionHandler> m_download_completion_handlers;
229229
std::vector<WaitOperCompletionHandler> m_sync_completion_handlers;
230230

231-
// `m_target_*load_mark` and `m_reached_*load_mark` are protected by
232-
// `m_client.m_mutex`. `m_staged_*load_mark` must only be accessed by the
233-
// event loop thread.
234-
std::int_fast64_t m_target_upload_mark = 0, m_target_download_mark = 0;
235-
std::int_fast64_t m_staged_upload_mark = 0, m_staged_download_mark = 0;
236-
std::int_fast64_t m_reached_upload_mark = 0, m_reached_download_mark = 0;
237-
238231
void on_upload_completion();
239232
void on_download_completion();
240233
void on_suspended(const SessionErrorInfo& error_info);
@@ -1338,8 +1331,12 @@ void SessionWrapper::async_wait_for(bool upload_completion, bool download_comple
13381331
REALM_ASSERT(upload_completion || download_completion);
13391332

13401333
m_client.post([self = util::bind_ptr{this}, handler = std::move(handler), upload_completion,
1341-
download_completion]() mutable {
1334+
download_completion](Status status) mutable {
13421335
REALM_ASSERT(self->m_actualized);
1336+
if (!status.is_ok()) {
1337+
handler(status); // Throws
1338+
return;
1339+
}
13431340
if (REALM_UNLIKELY(!self->m_sess)) {
13441341
// Already finalized
13451342
handler({ErrorCodes::OperationAborted, "Session finalized before callback could run"}); // Throws
@@ -1373,36 +1370,11 @@ bool SessionWrapper::wait_for_upload_complete_or_client_stopped()
13731370
// Thread safety required
13741371
REALM_ASSERT(!m_abandoned);
13751372

1376-
std::int_fast64_t target_mark;
1377-
{
1378-
util::CheckedLockGuard lock{m_client.m_mutex};
1379-
target_mark = ++m_target_upload_mark;
1380-
}
1381-
1382-
m_client.post([self = util::bind_ptr{this}, target_mark] {
1383-
REALM_ASSERT(self->m_actualized);
1384-
// The session wrapper may already have been finalized. This can only
1385-
// happen if it was abandoned, but in that case, the call of
1386-
// wait_for_upload_complete_or_client_stopped() must have returned
1387-
// already.
1388-
if (REALM_UNLIKELY(!self->m_sess))
1389-
return;
1390-
if (target_mark > self->m_staged_upload_mark) {
1391-
self->m_staged_upload_mark = target_mark;
1392-
SessionImpl& sess = *self->m_sess;
1393-
sess.request_upload_completion_notification(); // Throws
1394-
}
1395-
}); // Throws
1396-
1397-
bool completion_condition_was_satisfied;
1398-
{
1399-
util::CheckedUniqueLock lock{m_client.m_mutex};
1400-
m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
1401-
return m_reached_upload_mark >= target_mark || m_client.m_stopped;
1402-
});
1403-
completion_condition_was_satisfied = !m_client.m_stopped;
1404-
}
1405-
return completion_condition_was_satisfied;
1373+
auto pf = util::make_promise_future<bool>();
1374+
async_wait_for(true, false, [promise = std::move(pf.promise)](Status status) mutable {
1375+
promise.emplace_value(status.is_ok());
1376+
});
1377+
return pf.future.get();
14061378
}
14071379

14081380

@@ -1411,36 +1383,11 @@ bool SessionWrapper::wait_for_download_complete_or_client_stopped()
14111383
// Thread safety required
14121384
REALM_ASSERT(!m_abandoned);
14131385

1414-
std::int_fast64_t target_mark;
1415-
{
1416-
util::CheckedLockGuard lock{m_client.m_mutex};
1417-
target_mark = ++m_target_download_mark;
1418-
}
1419-
1420-
m_client.post([self = util::bind_ptr{this}, target_mark] {
1421-
REALM_ASSERT(self->m_actualized);
1422-
// The session wrapper may already have been finalized. This can only
1423-
// happen if it was abandoned, but in that case, the call of
1424-
// wait_for_download_complete_or_client_stopped() must have returned
1425-
// already.
1426-
if (REALM_UNLIKELY(!self->m_sess))
1427-
return;
1428-
if (target_mark > self->m_staged_download_mark) {
1429-
self->m_staged_download_mark = target_mark;
1430-
SessionImpl& sess = *self->m_sess;
1431-
sess.request_download_completion_notification(); // Throws
1432-
}
1433-
}); // Throws
1434-
1435-
bool completion_condition_was_satisfied;
1436-
{
1437-
util::CheckedUniqueLock lock{m_client.m_mutex};
1438-
m_client.m_wait_or_client_stopped_cond.wait(lock.native_handle(), [&]() REQUIRES(m_client.m_mutex) {
1439-
return m_reached_download_mark >= target_mark || m_client.m_stopped;
1440-
});
1441-
completion_condition_was_satisfied = !m_client.m_stopped;
1442-
}
1443-
return completion_condition_was_satisfied;
1386+
auto pf = util::make_promise_future<bool>();
1387+
async_wait_for(false, true, [promise = std::move(pf.promise)](Status status) mutable {
1388+
promise.emplace_value(status.is_ok());
1389+
});
1390+
return pf.future.get();
14441391
}
14451392

14461393

@@ -1564,6 +1511,24 @@ void SessionWrapper::force_close()
15641511
m_sess = nullptr;
15651512
// Everything is being torn down, no need to report connection state anymore
15661513
m_connection_state_change_listener = {};
1514+
1515+
// All outstanding wait operations must be canceled
1516+
while (!m_upload_completion_handlers.empty()) {
1517+
auto handler = std::move(m_upload_completion_handlers.back());
1518+
m_upload_completion_handlers.pop_back();
1519+
handler({ErrorCodes::OperationAborted, "Sync session is being closed before upload was complete"}); // Throws
1520+
}
1521+
while (!m_download_completion_handlers.empty()) {
1522+
auto handler = std::move(m_download_completion_handlers.back());
1523+
m_download_completion_handlers.pop_back();
1524+
handler(
1525+
{ErrorCodes::OperationAborted, "Sync session is being closed before download was complete"}); // Throws
1526+
}
1527+
while (!m_sync_completion_handlers.empty()) {
1528+
auto handler = std::move(m_sync_completion_handlers.back());
1529+
m_sync_completion_handlers.pop_back();
1530+
handler({ErrorCodes::OperationAborted, "Sync session is being closed before sync was complete"}); // Throws
1531+
}
15671532
}
15681533

15691534
// Must be called from event loop thread
@@ -1586,25 +1551,6 @@ void SessionWrapper::finalize()
15861551
// deactivation.
15871552
m_db->release_sync_agent();
15881553
m_db = nullptr;
1589-
1590-
// All outstanding wait operations must be canceled
1591-
while (!m_upload_completion_handlers.empty()) {
1592-
auto handler = std::move(m_upload_completion_handlers.back());
1593-
m_upload_completion_handlers.pop_back();
1594-
handler(
1595-
{ErrorCodes::OperationAborted, "Sync session is being finalized before upload was complete"}); // Throws
1596-
}
1597-
while (!m_download_completion_handlers.empty()) {
1598-
auto handler = std::move(m_download_completion_handlers.back());
1599-
m_download_completion_handlers.pop_back();
1600-
handler(
1601-
{ErrorCodes::OperationAborted, "Sync session is being finalized before download was complete"}); // Throws
1602-
}
1603-
while (!m_sync_completion_handlers.empty()) {
1604-
auto handler = std::move(m_sync_completion_handlers.back());
1605-
m_sync_completion_handlers.pop_back();
1606-
handler({ErrorCodes::OperationAborted, "Sync session is being finalized before sync was complete"}); // Throws
1607-
}
16081554
}
16091555

16101556

@@ -1636,11 +1582,6 @@ void SessionWrapper::on_upload_completion()
16361582
m_download_completion_handlers.push_back(std::move(handler)); // Throws
16371583
m_sync_completion_handlers.pop_back();
16381584
}
1639-
util::CheckedLockGuard lock{m_client.m_mutex};
1640-
if (m_staged_upload_mark > m_reached_upload_mark) {
1641-
m_reached_upload_mark = m_staged_upload_mark;
1642-
m_client.m_wait_or_client_stopped_cond.notify_all();
1643-
}
16441585
}
16451586

16461587

@@ -1663,12 +1604,6 @@ void SessionWrapper::on_download_completion()
16631604
m_flx_subscription_store->update_state(m_flx_pending_mark_version, SubscriptionSet::State::Complete);
16641605
m_flx_pending_mark_version = SubscriptionSet::EmptyVersion;
16651606
}
1666-
1667-
util::CheckedLockGuard lock{m_client.m_mutex};
1668-
if (m_staged_download_mark > m_reached_download_mark) {
1669-
m_reached_download_mark = m_staged_download_mark;
1670-
m_client.m_wait_or_client_stopped_cond.notify_all();
1671-
}
16721607
}
16731608

16741609

0 commit comments

Comments
 (0)