33
44use std:: any:: Any ;
55use std:: sync:: Arc ;
6- use std:: sync:: atomic:: AtomicU64 ;
7- use std:: sync:: atomic:: Ordering ;
86
97use arrow_schema:: SchemaRef ;
108use async_trait:: async_trait;
119use datafusion_common:: DataFusionError ;
1210use datafusion_common:: Result as DFResult ;
11+ use datafusion_common:: exec_datafusion_err;
1312use datafusion_common_runtime:: JoinSet ;
1413use datafusion_common_runtime:: SpawnedTask ;
1514use datafusion_datasource:: file_sink_config:: FileSink ;
@@ -33,6 +32,7 @@ use vortex::dtype::DType;
3332use vortex:: dtype:: arrow:: FromArrowType ;
3433use vortex:: error:: VortexResult ;
3534use vortex:: file:: WriteOptionsSessionExt ;
35+ use vortex:: file:: WriteSummary ;
3636use vortex:: io:: ObjectStoreWriter ;
3737use vortex:: io:: VortexWrite ;
3838use vortex:: session:: VortexSession ;
@@ -109,57 +109,53 @@ impl FileSink for VortexSink {
109109 object_store : Arc < dyn ObjectStore > ,
110110 ) -> DFResult < u64 > {
111111 // This is a hack
112- let row_counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
113112
114- let mut file_write_tasks: JoinSet < DFResult < Path > > = JoinSet :: new ( ) ;
113+ let mut file_write_tasks: JoinSet < DFResult < ( Path , WriteSummary ) > > = JoinSet :: new ( ) ;
115114
116115 // TODO(adamg):
117116 // 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
118117 while let Some ( ( path, rx) ) = file_stream_rx. recv ( ) . await {
119118 let session = self . session . clone ( ) ;
120- let row_counter = row_counter. clone ( ) ;
121119 let object_store = object_store. clone ( ) ;
122120 let writer_schema = get_writer_schema ( & self . config ) ;
123121 let dtype = DType :: from_arrow ( writer_schema) ;
124122
125123 // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
126124 // the demux task might deadlock itself.
127125 file_write_tasks. spawn ( async move {
128- let stream = ReceiverStream :: new ( rx) . map ( move |rb| {
129- row_counter. fetch_add ( rb. num_rows ( ) as u64 , Ordering :: Relaxed ) ;
130- VortexResult :: Ok ( ArrayRef :: from_arrow ( rb, false ) )
131- } ) ;
126+ let stream = ReceiverStream :: new ( rx)
127+ . map ( move |rb| VortexResult :: Ok ( ArrayRef :: from_arrow ( rb, false ) ) ) ;
132128
133129 let stream_adapter = ArrayStreamAdapter :: new ( dtype, stream) ;
134130
135- let mut sink = ObjectStoreWriter :: new ( object_store. clone ( ) , & path)
131+ let mut object_writer = ObjectStoreWriter :: new ( object_store, & path)
136132 . await
137- . map_err ( |e| {
138- DataFusionError :: Execution ( format ! (
139- "Failed to create ObjectStoreWriter: {e}"
140- ) )
141- } ) ?;
133+ . map_err ( |e| exec_datafusion_err ! ( "Failed to create ObjectStoreWriter: {e}" ) ) ?;
142134
143- session
135+ let summary = session
144136 . write_options ( )
145- . write ( & mut sink , stream_adapter)
137+ . write ( & mut object_writer , stream_adapter)
146138 . await
147- . map_err ( |e| {
148- DataFusionError :: Execution ( format ! ( "Failed to write Vortex file: {e}" ) )
149- } ) ?;
139+ . map_err ( |e| exec_datafusion_err ! ( "Failed to write Vortex file: {e}" ) ) ?;
150140
151- sink. shutdown ( ) . await . map_err ( |e| {
152- DataFusionError :: Execution ( format ! ( "Failed to shutdown Vortex writer: {e}" ) )
153- } ) ?;
141+ object_writer
142+ . shutdown ( )
143+ . await
144+ . map_err ( |e| exec_datafusion_err ! ( "Failed to shutdown Vortex writer: {e}" ) ) ?;
154145
155- Ok ( path)
146+ Ok ( ( path, summary ) )
156147 } ) ;
157148 }
158149
150+ let mut row_count = 0 ;
151+
159152 while let Some ( result) = file_write_tasks. join_next ( ) . await {
160153 match result {
161- Ok ( path) => {
162- let path = path?;
154+ Ok ( r) => {
155+ let ( path, summary) = r?;
156+
157+ row_count += summary. row_count ( ) ;
158+
163159 tracing:: info!( path = %path, "Successfully written file" ) ;
164160 }
165161 Err ( e) => {
@@ -177,7 +173,7 @@ impl FileSink for VortexSink {
177173 . await
178174 . map_err ( |e| DataFusionError :: ExecutionJoin ( Box :: new ( e) ) ) ??;
179175
180- Ok ( row_counter . load ( Ordering :: SeqCst ) )
176+ Ok ( row_count )
181177 }
182178}
183179
0 commit comments