33
44use std:: any:: Any ;
55use std:: sync:: Arc ;
6- use std:: sync:: atomic:: AtomicU64 ;
7- use std:: sync:: atomic:: Ordering ;
86
97use arrow_schema:: SchemaRef ;
108use async_trait:: async_trait;
119use datafusion_common:: DataFusionError ;
1210use datafusion_common:: Result as DFResult ;
11+ use datafusion_common:: exec_datafusion_err;
1312use datafusion_common_runtime:: JoinSet ;
1413use datafusion_common_runtime:: SpawnedTask ;
1514use datafusion_datasource:: file_sink_config:: FileSink ;
@@ -33,6 +32,7 @@ use vortex::dtype::DType;
3332use vortex:: dtype:: arrow:: FromArrowType ;
3433use vortex:: error:: VortexResult ;
3534use vortex:: file:: WriteOptionsSessionExt ;
35+ use vortex:: file:: WriteSummary ;
3636use vortex:: io:: ObjectStoreWriter ;
3737use vortex:: io:: VortexWrite ;
3838use vortex:: session:: VortexSession ;
@@ -108,58 +108,52 @@ impl FileSink for VortexSink {
108108 mut file_stream_rx : DemuxedStreamReceiver ,
109109 object_store : Arc < dyn ObjectStore > ,
110110 ) -> DFResult < u64 > {
111- // This is a hack
112- let row_counter = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
113-
114- let mut file_write_tasks: JoinSet < DFResult < Path > > = JoinSet :: new ( ) ;
111+ let mut file_write_tasks: JoinSet < DFResult < ( Path , WriteSummary ) > > = JoinSet :: new ( ) ;
115112
116113 // TODO(adamg):
117114 // 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join.
118115 while let Some ( ( path, rx) ) = file_stream_rx. recv ( ) . await {
119116 let session = self . session . clone ( ) ;
120- let row_counter = row_counter. clone ( ) ;
121117 let object_store = object_store. clone ( ) ;
122118 let writer_schema = get_writer_schema ( & self . config ) ;
123119 let dtype = DType :: from_arrow ( writer_schema) ;
124120
125121 // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered,
126122 // the demux task might deadlock itself.
127123 file_write_tasks. spawn ( async move {
128- let stream = ReceiverStream :: new ( rx) . map ( move |rb| {
129- row_counter. fetch_add ( rb. num_rows ( ) as u64 , Ordering :: Relaxed ) ;
130- VortexResult :: Ok ( ArrayRef :: from_arrow ( rb, false ) )
131- } ) ;
124+ let stream = ReceiverStream :: new ( rx)
125+ . map ( move |rb| VortexResult :: Ok ( ArrayRef :: from_arrow ( rb, false ) ) ) ;
132126
133127 let stream_adapter = ArrayStreamAdapter :: new ( dtype, stream) ;
134128
135- let mut sink = ObjectStoreWriter :: new ( object_store. clone ( ) , & path)
129+ let mut object_writer = ObjectStoreWriter :: new ( object_store, & path)
136130 . await
137- . map_err ( |e| {
138- DataFusionError :: Execution ( format ! (
139- "Failed to create ObjectStoreWriter: {e}"
140- ) )
141- } ) ?;
131+ . map_err ( |e| exec_datafusion_err ! ( "Failed to create ObjectStoreWriter: {e}" ) ) ?;
142132
143- session
133+ let summary = session
144134 . write_options ( )
145- . write ( & mut sink , stream_adapter)
135+ . write ( & mut object_writer , stream_adapter)
146136 . await
147- . map_err ( |e| {
148- DataFusionError :: Execution ( format ! ( "Failed to write Vortex file: {e}" ) )
149- } ) ?;
137+ . map_err ( |e| exec_datafusion_err ! ( "Failed to write Vortex file: {e}" ) ) ?;
150138
151- sink. shutdown ( ) . await . map_err ( |e| {
152- DataFusionError :: Execution ( format ! ( "Failed to shutdown Vortex writer: {e}" ) )
153- } ) ?;
139+ object_writer
140+ . shutdown ( )
141+ . await
142+ . map_err ( |e| exec_datafusion_err ! ( "Failed to shutdown Vortex writer: {e}" ) ) ?;
154143
155- Ok ( path)
144+ Ok ( ( path, summary ) )
156145 } ) ;
157146 }
158147
148+ let mut row_count = 0 ;
149+
159150 while let Some ( result) = file_write_tasks. join_next ( ) . await {
160151 match result {
161- Ok ( path) => {
162- let path = path?;
152+ Ok ( r) => {
153+ let ( path, summary) = r?;
154+
155+ row_count += summary. row_count ( ) ;
156+
163157 tracing:: info!( path = %path, "Successfully written file" ) ;
164158 }
165159 Err ( e) => {
@@ -177,7 +171,7 @@ impl FileSink for VortexSink {
177171 . await
178172 . map_err ( |e| DataFusionError :: ExecutionJoin ( Box :: new ( e) ) ) ??;
179173
180- Ok ( row_counter . load ( Ordering :: SeqCst ) )
174+ Ok ( row_count )
181175 }
182176}
183177
0 commit comments