@@ -4,10 +4,8 @@ use std::io::{BufWriter, ErrorKind, Write};
4
4
use std:: mem:: size_of;
5
5
use std:: ops:: Deref ;
6
6
use std:: path:: { Path , PathBuf } ;
7
- use std:: sync:: {
8
- atomic:: { AtomicU64 , Ordering } ,
9
- Arc ,
10
- } ;
7
+ use std:: sync:: Arc ;
8
+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
11
9
12
10
use chrono:: prelude:: { DateTime , Utc } ;
13
11
use fst:: { Map , MapBuilder , Streamer } ;
@@ -212,7 +210,7 @@ where
212
210
}
213
211
214
212
impl < F : FileExt > SealedSegment < F > {
215
- pub fn open ( file : Arc < F > , path : PathBuf , read_locks : Arc < AtomicU64 > ) -> Result < Option < Self > > {
213
+ pub fn open ( file : Arc < F > , path : PathBuf , read_locks : Arc < AtomicU64 > , now : DateTime < Utc > ) -> Result < Option < Self > > {
216
214
let mut header: SegmentHeader = SegmentHeader :: new_zeroed ( ) ;
217
215
file. read_exact_at ( header. as_bytes_mut ( ) , 0 ) ?;
218
216
@@ -230,7 +228,7 @@ impl<F: FileExt> SealedSegment<F> {
230
228
// recover the index, and seal the segment.
231
229
if !header. flags ( ) . contains ( SegmentFlags :: SEALED ) {
232
230
assert_eq ! ( header. index_offset. get( ) , 0 , "{header:?}" ) ;
233
- return Self :: recover ( file, path, header) . map ( Some ) ;
231
+ return Self :: recover ( file, path, header, now ) . map ( Some ) ;
234
232
}
235
233
236
234
let mut slice = vec ! [ 0 ; index_len as usize ] ;
@@ -248,7 +246,7 @@ impl<F: FileExt> SealedSegment<F> {
248
246
} ) )
249
247
}
250
248
251
- fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader ) -> Result < Self > {
249
+ fn recover ( file : Arc < F > , path : PathBuf , mut header : SegmentHeader , now : DateTime < Utc > ) -> Result < Self > {
252
250
assert ! ( !header. is_empty( ) ) ;
253
251
assert_eq ! ( header. index_size. get( ) , 0 ) ;
254
252
assert_eq ! ( header. index_offset. get( ) , 0 ) ;
@@ -335,6 +333,7 @@ impl<F: FileExt> SealedSegment<F> {
335
333
header. index_size = index_size. into ( ) ;
336
334
header. last_commited_frame_no = last_committed. into ( ) ;
337
335
header. size_after = size_after. into ( ) ;
336
+ header. sealed_at_timestamp = ( now. timestamp_millis ( ) as u64 ) . into ( ) ;
338
337
let flags = header. flags ( ) ;
339
338
header. set_flags ( flags | SegmentFlags :: SEALED ) ;
340
339
header. recompute_checksum ( ) ;
0 commit comments