@@ -5,6 +5,7 @@ use itertools::Itertools;
55use serde:: ser:: SerializeSeq ;
66use serde:: { Deserialize , Serialize } ;
77use sqlx:: PgPool ;
8+ use std:: borrow:: Cow ;
89use std:: collections:: BTreeMap ;
910use std:: path:: { Path , PathBuf } ;
1011use yaml_rust2:: YamlEmitter ;
@@ -24,10 +25,11 @@ const FILENAME_PREFIX_MAX_LENGTH: usize = 128;
2425
2526struct TargetExportData < ' a > {
2627 schema : & ' a Vec < schema:: FieldSchema > ,
28+ // The purpose is to make rows sorted by primary key.
2729 data : BTreeMap < value:: KeyValue , & ' a value:: FieldValues > ,
2830}
2931
30- impl < ' a > Serialize for TargetExportData < ' a > {
32+ impl Serialize for TargetExportData < ' _ > {
3133 fn serialize < S > ( & self , serializer : S ) -> Result < S :: Ok , S :: Error >
3234 where
3335 S : serde:: Serializer ,
@@ -46,7 +48,11 @@ impl<'a> Serialize for TargetExportData<'a> {
4648#[ derive( Serialize ) ]
4749struct SourceOutputData < ' a > {
4850 key : value:: TypedValue < ' a > ,
51+
52+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
4953 exports : Option < IndexMap < & ' a str , TargetExportData < ' a > > > ,
54+
55+ #[ serde( skip_serializing_if = "Option::is_none" ) ]
5056 error : Option < String > ,
5157}
5258
@@ -103,14 +109,12 @@ impl<'a> Dumper<'a> {
103109 export_op. name . as_str ( ) ,
104110 TargetExportData {
105111 schema : & self . schema . collectors [ collector_idx] . spec . fields ,
106- data : collected_values_buffer
112+ data : collected_values_buffer[ collector_idx ]
107113 . iter ( )
108114 . map ( |v| -> Result < _ > {
109- let key = indexer:: extract_primary_key (
110- & export_op. primary_key_def ,
111- & v[ collector_idx] ,
112- ) ?;
113- Ok ( ( key, & v[ collector_idx] ) )
115+ let key =
116+ indexer:: extract_primary_key ( & export_op. primary_key_def , v) ?;
117+ Ok ( ( key, v) )
114118 } )
115119 . collect :: < Result < _ > > ( ) ?,
116120 } ,
@@ -125,7 +129,7 @@ impl<'a> Dumper<'a> {
125129 & self ,
126130 source_op : & AnalyzedSourceOp ,
127131 key : value:: KeyValue ,
128- file_name : PathBuf ,
132+ file_path : PathBuf ,
129133 ) -> Result < ( ) > {
130134 let mut collected_values_buffer = Vec :: new ( ) ;
131135 let ( exports, error) = match self
@@ -138,24 +142,24 @@ impl<'a> Dumper<'a> {
138142 let key_value = value:: Value :: from ( key) ;
139143 let file_data = SourceOutputData {
140144 key : value:: TypedValue {
141- t : & self . schema . fields [ source_op. output . field_idx as usize ]
142- . value_type
143- . typ ,
145+ t : & source_op. primary_key_type ,
144146 v : & key_value,
145147 } ,
146148 exports,
147149 error,
148150 } ;
151+
149152 let yaml_output = {
150153 let mut yaml_output = String :: new ( ) ;
151154 let yaml_data = YamlSerializer :: serialize ( & file_data) ?;
152155 let mut yaml_emitter = YamlEmitter :: new ( & mut yaml_output) ;
156+ yaml_emitter. multiline_strings ( true ) ;
157+ yaml_emitter. compact ( true ) ;
153158 yaml_emitter. dump ( & yaml_data) ?;
154159 yaml_output
155160 } ;
156- let mut file_path = file_name;
157- file_path. push ( ".yaml" ) ;
158161 tokio:: fs:: write ( file_path, yaml_output) . await ?;
162+
159163 Ok ( ( ) )
160164 }
161165
@@ -177,22 +181,21 @@ impl<'a> Dumper<'a> {
177181 ) ;
178182 keys_by_filename_prefix. entry ( s) . or_default ( ) . push ( key) ;
179183 }
180-
181- let mut file_path_base =
182- PathBuf :: from ( & self . options . output_dir ) . join ( source_op. name . as_str ( ) ) ;
183- file_path_base. push ( ":" ) ;
184+ let output_dir = Path :: new ( & self . options . output_dir ) ;
184185 let evaluate_futs =
185186 keys_by_filename_prefix
186187 . into_iter ( )
187188 . flat_map ( |( filename_prefix, keys) | {
188189 let num_keys = keys. len ( ) ;
189- let file_path_base = & file_path_base;
190190 keys. into_iter ( ) . enumerate ( ) . map ( move |( i, key) | {
191- let mut file_path = file_path_base. clone ( ) ;
192- file_path. push ( & filename_prefix) ;
193- if num_keys > 1 {
194- file_path. push ( format ! ( ".{}" , i) ) ;
195- }
191+ let extra_id = if num_keys > 1 {
192+ Cow :: Owned ( format ! ( ".{}" , i) )
193+ } else {
194+ Cow :: Borrowed ( "" )
195+ } ;
196+ let file_name =
197+ format ! ( "{}@{}{}.yaml" , source_op. name, filename_prefix, extra_id) ;
198+ let file_path = output_dir. join ( Path :: new ( & file_name) ) ;
196199 self . evaluate_and_dump_source_entry ( source_op, key, file_path)
197200 } )
198201 } ) ;
0 commit comments