Skip to content

Commit addcabe

Browse files
authored
fix: potential fail to read the data of the row (#1793)
1 parent 4b605b6 commit addcabe

File tree

7 files changed

+84
-44
lines changed

7 files changed

+84
-44
lines changed

frontend/rust-lib/flowy-client-sync/src/client_database/block_revision_pad.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl std::ops::Deref for GridBlockRevisionPad {
2727
}
2828

2929
impl GridBlockRevisionPad {
30-
pub async fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
30+
pub fn duplicate_data(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
3131
let duplicated_rows = self
3232
.block
3333
.rows

frontend/rust-lib/flowy-database/src/event_handler.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@ pub(crate) async fn update_select_option_handler(
375375
let changeset: SelectOptionChangeset = data.into_inner().try_into()?;
376376
let editor = manager.get_database_editor(&changeset.cell_path.database_id).await?;
377377
let field_id = changeset.cell_path.field_id.clone();
378+
let (tx, rx) = tokio::sync::oneshot::channel();
378379
editor
379380
.modify_field_rev(&field_id, |field_rev| {
380381
let mut type_option = select_type_option_from_field_rev(field_rev)?;
@@ -403,27 +404,24 @@ pub(crate) async fn update_select_option_handler(
403404
if is_changed.is_some() {
404405
field_rev.insert_type_option(&*type_option);
405406
}
406-
407-
if let Some(cell_changeset_str) = cell_changeset_str {
408-
let cloned_editor = editor.clone();
409-
tokio::spawn(async move {
410-
match cloned_editor
411-
.update_cell_with_changeset(
412-
&changeset.cell_path.row_id,
413-
&changeset.cell_path.field_id,
414-
cell_changeset_str,
415-
)
416-
.await
417-
{
418-
Ok(_) => {}
419-
Err(e) => tracing::error!("{}", e),
420-
}
421-
});
422-
}
407+
let _ = tx.send(cell_changeset_str);
423408
Ok(is_changed)
424409
})
425410
.await?;
426411

412+
if let Ok(Some(cell_changeset_str)) = rx.await {
413+
match editor
414+
.update_cell_with_changeset(
415+
&changeset.cell_path.row_id,
416+
&changeset.cell_path.field_id,
417+
cell_changeset_str,
418+
)
419+
.await
420+
{
421+
Ok(_) => {}
422+
Err(e) => tracing::error!("{}", e),
423+
}
424+
}
427425
Ok(())
428426
}
429427

frontend/rust-lib/flowy-database/src/services/block_editor.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::services::retry::GetRowDataRetryAction;
12
use bytes::Bytes;
23
use flowy_client_sync::client_database::{GridBlockRevisionChangeset, GridBlockRevisionPad};
34
use flowy_client_sync::make_operations_from_revisions;
@@ -8,11 +9,14 @@ use flowy_revision::{
89
use flowy_sqlite::ConnectionPool;
910
use grid_model::{CellRevision, DatabaseBlockRevision, RowChangeset, RowRevision};
1011
use lib_infra::future::FutureResult;
12+
use lib_infra::retry::spawn_retry;
1113
use lib_ot::core::EmptyAttributes;
14+
use parking_lot::RwLock;
1215
use revision_model::Revision;
1316
use std::borrow::Cow;
1417
use std::sync::Arc;
15-
use tokio::sync::RwLock;
18+
use std::time::Duration;
19+
// use tokio::sync::RwLock;
1620

1721
pub struct DatabaseBlockRevisionEditor {
1822
#[allow(dead_code)]
@@ -53,7 +57,7 @@ impl DatabaseBlockRevisionEditor {
5357
}
5458

5559
pub async fn duplicate_block(&self, duplicated_block_id: &str) -> DatabaseBlockRevision {
56-
self.pad.read().await.duplicate_data(duplicated_block_id).await
60+
self.pad.read().duplicate_data(duplicated_block_id)
5761
}
5862

5963
/// Create a row after the the with prev_row_id. If prev_row_id is None, the row will be appended to the list
@@ -108,28 +112,38 @@ impl DatabaseBlockRevisionEditor {
108112
}
109113

110114
pub async fn index_of_row(&self, row_id: &str) -> Option<usize> {
111-
self.pad.read().await.index_of_row(row_id)
115+
self.pad.read().index_of_row(row_id)
112116
}
113117

114118
pub async fn number_of_rows(&self) -> i32 {
115-
self.pad.read().await.rows.len() as i32
119+
self.pad.read().rows.len() as i32
116120
}
117121

118122
pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<(usize, Arc<RowRevision>)>> {
119-
if self.pad.try_read().is_err() {
120-
tracing::error!("Required grid block read lock failed");
121-
Ok(None)
123+
let duration = Duration::from_millis(300);
124+
if let Some(pad) = self.pad.try_read_for(duration) {
125+
Ok(pad.get_row_rev(row_id))
122126
} else {
123-
let row_rev = self.pad.read().await.get_row_rev(row_id);
124-
Ok(row_rev)
127+
tracing::error!("Required grid block read lock failed, retrying");
128+
let retry = GetRowDataRetryAction {
129+
row_id: row_id.to_owned(),
130+
pad: self.pad.clone(),
131+
};
132+
match spawn_retry(3, 300, retry).await {
133+
Ok(value) => Ok(value),
134+
Err(err) => {
135+
tracing::error!("Read row revision failed with: {}", err);
136+
Ok(None)
137+
}
138+
}
125139
}
126140
}
127141

128142
pub async fn get_row_revs<T>(&self, row_ids: Option<Vec<Cow<'_, T>>>) -> FlowyResult<Vec<Arc<RowRevision>>>
129143
where
130144
T: AsRef<str> + ToOwned + ?Sized,
131145
{
132-
let row_revs = self.pad.read().await.get_row_revs(row_ids)?;
146+
let row_revs = self.pad.read().get_row_revs(row_ids)?;
133147
Ok(row_revs)
134148
}
135149

@@ -138,19 +152,19 @@ impl DatabaseBlockRevisionEditor {
138152
field_id: &str,
139153
row_ids: Option<Vec<Cow<'_, String>>>,
140154
) -> FlowyResult<Vec<CellRevision>> {
141-
let cell_revs = self.pad.read().await.get_cell_revs(field_id, row_ids)?;
155+
let cell_revs = self.pad.read().get_cell_revs(field_id, row_ids)?;
142156
Ok(cell_revs)
143157
}
144158

145159
async fn modify<F>(&self, f: F) -> FlowyResult<()>
146160
where
147161
F: for<'a> FnOnce(&'a mut GridBlockRevisionPad) -> FlowyResult<Option<GridBlockRevisionChangeset>>,
148162
{
149-
let mut write_guard = self.pad.write().await;
150-
match f(&mut write_guard)? {
163+
let changeset = f(&mut self.pad.write())?;
164+
match changeset {
151165
None => {}
152-
Some(change) => {
153-
self.apply_change(change).await?;
166+
Some(changeset) => {
167+
self.apply_change(changeset).await?;
154168
}
155169
}
156170
Ok(())

frontend/rust-lib/flowy-database/src/services/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub mod grid_editor;
99
mod grid_editor_trait_impl;
1010
pub mod group;
1111
pub mod persistence;
12+
mod retry;
1213
pub mod row;
1314
pub mod setting;
1415
pub mod sort;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use flowy_client_sync::client_database::GridBlockRevisionPad;
2+
use flowy_error::FlowyError;
3+
use grid_model::RowRevision;
4+
use lib_infra::retry::Action;
5+
6+
use parking_lot::RwLock;
7+
use std::future::Future;
8+
use std::pin::Pin;
9+
use std::sync::Arc;
10+
11+
pub struct GetRowDataRetryAction {
12+
pub row_id: String,
13+
pub pad: Arc<RwLock<GridBlockRevisionPad>>,
14+
}
15+
16+
impl Action for GetRowDataRetryAction {
17+
type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send + Sync>>;
18+
type Item = Option<(usize, Arc<RowRevision>)>;
19+
type Error = FlowyError;
20+
21+
fn run(&mut self) -> Self::Future {
22+
let pad = self.pad.clone();
23+
let row_id = self.row_id.clone();
24+
Box::pin(async move {
25+
match pad.try_read() {
26+
None => Ok(None),
27+
Some(read_guard) => Ok(read_guard.get_row_rev(&row_id)),
28+
}
29+
})
30+
}
31+
}

shared-lib/lib-infra/src/retry/future.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,7 @@ use std::{
88
pin::Pin,
99
task::{Context, Poll},
1010
};
11-
use tokio::{
12-
task::JoinHandle,
13-
time::{sleep_until, Duration, Instant, Sleep},
14-
};
11+
use tokio::time::{sleep_until, Duration, Instant, Sleep};
1512

1613
#[pin_project(project = RetryStateProj)]
1714
enum RetryState<A>
@@ -55,7 +52,7 @@ where
5552
I: Iterator<Item = Duration>,
5653
A: Action,
5754
{
58-
pub fn spawn<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> {
55+
pub fn new<T: IntoIterator<IntoIter = I, Item = Duration>>(strategy: T, action: A) -> Retry<I, A> {
5956
Retry {
6057
retry_if: RetryIf::spawn(strategy, action, (|_| true) as fn(&A::Error) -> bool),
6158
}
@@ -196,16 +193,15 @@ impl<E, F: FnMut(&E) -> bool> Condition<E> for F {
196193
}
197194

198195
pub fn spawn_retry<A: Action + 'static>(
199-
millis: u64,
200196
retry_count: usize,
197+
retry_per_millis: u64,
201198
action: A,
202-
) -> JoinHandle<Result<A::Item, A::Error>>
199+
) -> impl Future<Output = Result<A::Item, A::Error>>
203200
where
204201
A::Item: Send + Sync,
205202
A::Error: Send + Sync,
206203
<A as Action>::Future: Send + Sync,
207204
{
208-
let strategy = FixedInterval::from_millis(millis).take(retry_count);
209-
let retry = Retry::spawn(strategy, action);
210-
tokio::spawn(async move { retry.await })
205+
let strategy = FixedInterval::from_millis(retry_per_millis).take(retry_count);
206+
Retry::new(strategy, action)
211207
}

shared-lib/lib-ws/src/ws.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl WSController {
103103
addr,
104104
handlers: self.handlers.clone(),
105105
};
106-
let retry = Retry::spawn(strategy, action);
106+
let retry = Retry::new(strategy, action);
107107
conn_state_notify.update_state(WSConnectState::Connecting);
108108
drop(conn_state_notify);
109109

0 commit comments

Comments
 (0)