@@ -75,6 +75,8 @@ enum Commands {
7575 } ,
7676}
7777
78+ const OPTIMISTIC_CONCURRENCY_RETRIES : u32 = 3 ; // Total number of attempts (loops run `0..N`), re-trying only on `OptimisticConcurrencyError`. NOTE(review): if every attempt fails, the loops fall through to `Ok(())` — confirm that is intended.
79+
7880pub async fn do_main ( args : Cli ) -> Result < ( ) , DataLoadingError > {
7981 match args. command {
8082 Commands :: ParquetToDelta {
@@ -117,20 +119,35 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
117119 target_url,
118120 overwrite,
119121 } => {
120- let file = tokio:: fs:: File :: open ( source_file) . await ?;
121- let record_batch_reader = ParquetRecordBatchStreamBuilder :: new ( file)
122- . await ?
123- . build ( )
124- . unwrap ( ) ;
125- let schema = record_batch_reader. schema ( ) . clone ( ) ;
126- info ! ( "File schema: {}" , schema) ;
127- record_batches_to_iceberg (
128- record_batch_reader. map_err ( DataLoadingError :: ParquetError ) ,
129- schema,
130- target_url,
131- overwrite,
132- )
133- . await
122+ for _ in 0 ..OPTIMISTIC_CONCURRENCY_RETRIES {
123+ let file = tokio:: fs:: File :: open ( & source_file) . await ?;
124+ let record_batch_reader = ParquetRecordBatchStreamBuilder :: new ( file)
125+ . await ?
126+ . build ( )
127+ . unwrap ( ) ;
128+ let arrow_schema = record_batch_reader. schema ( ) . clone ( ) ;
129+ info ! ( "File schema: {}" , arrow_schema) ;
130+ match record_batches_to_iceberg (
131+ record_batch_reader. map_err ( DataLoadingError :: ParquetError ) ,
132+ arrow_schema,
133+ target_url. clone ( ) ,
134+ overwrite,
135+ )
136+ . await
137+ {
138+ Err ( DataLoadingError :: OptimisticConcurrencyError ( ) ) => {
139+ info ! ( "Optimistic concurrency error. Retrying" ) ;
140+ continue ;
141+ }
142+ Err ( e) => {
143+ return Err ( e) ;
144+ }
145+ Ok ( _) => {
146+ break ;
147+ }
148+ }
149+ }
150+ Ok ( ( ) )
134151 }
135152 Commands :: PgToIceberg {
136153 connection_string,
@@ -139,14 +156,34 @@ pub async fn do_main(args: Cli) -> Result<(), DataLoadingError> {
139156 overwrite,
140157 batch_size,
141158 } => {
142- let mut source = PgArrowSource :: new ( connection_string. as_ref ( ) , & query, batch_size)
143- . await
144- . map_err ( DataLoadingError :: PostgresError ) ?;
145- let arrow_schema = source. get_arrow_schema ( ) ;
146- let record_batch_stream = source. get_record_batch_stream ( ) ;
147- info ! ( "Rowset schema: {}" , arrow_schema) ;
148- record_batches_to_iceberg ( record_batch_stream, arrow_schema, target_url, overwrite)
159+ for _ in 0 ..OPTIMISTIC_CONCURRENCY_RETRIES {
160+ let mut source = PgArrowSource :: new ( connection_string. as_ref ( ) , & query, batch_size)
161+ . await
162+ . map_err ( DataLoadingError :: PostgresError ) ?;
163+ let arrow_schema = source. get_arrow_schema ( ) ;
164+ let record_batch_stream = source. get_record_batch_stream ( ) ;
165+ info ! ( "Rowset schema: {}" , arrow_schema) ;
166+ match record_batches_to_iceberg (
167+ record_batch_stream,
168+ arrow_schema,
169+ target_url. clone ( ) ,
170+ overwrite,
171+ )
149172 . await
173+ {
174+ Err ( DataLoadingError :: OptimisticConcurrencyError ( ) ) => {
175+ info ! ( "Optimistic concurrency error. Retrying" ) ;
176+ continue ;
177+ }
178+ Err ( e) => {
179+ return Err ( e) ;
180+ }
181+ Ok ( _) => {
182+ break ;
183+ }
184+ }
185+ }
186+ Ok ( ( ) )
150187 }
151188 }
152189 // TODO
0 commit comments