Skip to content

Commit 773ab26

Browse files
alexjgcscheidshikokuchuo
committed
fix DontAnnounce policy drops documents synced by clients
Problem: when we receive a sync message for a document which we don't have in storage and for whom the AnnouncePolicy returns DontAnnounce then we erroneously decide that the document is unavailable even if the incoming sync message contains data about the document. This is because we fail to process pending sync messages once the load has completed if we don't have the document available or any connected peers who we could request from. Solution: process pending sync messages before deciding that the document is unavailable. Whilst I'm here I also cleaned up the logic around the phase transition during load to make it more consistent with the rest of the phase transitions and easier to read. One important improvement is that if the document state changes mutliple times in one turn of a document actor, we only notify of the last status. This is important because it means that if a document transitions from loading through not found and into requesting in the same turn (which can happen if a load completes after receiving a sync message) then we don't notify the hub of the not found state. This is in turn important because notifying the hub of a not found state causes any outstanding find commands to complete with `None`. Fixes: #85 Co-authored-by: Carlos Scheidegger <285675+cscheid@users.noreply.github.com> Co-authored-by: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com>
1 parent bf0b78c commit 773ab26

File tree

10 files changed

+262
-107
lines changed

10 files changed

+262
-107
lines changed

CHANGELOG.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,19 @@
22

33
## Unreleased
44

5+
### Fixed
6+
7+
* A bug where locally unavailable documents sent by peers with an announce
8+
policy set to false would be marked as unavailable
9+
510
### Added
611

7-
- `TcpDialer::new` which takes a `Url` parameter, rather than a host and a port
12+
* `TcpDialer::new` which takes a `Url` parameter, rather than a host and a port
813
or a socket address.
9-
- `Repo::dial_tcp()` to simplify construction of `TcpDialer`.
10-
- Allow documents syncing over the TCP transport to be up to 8gb size instead
14+
* `Repo::dial_tcp()` to simplify construction of `TcpDialer`.
15+
* Allow documents syncing over the TCP transport to be up to 8gb size instead
1116
of Tokio's default 8mb frame size
12-
- Exposed receiving `ConnectionHandle`s via `accept()`. Users can now subscribe
17+
* Exposed receiving `ConnectionHandle`s via `accept()`. Users can now subscribe
1318
to an `events()` stream directly on the handle, or `await` for
1419
`handshake_completed()`.
1520

samod-core/src/actors/document.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ mod load;
4040
mod on_disk_state;
4141
pub use on_disk_state::CompactionHash;
4242
mod peer_doc_connection;
43-
mod ready;
44-
mod request;
43+
mod phase;
4544
mod spawn_args;
4645
mod with_doc_result;
4746
pub use with_doc_result::WithDocResult;

samod-core/src/actors/document/doc_actor_result.rs

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ use std::collections::HashMap;
33
use automerge::ChangeHash;
44

55
use crate::{
6-
ConnectionId, DocumentChanged, PeerId, StorageKey,
6+
ConnectionId, DocumentChanged, DocumentId, PeerId, StorageKey,
77
actors::{
88
DocToHubMsg,
9-
document::{SyncMessageStat, io::DocumentIoTask},
10-
messages::DocToHubMsgPayload,
9+
document::{DocumentStatus, SyncMessageStat, io::DocumentIoTask},
10+
messages::{Broadcast, DocToHubMsgPayload, SyncMessage},
1111
},
1212
io::{IoTask, IoTaskId, StorageTask},
1313
network::PeerDocState,
@@ -58,8 +58,60 @@ impl DocActorResult {
5858
}
5959

6060
/// Send a message back to the hub
61-
pub(crate) fn send_message(&mut self, message: DocToHubMsgPayload) {
62-
self.outgoing_messages.push(DocToHubMsg(message));
61+
pub(crate) fn send_sync_message(
62+
&mut self,
63+
conn_id: ConnectionId,
64+
doc_id: DocumentId,
65+
message: SyncMessage,
66+
) {
67+
self.outgoing_messages
68+
.push(DocToHubMsg(DocToHubMsgPayload::SendSyncMessage {
69+
connection_id: conn_id,
70+
document_id: doc_id,
71+
message,
72+
}));
73+
}
74+
75+
pub(crate) fn send_broadcast(&mut self, connections: Vec<ConnectionId>, msg: Broadcast) {
76+
self.outgoing_messages
77+
.push(DocToHubMsg(DocToHubMsgPayload::Broadcast {
78+
connections,
79+
msg,
80+
}));
81+
}
82+
83+
pub(crate) fn send_terminated(&mut self) {
84+
self.outgoing_messages
85+
.push(DocToHubMsg(DocToHubMsgPayload::Terminated));
86+
}
87+
88+
pub(crate) fn send_peer_states_changes(
89+
&mut self,
90+
new_states: HashMap<ConnectionId, PeerDocState>,
91+
) {
92+
// Remove previous peer state change messages as they are redundant
93+
self.outgoing_messages
94+
.retain(|m| !matches!(m.0, DocToHubMsgPayload::PeerStatesChanged { .. }));
95+
self.outgoing_messages
96+
.push(DocToHubMsg(DocToHubMsgPayload::PeerStatesChanged {
97+
new_states,
98+
}));
99+
}
100+
101+
pub(crate) fn send_doc_status_update(&mut self, new_status: DocumentStatus) {
102+
// remove any existing doc status update so that if the document status changes
103+
// multiple times during a turn, only the latest status is sent to the hub.
104+
// This is especially important to avoid bouncing through a NotFound state
105+
// when loading a document as that will cause any outstanding find commands
106+
// to fail even if the document loads successfully in this turn (as it might
107+
// if we finish loading after receiving a sync message with the document
108+
// content).
109+
self.outgoing_messages
110+
.retain(|m| !matches!(m.0, DocToHubMsgPayload::DocumentStatusChanged { .. }));
111+
self.outgoing_messages
112+
.push(DocToHubMsg(DocToHubMsgPayload::DocumentStatusChanged {
113+
new_status,
114+
}));
63115
}
64116

65117
pub(crate) fn put(&mut self, key: StorageKey, value: Vec<u8>) -> IoTaskId {

samod-core/src/actors/document/doc_state.rs

Lines changed: 37 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,21 @@ use automerge::Automerge;
66
use crate::{
77
ConnectionId, DocumentId, StorageKey, UnixTimestamp,
88
actors::{
9-
document::{DocActorResult, SyncDirection, SyncMessageStat},
10-
messages::{Broadcast, DocMessage, DocToHubMsgPayload, SyncMessage},
9+
document::{
10+
DocActorResult, SyncDirection, SyncMessageStat,
11+
phase::{
12+
loading::Loading,
13+
ready::Ready,
14+
request::{Request, RequestState},
15+
},
16+
},
17+
messages::{Broadcast, DocMessage, SyncMessage},
1118
},
1219
};
1320

1421
use super::{
1522
DocumentStatus,
1623
peer_doc_connection::{AnnouncePolicy, PeerDocConnection},
17-
ready::Ready,
18-
request::{Request, RequestState},
1924
};
2025

2126
#[derive(Debug)]
@@ -32,9 +37,7 @@ pub(super) struct DocState {
3237

3338
#[derive(Debug)]
3439
pub enum Phase {
35-
Loading {
36-
pending_sync_messages: HashMap<ConnectionId, Vec<SyncMessage>>,
37-
},
40+
Loading(Loading),
3841
Requesting(Request),
3942
Ready(Ready),
4043
NotFound,
@@ -56,9 +59,7 @@ impl DocState {
5659
any_dialer_connecting: bool,
5760
) -> Self {
5861
Self {
59-
phase: Phase::Loading {
60-
pending_sync_messages: HashMap::new(),
61-
},
62+
phase: Phase::Loading(Loading::new()),
6263
document_id,
6364
doc,
6465
any_dialer_connecting,
@@ -79,43 +80,33 @@ impl DocState {
7980
PhaseTransition::None => {}
8081
PhaseTransition::ToReady => {
8182
tracing::trace!("transitioning to ready");
82-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
83-
new_status: DocumentStatus::Ready,
84-
});
83+
out.send_doc_status_update(DocumentStatus::Ready);
8584
out.emit_doc_changed(self.doc.get_heads());
8685
self.phase = Phase::Ready(Ready::new());
8786
}
8887
PhaseTransition::ToNotFound => {
8988
tracing::trace!("transitioning to NotFound");
90-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
91-
new_status: DocumentStatus::NotFound,
92-
});
89+
out.send_doc_status_update(DocumentStatus::NotFound);
9390
if let Phase::Requesting(request) = &self.phase {
9491
for peer in request.peers_waiting_for_us_to_respond() {
95-
out.send_message(DocToHubMsgPayload::SendSyncMessage {
96-
connection_id: peer,
97-
document_id: self.document_id.clone(),
98-
message: SyncMessage::DocUnavailable,
99-
});
92+
out.send_sync_message(
93+
peer,
94+
self.document_id.clone(),
95+
SyncMessage::DocUnavailable,
96+
);
10097
}
10198
}
10299
self.phase = Phase::NotFound;
103100
}
104101
PhaseTransition::ToRequesting(request) => {
105102
tracing::trace!("transitioning to requesting");
106-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
107-
new_status: DocumentStatus::Requesting,
108-
});
103+
out.send_doc_status_update(DocumentStatus::Requesting);
109104
self.phase = Phase::Requesting(request);
110105
}
111106
PhaseTransition::ToLoading => {
112107
tracing::trace!("transitioning to loading");
113-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
114-
new_status: DocumentStatus::Loading,
115-
});
116-
self.phase = Phase::Loading {
117-
pending_sync_messages: HashMap::new(),
118-
};
108+
out.send_doc_status_update(DocumentStatus::Loading);
109+
self.phase = Phase::Loading(Loading::new());
119110
}
120111
}
121112
}
@@ -159,8 +150,9 @@ impl DocState {
159150
}
160151
// self.save_state
161152
// .add_on_disk(snapshots.into_keys().chain(incrementals.into_keys()));
162-
if matches!(self.phase, Phase::Loading { .. }) {
163-
if self.doc.get_heads().is_empty() {
153+
if let Phase::Loading(loading) = &mut self.phase {
154+
let pending_sync_messages = loading.take_pending_sync_messages();
155+
let phase_transition = if self.doc.get_heads().is_empty() {
164156
let eligible_conns = peer_connections
165157
.values()
166158
.any(|p| p.announce_policy() != AnnouncePolicy::DontAnnounce);
@@ -170,47 +162,23 @@ impl DocState {
170162
self.any_dialer_connecting,
171163
"no data found on disk, requesting document"
172164
);
173-
let mut next_phase = Phase::Requesting(Request::new(
165+
PhaseTransition::ToRequesting(Request::new(
174166
self.document_id.clone(),
175167
peer_connections.values(),
176-
));
177-
std::mem::swap(&mut self.phase, &mut next_phase);
178-
let Phase::Loading {
179-
pending_sync_messages,
180-
} = next_phase
181-
else {
182-
unreachable!("we already checked");
183-
};
184-
for (conn_id, msgs) in pending_sync_messages {
185-
for msg in msgs {
186-
self.handle_sync_message(now, out, conn_id, peer_connections, msg, now);
187-
}
188-
}
189-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
190-
new_status: DocumentStatus::Requesting,
191-
});
168+
))
192169
} else {
193170
tracing::debug!(
194171
"no data found on disk and no connections available, transitioning to NotFound"
195172
);
196-
self.handle_phase_transition(out, PhaseTransition::ToNotFound);
173+
PhaseTransition::ToNotFound
197174
}
198-
return;
199-
}
175+
} else {
176+
tracing::trace!("load complete, transitioning to ready");
177+
PhaseTransition::ToReady
178+
};
200179

201-
tracing::trace!("load complete, transitioning to ready");
180+
self.handle_phase_transition(out, phase_transition);
202181

203-
let mut next_phase = Phase::Ready(Ready::new());
204-
std::mem::swap(&mut self.phase, &mut next_phase);
205-
let Phase::Loading {
206-
pending_sync_messages,
207-
} = next_phase
208-
else {
209-
unreachable!("we already checked");
210-
};
211-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
212-
new_status: DocumentStatus::Ready,
213-
});
214182
for (conn_id, msgs) in pending_sync_messages {
215183
for msg in msgs {
216184
self.handle_sync_message(now, out, conn_id, peer_connections, msg, now);
@@ -254,10 +222,7 @@ impl DocState {
254222
}
255223
})
256224
.collect();
257-
out.send_message(DocToHubMsgPayload::Broadcast {
258-
connections: targets,
259-
msg: Broadcast::Gossip { msg },
260-
});
225+
out.send_broadcast(targets, Broadcast::Gossip { msg });
261226
}
262227
DocMessage::Sync(msg) => self.handle_sync_message(
263228
now,
@@ -298,13 +263,8 @@ impl DocState {
298263
};
299264

300265
let (transition, duration) = match &mut self.phase {
301-
Phase::Loading {
302-
pending_sync_messages,
303-
} => {
304-
pending_sync_messages
305-
.entry(connection_id)
306-
.or_default()
307-
.push(msg);
266+
Phase::Loading(loading) => {
267+
loading.receive_sync_message(connection_id, msg);
308268
(PhaseTransition::None, None)
309269
}
310270
Phase::Requesting(request) => {
@@ -380,11 +340,8 @@ impl DocState {
380340
) -> HashMap<ConnectionId, Vec<SyncMessage>> {
381341
let mut result: HashMap<ConnectionId, Vec<SyncMessage>> = HashMap::new();
382342
for (conn_id, peer_conn) in peer_connections {
383-
if let Phase::Loading {
384-
pending_sync_messages,
385-
} = &self.phase
386-
{
387-
out.pending_sync_messages = pending_sync_messages.values().map(|v| v.len()).sum();
343+
if let Phase::Loading(loading) = &self.phase {
344+
out.pending_sync_messages = loading.pending_msg_count();
388345
continue;
389346
}
390347

samod-core/src/actors/document/document_actor.rs

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,7 @@ impl DocumentActor {
7070

7171
let state = if let Some(doc) = initial_content {
7272
// Let the hub know this document is ready immediately if we already have content
73-
out.send_message(DocToHubMsgPayload::DocumentStatusChanged {
74-
new_status: DocumentStatus::Ready,
75-
});
73+
out.send_doc_status_update(DocumentStatus::Ready);
7674
DocState::new_ready(document_id.clone(), doc, any_dialer_pending)
7775
} else {
7876
DocState::new_loading(document_id.clone(), Automerge::new(), any_dialer_pending)
@@ -400,7 +398,7 @@ impl DocumentActor {
400398
if self.run_state == RunState::Stopping {
401399
if self.on_disk_state.is_flushed() {
402400
self.run_state = RunState::Stopped;
403-
out.send_message(DocToHubMsgPayload::Terminated);
401+
out.send_terminated();
404402
out.stopped = true;
405403
}
406404
return;
@@ -458,11 +456,7 @@ impl DocumentActor {
458456
.generate_sync_messages(now, out, &mut self.peer_connections)
459457
{
460458
for msg in msgs {
461-
out.send_message(DocToHubMsgPayload::SendSyncMessage {
462-
connection_id: conn_id,
463-
document_id: doc_id.clone(),
464-
message: msg,
465-
});
459+
out.send_sync_message(conn_id, doc_id.clone(), msg);
466460
}
467461
}
468462
}
@@ -475,7 +469,7 @@ impl DocumentActor {
475469
.collect::<HashMap<_, _>>();
476470
if !states.is_empty() {
477471
out.peer_state_changes = states.clone();
478-
out.send_message(DocToHubMsgPayload::PeerStatesChanged { new_states: states })
472+
out.send_peer_states_changes(states)
479473
}
480474
}
481475

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub(crate) mod loading;
2+
pub(crate) mod ready;
3+
pub(crate) mod request;

0 commit comments

Comments
 (0)