11use std:: {
22 collections:: BTreeMap ,
3+ fs:: OpenOptions ,
34 path:: { Path , PathBuf } ,
45} ;
56
67use dora_core:: build:: BuildInfo ;
78use dora_message:: { BuildId , SessionId , common:: GitSource , id:: NodeId } ;
89use eyre:: { Context , ContextCompat } ;
10+ use fs2:: FileExt ;
911
1012#[ derive( Debug , Clone , serde:: Serialize , serde:: Deserialize ) ]
1113pub struct DataflowSession {
14+ #[ serde( default ) ]
1215 pub build_id : Option < BuildId > ,
1316 pub session_id : SessionId ,
17+ #[ serde( default ) ]
1418 pub git_sources : BTreeMap < NodeId , GitSource > ,
19+ #[ serde( default ) ]
1520 pub local_build : Option < BuildInfo > ,
1621}
1722
@@ -29,23 +34,33 @@ impl Default for DataflowSession {
2934impl DataflowSession {
3035 pub fn read_session ( dataflow_path : & Path ) -> eyre:: Result < Self > {
3136 let session_file = session_file_path ( dataflow_path) ?;
32- if session_file. exists ( ) {
33- if let Ok ( parsed) = deserialize ( & session_file) {
34- return Ok ( parsed) ;
35- } else {
36- tracing:: warn!(
37- "failed to read dataflow session file, regenerating (you might need to run `dora build` again)"
38- ) ;
37+ with_session_file_lock ( & session_file, || {
38+ if session_file. exists ( ) {
39+ match deserialize ( & session_file) {
40+ Ok ( parsed) => return Ok ( parsed) ,
41+ Err ( err) => {
42+ tracing:: warn!(
43+ "failed to parse dataflow session at `{}` ({err:#}); regenerating" ,
44+ session_file. display( )
45+ ) ;
46+ }
47+ }
3948 }
40- }
4149
42- let default_session = DataflowSession :: default ( ) ;
43- default_session. write_out_for_dataflow ( dataflow_path) ?;
44- Ok ( default_session)
50+ let default_session = DataflowSession :: default ( ) ;
51+ default_session. write_out_for_session_file ( & session_file) ?;
52+ Ok ( default_session)
53+ } )
4554 }
4655
4756 pub fn write_out_for_dataflow ( & self , dataflow_path : & Path ) -> eyre:: Result < ( ) > {
4857 let session_file = session_file_path ( dataflow_path) ?;
58+ with_session_file_lock ( & session_file, || {
59+ self . write_out_for_session_file ( & session_file)
60+ } )
61+ }
62+
63+ fn write_out_for_session_file ( & self , session_file : & Path ) -> eyre:: Result < ( ) > {
4964 let filename = session_file
5065 . file_name ( )
5166 . context ( "session file has no file name" ) ?
@@ -54,7 +69,7 @@ impl DataflowSession {
5469 if let Some ( parent) = session_file. parent ( ) {
5570 std:: fs:: create_dir_all ( parent) . context ( "failed to create out dir" ) ?;
5671 }
57- std:: fs:: write ( & session_file, self . serialize ( ) ?)
72+ std:: fs:: write ( session_file, self . serialize ( ) ?)
5873 . context ( "failed to write dataflow session file" ) ?;
5974 let gitignore = session_file. with_file_name ( ".gitignore" ) ;
6075 if gitignore. exists ( ) {
@@ -87,6 +102,44 @@ fn deserialize(session_file: &Path) -> eyre::Result<DataflowSession> {
87102 } )
88103}
89104
105+ fn with_session_file_lock < T > (
106+ session_file : & Path ,
107+ f : impl FnOnce ( ) -> eyre:: Result < T > ,
108+ ) -> eyre:: Result < T > {
109+ let lock_file = session_lock_file_path ( session_file) ?;
110+ if let Some ( parent) = lock_file. parent ( ) {
111+ std:: fs:: create_dir_all ( parent) . context ( "failed to create out dir" ) ?;
112+ }
113+
114+ let lock = OpenOptions :: new ( )
115+ . create ( true )
116+ . read ( true )
117+ . write ( true )
118+ . open ( & lock_file)
119+ . with_context ( || format ! ( "failed to open session lock file `{}`" , lock_file. display( ) ) ) ?;
120+
121+ lock. lock_exclusive ( )
122+ . with_context ( || format ! ( "failed to lock session lock file `{}`" , lock_file. display( ) ) ) ?;
123+
124+ let result = f ( ) ;
125+ lock. unlock ( ) . with_context ( || {
126+ format ! (
127+ "failed to unlock session lock file `{}`" ,
128+ lock_file. display( )
129+ )
130+ } ) ?;
131+ result
132+ }
133+
134+ fn session_lock_file_path ( session_file : & Path ) -> eyre:: Result < PathBuf > {
135+ let file_name = session_file
136+ . file_name ( )
137+ . wrap_err ( "session path has no file name" ) ?
138+ . to_str ( )
139+ . wrap_err ( "session file name is not valid utf-8" ) ?;
140+ Ok ( session_file. with_file_name ( format ! ( "{file_name}.lock" ) ) )
141+ }
142+
90143fn session_file_path ( dataflow_path : & Path ) -> eyre:: Result < PathBuf > {
91144 let file_stem = dataflow_path
92145 . file_stem ( )
@@ -98,3 +151,132 @@ fn session_file_path(dataflow_path: &Path) -> eyre::Result<PathBuf> {
98151 . join ( format ! ( "{file_stem}.dora-session.yaml" ) ) ;
99152 Ok ( session_file)
100153}
154+
#[cfg(test)]
mod tests {
    //! Tests for `DataflowSession::read_session`: regeneration of unparsable
    //! session files and acceptance of files that omit defaulted fields.

    use super::DataflowSession;
    use std::{
        fs,
        path::{Path, PathBuf},
        sync::atomic::{AtomicUsize, Ordering},
        time::{SystemTime, UNIX_EPOCH},
    };

    /// A unique temporary directory that is removed again when dropped, so
    /// tests do not leave files behind even when an assertion fails.
    struct TestDir {
        root: PathBuf,
    }

    impl TestDir {
        /// Creates a fresh directory below the system temp dir.
        ///
        /// The name combines the process ID, a timestamp and a process-wide
        /// counter; the counter keeps tests that run in parallel threads of
        /// the same process apart even if their timestamps coincide.
        fn new() -> Self {
            static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
            let root = std::env::temp_dir().join(format!(
                "dora-session-test-{}-{}-{}",
                std::process::id(),
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_nanos(),
                NEXT_ID.fetch_add(1, Ordering::Relaxed),
            ));
            fs::create_dir_all(&root).expect("failed to create test dir");
            Self { root }
        }

        fn path(&self) -> &Path {
            &self.root
        }
    }

    impl Drop for TestDir {
        fn drop(&mut self) {
            // Best-effort cleanup; a failure here must not fail the test.
            let _ = fs::remove_dir_all(&self.root);
        }
    }

    /// Writes a minimal dataflow file into `root` and returns its path.
    fn test_dataflow_path(root: &Path) -> PathBuf {
        let dataflow = root.join("dataflow.yml");
        fs::create_dir_all(root).expect("failed to create test dir");
        fs::write(&dataflow, "nodes: []\n").expect("failed to write test dataflow");
        dataflow
    }

    /// Session file location the tests expect for `dataflow_path`:
    /// `<dataflow dir>/out/<stem>.dora-session.yaml`.
    fn session_file_for(dataflow_path: &Path) -> PathBuf {
        let stem = dataflow_path
            .file_stem()
            .expect("dataflow path should have file stem")
            .to_string_lossy();
        dataflow_path
            .with_file_name("out")
            .join(format!("{stem}.dora-session.yaml"))
    }

    /// Pre-populates the session file of `dataflow_path` with `contents` and
    /// returns the session file path.
    fn seed_session_file(dataflow_path: &Path, contents: &str) -> PathBuf {
        let session_file = session_file_for(dataflow_path);
        fs::create_dir_all(
            session_file
                .parent()
                .expect("session file should have parent"),
        )
        .expect("failed to create out dir");
        fs::write(&session_file, contents).expect("failed to write session file");
        session_file
    }

    /// Asserts that all defaulted fields of `session` are empty.
    fn assert_optional_fields_empty(session: &DataflowSession) {
        assert!(session.build_id.is_none());
        assert!(session.git_sources.is_empty());
        assert!(session.local_build.is_none());
    }

    #[test]
    fn read_session_regenerates_on_invalid_yaml() {
        let dir = TestDir::new();
        let dataflow_path = test_dataflow_path(dir.path());
        let invalid = "session_id: [\n";
        let session_file = seed_session_file(&dataflow_path, invalid);

        let result = DataflowSession::read_session(&dataflow_path)
            .expect("invalid yaml should be replaced with a default session");

        assert_optional_fields_empty(&result);

        let after = fs::read_to_string(&session_file).expect("failed to read session file");
        assert!(
            after.contains("session_id:"),
            "session file should be regenerated"
        );
        assert_ne!(after, invalid, "invalid session file must be replaced");
    }

    #[test]
    fn read_session_regenerates_on_truncated_yaml() {
        let dir = TestDir::new();
        let dataflow_path = test_dataflow_path(dir.path());
        let session_file = seed_session_file(
            &dataflow_path,
            "build_id: null\nsession_id: 0195f7e0-3f4a-7e22-b13f-41f0327de0f8\ngit_sources: [\n",
        );

        let result = DataflowSession::read_session(&dataflow_path)
            .expect("truncated yaml should be replaced with a default session");

        assert_optional_fields_empty(&result);

        let after = fs::read_to_string(&session_file).expect("failed to read session file");
        assert!(
            after.contains("session_id:"),
            "session file should be regenerated"
        );
        assert!(
            !after.contains("git_sources: ["),
            "broken/truncated session content should be replaced"
        );
    }

    #[test]
    fn read_session_accepts_missing_optional_fields() {
        let dir = TestDir::new();
        let dataflow_path = test_dataflow_path(dir.path());
        seed_session_file(
            &dataflow_path,
            "session_id: 0195f7e0-3f4a-7e22-b13f-41f0327de0f8\n",
        );

        let parsed = DataflowSession::read_session(&dataflow_path)
            .expect("legacy session with missing optional fields should still parse");

        assert_optional_fields_empty(&parsed);
    }
}
0 commit comments