Skip to content

Commit c9384c7

Browse files
committed
.
1 parent 706eb6d commit c9384c7

File tree

7 files changed

+128
-28
lines changed

7 files changed

+128
-28
lines changed

shared/src/sync_engine/optimistic/db/index.rs

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use typesafe_idb::{Index, IndexSpec};
88
use crate::sync_engine::optimistic::optimistic_changes::OptimisticChanges;
99

1010
use super::reactivity_trackers::ReactivityTrackers;
11-
use super::Error;
11+
use super::{Error, MaybeOptimistic};
1212

1313
#[derive(derive_more::Constructor)]
1414
pub struct IndexWithOptimisticChanges<'txn, IS> {
@@ -18,7 +18,7 @@ pub struct IndexWithOptimisticChanges<'txn, IS> {
1818
txn_location: &'static Location<'static>,
1919
}
2020
impl<IS: IndexSpec> IndexWithOptimisticChanges<'_, IS> {
21-
pub async fn get(&self, id: &IS::Type) -> Result<Option<IS::Store>, super::Error> {
21+
pub async fn get(&self, id: &IS::Type) -> Result<Option<MaybeOptimistic<IS::Store>>, super::Error> {
2222
self.reactivity_trackers
2323
.borrow_mut()
2424
.add_bulk_read(IS::Store::NAME);
@@ -44,7 +44,8 @@ impl<IS: IndexSpec> IndexWithOptimisticChanges<'_, IS> {
4444
.optimistic_changes
4545
.updates
4646
.latest_downcasted(id)
47-
.or(Some(row)))
47+
.map(|o| MaybeOptimistic::new(o, true))
48+
.or(Some(MaybeOptimistic::new(row, false))))
4849
}
4950

5051
pub(crate) async fn no_optimism_get(
@@ -58,7 +59,7 @@ impl<IS: IndexSpec> IndexWithOptimisticChanges<'_, IS> {
5859
self.inner.get(id).await
5960
}
6061

61-
pub async fn get_all(&self, value: Option<&IS::Type>) -> Result<Vec<IS::Store>, Error> {
62+
pub async fn get_all(&self, value: Option<&IS::Type>) -> Result<Vec<MaybeOptimistic<IS::Store>>, Error> {
6263
self.reactivity_trackers
6364
.borrow_mut()
6465
.add_bulk_read(IS::Store::NAME);
@@ -79,22 +80,24 @@ impl<IS: IndexSpec> IndexWithOptimisticChanges<'_, IS> {
7980
self.optimistic_changes
8081
.updates
8182
.latest_downcasted(r.id())
82-
.unwrap_or(r)
83+
.map(|o| MaybeOptimistic::new(o, true))
84+
.unwrap_or(MaybeOptimistic::new(r, false))
8385
});
8486
let mut all = Vec::from_iter(from_db_filtered);
8587

8688
let optimistic_creations = self
8789
.optimistic_changes
8890
.creations
89-
.all_the_latest_downcasted();
91+
.all_the_latest_downcasted() ;
9092
if let Some(value) = value {
9193
all.extend(
9294
optimistic_creations
9395
.into_iter()
94-
.filter(|row| IS::get_index_value(row) == value),
96+
.filter(|row| IS::get_index_value(row) == value)
97+
.map(|o| MaybeOptimistic::new(o, true))
9598
);
9699
} else {
97-
all.extend(optimistic_creations)
100+
all.extend(optimistic_creations.into_iter().map(|o| MaybeOptimistic::new(o, true)));
98101
}
99102
Ok(all)
100103
}

shared/src/sync_engine/optimistic/db/object_store.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,26 @@ pub struct MaybeOptimistic<S> {
2525
is_optimistic: bool,
2626
}
2727

28+
impl<S> MaybeOptimistic<S> {
29+
pub fn into_inner(self) -> S {
30+
self.inner
31+
}
32+
33+
pub fn map<T>(self, f: impl FnOnce(S) -> T) -> MaybeOptimistic<T> {
34+
MaybeOptimistic {
35+
inner: f(self.inner),
36+
is_optimistic: self.is_optimistic,
37+
}
38+
}
39+
40+
pub fn map_ref<T>(&self, f: impl FnOnce(&S) -> T) -> MaybeOptimistic<T> {
41+
MaybeOptimistic {
42+
inner: f(&self.inner),
43+
is_optimistic: self.is_optimistic,
44+
}
45+
}
46+
}
47+
2848
impl<S, Mode> ObjectStoreWithOptimisticChanges<S, Mode>
2949
where
3050
S: Store + 'static,
@@ -150,6 +170,11 @@ where
150170
.map_err(|e| Error::new(e, self.location))?;
151171
self.optimistic_changes
152172
.remove_obsoletes_for_id::<S>(item.id());
173+
174+
if let Some(commit_listener) = self.commit_listener.as_ref() {
175+
commit_listener(&self.reactivity_trackers.borrow());
176+
}
177+
153178
Ok(())
154179
}
155180
pub fn update(&self, row: S, update_fut: impl Future<Output = Result<(), ()>> + 'static) {
@@ -180,4 +205,4 @@ where
180205
tracing::info!("invoked commit_listener() from within ObjectStore::create()");
181206
}
182207
}
183-
}
208+
}

shared/src/sync_engine/optimistic/optimistic_changes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ impl OptimisticChanges {
101101
}
102102

103103
pub fn remove_obsoletes_for_id<S: Store>(&self, id: &S::Id) {
104+
tracing::info!("Called remove obseletes_for_id: {id:?}");
104105
self.deletes.remove_all_successful::<S>(id, &());
105106
self.updates.remove_all_successful::<S>(id, &());
106107
self.creations

shared/src/sync_engine/websocket_updates/transport.rs

Lines changed: 77 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
use std::fmt::Debug;
21
use codee::{Decoder, Encoder};
32
use futures::{Sink, Stream};
43
use pin_project::pin_project;
4+
use std::fmt::Debug;
55
use std::task::{ready, Poll};
66
use url::Url;
77

88
use crate::{
99
endpoints::defns::api::websocket_updates::{ClientMsg, ServerMsg},
10-
sync_engine::websocket_updates::binary_transport::{BinaryTransportError, BinaryTransportTrait, ConnOrClosedError, JsonSerdeToBinaryCodec},
10+
sync_engine::websocket_updates::binary_transport::{
11+
BinaryTransportError, BinaryTransportTrait, ConnOrClosedError, JsonSerdeToBinaryCodec,
12+
},
1113
};
1214

13-
14-
1515
#[pin_project]
1616
pub struct Transport<I> {
1717
#[pin]
@@ -34,7 +34,8 @@ where
3434

3535
fn start_send(self: std::pin::Pin<&mut Self>, item: ClientMsg) -> Result<(), Self::Error> {
3636
let this = self.project();
37-
let encoded = JsonSerdeToBinaryCodec::encode(&item).map_err(BinaryTransportError::Encode)?;
37+
let encoded =
38+
JsonSerdeToBinaryCodec::encode(&item).map_err(BinaryTransportError::Encode)?;
3839
this.inner
3940
.start_send(encoded)
4041
.map_err(BinaryTransportError::Conn)
@@ -95,14 +96,15 @@ where
9596
}
9697
}
9798

98-
pub trait TransportTrait: Sized + Sink<ClientMsg> + Stream<Item = Result<ServerMsg, Self::TransportError>> {
99+
pub trait TransportTrait:
100+
Sized + Sink<ClientMsg> + Stream<Item = Result<ServerMsg, Self::TransportError>>
101+
{
99102
type TransportError: Debug;
100103

101104
#[allow(async_fn_in_trait)]
102105
async fn establish(url: &Url) -> Result<Self, Self::TransportError>;
103106
}
104107

105-
106108
impl<T> TransportTrait for Transport<T>
107109
where
108110
T: BinaryTransportTrait,
@@ -116,4 +118,72 @@ where
116118

117119
Ok(Transport { inner })
118120
}
121+
}
122+
123+
#[cfg(test)]
124+
pub mod tests {
125+
use std::{pin::Pin, task::{Context, Poll}};
126+
127+
use futures::{channel::mpsc, Sink, Stream};
128+
129+
use crate::endpoints::defns::api::websocket_updates::{ClientMsg, ServerMsg};
130+
131+
pub struct MockTransportHandler {
132+
send: mpsc::Sender<ServerMsg>,
133+
recv: mpsc::Receiver<ClientMsg>,
134+
}
135+
136+
pub struct TestTransport {
137+
recv: mpsc::Receiver<ServerMsg>,
138+
send: mpsc::Sender<ClientMsg>,
139+
}
140+
141+
impl TestTransport {
142+
pub fn new() -> (Self, MockTransportHandler) {
143+
let (server_msg_sender, server_msg_receiver) = mpsc::channel(100);
144+
let (client_msg_sender, client_msg_receiver) = mpsc::channel(100);
145+
146+
(
147+
Self { recv: server_msg_receiver, send: client_msg_sender },
148+
MockTransportHandler { send: server_msg_sender, recv: client_msg_receiver },
149+
)
150+
}
151+
}
152+
153+
impl Stream for TestTransport {
154+
type Item = Result<ServerMsg, mpsc::SendError>;
155+
156+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157+
Pin::new(&mut self.recv).poll_next(cx).map(|opt| opt.map(Ok))
158+
}
159+
}
160+
161+
impl Sink<ClientMsg> for TestTransport {
162+
type Error = mpsc::SendError;
163+
164+
fn poll_ready(
165+
mut self: Pin<&mut Self>,
166+
cx: &mut Context<'_>,
167+
) -> Poll<Result<(), Self::Error>> {
168+
Pin::new(&mut self.send).poll_ready(cx)
169+
}
170+
171+
fn start_send(mut self: Pin<&mut Self>, item: ClientMsg) -> Result<(), Self::Error> {
172+
Pin::new(&mut self.send).start_send(item)
173+
}
174+
175+
fn poll_flush(
176+
mut self: Pin<&mut Self>,
177+
cx: &mut Context<'_>,
178+
) -> Poll<Result<(), Self::Error>> {
179+
Pin::new(&mut self.send).poll_flush(cx)
180+
}
181+
182+
fn poll_close(
183+
mut self: Pin<&mut Self>,
184+
cx: &mut Context<'_>,
185+
) -> Poll<Result<(), Self::Error>> {
186+
Pin::new(&mut self.send).poll_close(cx)
187+
}
188+
}
119189
}

web/src/app/repository/issues_tab/list.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use jiff::{fmt::strtime, Timestamp};
22
use leptos::prelude::*;
33
use macros::zwang_url;
4-
use shared::types::{
4+
use shared::{sync_engine::optimistic::db::MaybeOptimistic, types::{
55
issue::{Issue, RepositoryIdIndex},
66
issue_comment::{IssueComment, IssueIdIndex},
77
user::User,
8-
};
8+
}};
99

1010
use itertools::Itertools;
1111
use zwang_router::{ArgFromParent, RouteParams, A};
@@ -107,7 +107,7 @@ pub fn IssuesList(
107107

108108
#[component]
109109
fn IssueRow(
110-
issue: Issue,
110+
issue: MaybeOptimistic<Issue>,
111111
#[prop(into)] owner_name: Signal<String>,
112112
#[prop(into)] repo_name: Signal<String>,
113113
#[prop(optional)] is_last: bool,

web/src/app/repository/issues_tab/one_issue.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use crate::{
2525
thirds::Thirds,
2626
},
2727
frontend_error::FrontendError,
28-
idb_signal::IdbSignal,
2928
idb_signal_from_sync_engine::IdbSignalFromSyncEngine,
3029
};
3130

@@ -47,7 +46,7 @@ pub fn OneIssue(
4746
Ok(i) => i,
4847
Err(_) => return view! { <NotFound /> }.into_any(),
4948
};
50-
let issue_and_user: IdbSignal<Result<Option<_>, _>> = sync_engine.idb_signal(
49+
let issue_and_user = sync_engine.idb_signal(
5150
move |txn| txn.with_store::<User>().with_store::<Issue>().build(),
5251
move |txn| async move {
5352
let issue = txn
@@ -71,6 +70,10 @@ pub fn OneIssue(
7170
},
7271
);
7372

73+
Effect::new(move || {
74+
tracing::info!("issue_and_user: {:?}", issue_and_user.get());
75+
});
76+
7477
(move || {
7578
issue_and_user.get().map(|issue_and_user| {
7679
let sync_engine = use_sync_engine();
@@ -105,8 +108,6 @@ pub fn OneIssue(
105108
Ok(issue_comments.into_iter().zip(users).collect::<Vec<_>>())
106109
});
107110

108-
109-
110111
Ok(
111112
view! {
112113
<div>
@@ -165,6 +166,7 @@ pub fn OneIssue(
165166
each=move || issue_comment_and_users.clone()
166167
key=|(ic, _)| ic.id
167168
children=|(issue_comment, user)| {
169+
let issue_comment = issue_comment.into_inner();
168170
view! {
169171
<IssueCommentBox
170172
body=issue_comment.body.to_option()

web/src/app/repository/mod.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use crate::signal_ext::*;
22
use leptos::{prelude::*, task::spawn_local};
33
use shared::{
4-
types::{
4+
sync_engine::optimistic::db::MaybeOptimistic, types::{
55
self,
66
repository::Repository,
77
user::{self, User},
8-
},
9-
utils::LogErr,
8+
}, utils::LogErr
109
};
1110
use top_bar::TopBar;
1211
use zwang_router::{set_pathname, Outlet, ParsedPath, RouteParams};
@@ -55,8 +54,8 @@ impl From<Tab> for RootOwnerNameRepoName {
5554

5655
#[derive(Clone, Debug, PartialEq, Eq)]
5756
pub struct RepositoryPageContextInner {
58-
repository: Repository,
59-
user: User
57+
repository: MaybeOptimistic<Repository>,
58+
user: MaybeOptimistic<User>
6059
}
6160

6261
pub type RepositoryPageContext = Memo<RepositoryPageContextInner>;

0 commit comments

Comments
 (0)