@@ -10,13 +10,13 @@ use zerocopy::FromZeroes;
10
10
use crate :: io:: buf:: ZeroCopyBoxIoBuf ;
11
11
use crate :: segment:: Frame ;
12
12
use crate :: storage:: backend:: FindSegmentReq ;
13
- use crate :: storage:: Storage ;
13
+ use crate :: storage:: { Storage } ;
14
14
15
15
use super :: Result ;
16
16
17
17
pub trait ReplicateFromStorage : Sync + Send + ' static {
18
- fn stream < ' a > (
19
- & ' a self ,
18
+ fn stream < ' a , ' b > (
19
+ & ' b self ,
20
20
seen : & ' a mut RoaringBitmap ,
21
21
current : u64 ,
22
22
until : u64 ,
@@ -38,16 +38,18 @@ impl<S> ReplicateFromStorage for StorageReplicator<S>
38
38
where
39
39
S : Storage ,
40
40
{
41
- fn stream < ' a > (
42
- & ' a self ,
41
+ fn stream < ' a , ' b > (
42
+ & ' b self ,
43
43
seen : & ' a mut roaring:: RoaringBitmap ,
44
44
mut current : u64 ,
45
45
until : u64 ,
46
46
) -> Pin < Box < dyn Stream < Item = Result < Box < Frame > > > + Send + ' a > > {
47
+ let storage = self . storage . clone ( ) ;
48
+ let namespace = self . namespace . clone ( ) ;
47
49
Box :: pin ( async_stream:: try_stream! {
48
50
loop {
49
- let key = self . storage. find_segment( & self . namespace, FindSegmentReq :: EndFrameNoLessThan ( current) , None ) . await ?;
50
- let index = self . storage. fetch_segment_index( & self . namespace, & key, None ) . await ?;
51
+ let key = storage. find_segment( & namespace, FindSegmentReq :: EndFrameNoLessThan ( current) , None ) . await ?;
52
+ let index = storage. fetch_segment_index( & namespace, & key, None ) . await ?;
51
53
let mut pages = index. into_stream( ) ;
52
54
let mut maybe_seg = None ;
53
55
while let Some ( ( page, offset) ) = pages. next( ) {
58
60
let segment = match maybe_seg {
59
61
Some ( ref seg) => seg,
60
62
None => {
61
- maybe_seg = Some ( self . storage. fetch_segment_data( & self . namespace, & key, None ) . await ?) ;
63
+ maybe_seg = Some ( storage. fetch_segment_data( & namespace, & key, None ) . await ?) ;
62
64
maybe_seg. as_ref( ) . unwrap( )
63
65
} ,
64
66
} ;
0 commit comments