@@ -736,6 +736,7 @@ impl Layout {
736736
737737 pub async fn insert < ' a > (
738738 & ' a self ,
739+ logger : & Logger ,
739740 conn : & mut AsyncPgConnection ,
740741 group : & ' a RowGroup ,
741742 stopwatch : & StopwatchMetrics ,
@@ -769,13 +770,39 @@ impl Layout {
769770 for chunk in group. write_chunks ( chunk_size) {
770771 // Empty chunks would lead to invalid SQL
771772 if !chunk. is_empty ( ) {
772- InsertQuery :: new ( table, & chunk) ?
773- . execute ( conn)
774- . await
775- . map_err ( |e| {
773+ if let Err ( e) = InsertQuery :: new ( table, & chunk) ?. execute ( conn) . await {
774+ // We occasionally get these errors but it's entirely
775+ // unclear what causes them. We work around that by
776+ // switching to row-by-row inserts until we can figure
777+ // out what the underlying cause is
778+ let err_msg = e. to_string ( ) ;
779+ if !err_msg. contains ( "value too large to transmit" ) {
776780 let ( block, msg) = chunk_details ( & chunk) ;
777- StoreError :: write_failure ( e, table. object . as_str ( ) , block, msg)
778- } ) ?;
781+ return Err ( StoreError :: write_failure (
782+ e,
783+ table. object . as_str ( ) ,
784+ block,
785+ msg,
786+ ) ) ;
787+ }
788+ let ( block, msg) = chunk_details ( & chunk) ;
789+ warn ! ( logger, "Insert of entire chunk failed. Trying row by row insert." ;
790+ "table" => table. object. as_str( ) ,
791+ "block" => block,
792+ "error" => err_msg,
793+ "details" => msg
794+ ) ;
795+ for single_chunk in chunk. as_single_writes ( ) {
796+ InsertQuery :: new ( table, & single_chunk) ?
797+ . execute ( conn)
798+ . await
799+ . map_err ( |e| {
800+ let ( block, msg) = chunk_details ( & single_chunk) ;
801+ let msg = format ! ( "{}: offending row {:?}" , msg, single_chunk) ;
802+ StoreError :: write_failure ( e, table. object . as_str ( ) , block, msg)
803+ } ) ?;
804+ }
805+ }
779806 }
780807 }
781808 Ok ( ( ) )
0 commit comments