@@ -43,21 +43,24 @@ impl<IO: Io> Replicator<IO> {
43
43
pub fn frame_stream ( & mut self ) -> impl Stream < Item = Result < Box < Frame > > > + ' _ {
44
44
async_stream:: try_stream! {
45
45
loop {
46
+ // First we decide up to what frame_no we want to replicate in this step. If we are
47
+ // already up to date, wait for something to happen
46
48
let most_recent_frame_no = * self
47
49
. new_frame_notifier
48
50
. wait_for( |fno| * fno > self . next_frame_no)
49
51
. await
50
52
. expect( "channel cannot be closed because we hold a ref to the sending end" ) ;
51
53
52
54
let mut commit_frame_no = 0 ;
55
+ // we have stuff to replicate
53
56
if most_recent_frame_no > self . next_frame_no {
57
+ // first replicate the most recent version of each page from the current
58
+ // segment. We also return how far we have replicated from the current log
54
59
let current = self . shared. current. load( ) ;
55
60
let mut seen = RoaringBitmap :: new( ) ;
56
61
let ( stream, replicated_until, size_after) = current. frame_stream_from( self . next_frame_no, & mut seen) ;
57
62
let should_replicate_from_tail = replicated_until != self . next_frame_no;
58
63
59
-
60
- // replicate from current
61
64
{
62
65
tokio:: pin!( stream) ;
63
66
@@ -76,8 +79,8 @@ impl<IO: Io> Replicator<IO> {
76
79
}
77
80
}
78
81
79
-
80
- // replicate from tail
82
+ // Replicating from the current segment wasn't enough to bring us up to date,
83
+ // wee need to take frames from the sealed segments.
81
84
if should_replicate_from_tail {
82
85
let replicated_until = {
83
86
let ( stream, replicated_until) = current
@@ -104,6 +107,8 @@ impl<IO: Io> Replicator<IO> {
104
107
should_replicate_from_storage. then_some( replicated_until)
105
108
} ;
106
109
110
+ // Replicating from sealed segments was not enough, so we replicate from
111
+ // durable storage
107
112
if let Some ( replicated_until) = replicated_until {
108
113
let stream = self
109
114
. shared
0 commit comments