@@ -2,32 +2,45 @@ use crate::{util::ConnectorService, Result};
2
2
use bytes:: Bytes ;
3
3
use hyper:: Body ;
4
4
5
+ const METADATA_VERSION : u32 = 0 ;
6
+
5
7
const DEFAULT_MAX_RETRIES : usize = 5 ;
6
8
7
9
pub struct SyncContext {
10
+ db_path : String ,
8
11
sync_url : String ,
9
12
auth_token : Option < String > ,
10
13
max_retries : usize ,
14
+ /// Represents the max_frame_no from the server.
11
15
durable_frame_num : u32 ,
12
16
client : hyper:: Client < ConnectorService , Body > ,
13
17
}
14
18
15
19
impl SyncContext {
16
- pub fn new ( connector : ConnectorService , sync_url : String , auth_token : Option < String > ) -> Self {
17
- // TODO(lucio): add custom connector + tls support here
20
+ pub async fn new (
21
+ connector : ConnectorService ,
22
+ db_path : String ,
23
+ sync_url : String ,
24
+ auth_token : Option < String > ,
25
+ ) -> Result < Self > {
18
26
let client = hyper:: client:: Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
19
27
20
- Self {
28
+ let mut me = Self {
29
+ db_path,
21
30
sync_url,
22
31
auth_token,
23
32
durable_frame_num : 0 ,
24
33
max_retries : DEFAULT_MAX_RETRIES ,
25
34
client,
26
- }
35
+ } ;
36
+
37
+ me. read_metadata ( ) . await ?;
38
+
39
+ Ok ( me)
27
40
}
28
41
29
42
pub ( crate ) async fn push_one_frame (
30
- & self ,
43
+ & mut self ,
31
44
frame : Bytes ,
32
45
generation : u32 ,
33
46
frame_no : u32 ,
@@ -41,6 +54,11 @@ impl SyncContext {
41
54
) ;
42
55
let max_frame_no = self . push_with_retry ( uri, frame, self . max_retries ) . await ?;
43
56
57
+ // Update our last known max_frame_no from the server.
58
+ self . durable_frame_num = max_frame_no;
59
+
60
+ self . write_metadata ( ) . await ?;
61
+
44
62
Ok ( max_frame_no)
45
63
}
46
64
@@ -93,4 +111,41 @@ impl SyncContext {
93
111
pub ( crate ) fn durable_frame_num ( & self ) -> u32 {
94
112
self . durable_frame_num
95
113
}
114
+
115
+ async fn write_metadata ( & mut self ) -> Result < ( ) > {
116
+ let path = format ! ( "{}-info" , self . db_path) ;
117
+
118
+ let contents = serde_json:: to_vec ( & MetadataJson {
119
+ version : METADATA_VERSION ,
120
+ durable_frame_num : self . durable_frame_num ,
121
+ } )
122
+ . unwrap ( ) ;
123
+
124
+ tokio:: fs:: write ( path, contents) . await . unwrap ( ) ;
125
+
126
+ Ok ( ( ) )
127
+ }
128
+
129
+ async fn read_metadata ( & mut self ) -> Result < ( ) > {
130
+ let path = format ! ( "{}-info" , self . db_path) ;
131
+
132
+ let contents = tokio:: fs:: read ( & path) . await . unwrap ( ) ;
133
+
134
+ let metadata = serde_json:: from_slice :: < MetadataJson > ( & contents[ ..] ) . unwrap ( ) ;
135
+
136
+ assert_eq ! (
137
+ metadata. version, METADATA_VERSION ,
138
+ "Reading metadata from a different version than expected"
139
+ ) ;
140
+
141
+ self . durable_frame_num = metadata. durable_frame_num ;
142
+
143
+ Ok ( ( ) )
144
+ }
145
+ }
146
+
147
+ #[ derive( serde:: Serialize , serde:: Deserialize ) ]
148
+ struct MetadataJson {
149
+ version : u32 ,
150
+ durable_frame_num : u32 ,
96
151
}
0 commit comments