@@ -19,25 +19,46 @@ use std::mem::take;
19
19
20
20
use arrow_array:: RecordBatch ;
21
21
use async_trait:: async_trait;
22
+ use futures:: future:: try_join_all;
22
23
24
+ use crate :: runtime:: { JoinHandle , spawn} ;
23
25
use crate :: spec:: DataFile ;
24
- use crate :: writer:: base_writer:: data_file_writer:: DataFileWriter ;
25
- use crate :: writer:: file_writer:: FileWriterBuilder ;
26
26
use crate :: writer:: { IcebergWriter , IcebergWriterBuilder } ;
27
27
use crate :: { Error , ErrorKind , Result } ;
28
28
29
+ /// A writer that can roll over to a new file when certain conditions are met.
30
+ ///
31
+ /// This trait extends `IcebergWriter` with the ability to determine when to start
32
+ /// writing to a new file based on the size of incoming data.
29
33
#[ async_trait]
30
34
pub trait RollingFileWriter : IcebergWriter {
35
+ /// Determines if the writer should roll over to a new file.
36
+ ///
37
+ /// # Arguments
38
+ ///
39
+ /// * `input_size` - The size in bytes of the incoming data
40
+ ///
41
+ /// # Returns
42
+ ///
43
+ /// `true` if a new file should be started, `false` otherwise
31
44
fn should_roll ( & mut self , input_size : u64 ) -> bool ;
32
45
}
33
46
47
+ /// Builder for creating a `RollingDataFileWriter` that rolls over to a new file
48
+ /// when the data size exceeds a target threshold.
34
49
#[ derive( Clone ) ]
35
- pub struct RollingDataFileWriterBuilder < B : FileWriterBuilder > {
50
+ pub struct RollingDataFileWriterBuilder < B : IcebergWriterBuilder > {
36
51
inner_builder : B ,
37
52
target_size : u64 ,
38
53
}
39
54
40
- impl < B : FileWriterBuilder > RollingDataFileWriterBuilder < B > {
55
+ impl < B : IcebergWriterBuilder > RollingDataFileWriterBuilder < B > {
56
+ /// Creates a new `RollingDataFileWriterBuilder` with the specified inner builder and target size.
57
+ ///
58
+ /// # Arguments
59
+ ///
60
+ /// * `inner_builder` - The builder for the underlying file writer
61
+ /// * `target_size` - The target size in bytes before rolling over to a new file
41
62
pub fn new ( inner_builder : B , target_size : u64 ) -> Self {
42
63
Self {
43
64
inner_builder,
@@ -47,7 +68,7 @@ impl<B: FileWriterBuilder> RollingDataFileWriterBuilder<B> {
47
68
}
48
69
49
70
#[ async_trait]
50
- impl < B : FileWriterBuilder > IcebergWriterBuilder for RollingDataFileWriterBuilder < B > {
71
+ impl < B : IcebergWriterBuilder > IcebergWriterBuilder for RollingDataFileWriterBuilder < B > {
51
72
type R = RollingDataFileWriter < B > ;
52
73
53
74
async fn build ( self ) -> Result < Self :: R > {
@@ -56,27 +77,34 @@ impl<B: FileWriterBuilder> IcebergWriterBuilder for RollingDataFileWriterBuilder
56
77
inner_builder : self . inner_builder ,
57
78
target_size : self . target_size ,
58
79
written_size : 0 ,
59
- data_files : vec ! [ ] ,
80
+ close_handles : vec ! [ ] ,
60
81
} )
61
82
}
62
83
}
63
84
64
- pub struct RollingDataFileWriter < B : FileWriterBuilder > {
65
- inner : Option < DataFileWriter < B :: R > > ,
85
+ /// A writer that automatically rolls over to a new file when the data size
86
+ /// exceeds a target threshold.
87
+ ///
88
+ /// This writer wraps another file writer and tracks the amount of data written.
89
+ /// When the data size exceeds the target size, it closes the current file and
90
+ /// starts writing to a new one.
91
+ pub struct RollingDataFileWriter < B : IcebergWriterBuilder > {
92
+ inner : Option < B :: R > ,
66
93
inner_builder : B ,
67
94
target_size : u64 ,
68
95
written_size : u64 ,
69
- data_files : Vec < DataFile > ,
96
+ close_handles : Vec < JoinHandle < Result < Vec < DataFile > > > > ,
70
97
}
71
98
72
99
#[ async_trait]
73
- impl < B : FileWriterBuilder > IcebergWriter for RollingDataFileWriter < B > {
100
+ impl < B : IcebergWriterBuilder > IcebergWriter for RollingDataFileWriter < B > {
74
101
async fn write ( & mut self , input : RecordBatch ) -> Result < ( ) > {
75
102
let input_size = input. get_array_memory_size ( ) as u64 ;
76
103
if self . should_roll ( input_size) {
77
104
if let Some ( mut inner) = self . inner . take ( ) {
78
105
// close the current writer, roll to a new file
79
- self . data_files . extend ( inner. close ( ) . await ?) ;
106
+ let handle = spawn ( async move { inner. close ( ) . await } ) ;
107
+ self . close_handles . push ( handle)
80
108
}
81
109
82
110
// clear bytes written
@@ -101,11 +129,22 @@ impl<B: FileWriterBuilder> IcebergWriter for RollingDataFileWriter<B> {
101
129
}
102
130
103
131
async fn close ( & mut self ) -> Result < Vec < DataFile > > {
104
- Ok ( take ( & mut self . data_files ) )
132
+ let mut data_files = try_join_all ( take ( & mut self . close_handles ) )
133
+ . await ?
134
+ . into_iter ( )
135
+ . flatten ( )
136
+ . collect :: < Vec < DataFile > > ( ) ;
137
+
138
+ // close the current writer and merge the output
139
+ if let Some ( mut current_writer) = take ( & mut self . inner ) {
140
+ data_files. extend ( current_writer. close ( ) . await ?) ;
141
+ }
142
+
143
+ Ok ( data_files)
105
144
}
106
145
}
107
146
108
- impl < B : FileWriterBuilder > RollingFileWriter for RollingDataFileWriter < B > {
147
+ impl < B : IcebergWriterBuilder > RollingFileWriter for RollingDataFileWriter < B > {
109
148
fn should_roll ( & mut self , input_size : u64 ) -> bool {
110
149
self . written_size + input_size > self . target_size
111
150
}
0 commit comments