@@ -5,6 +5,7 @@
 #![feature(let_chains)]
 #![feature(impl_trait_in_assoc_type)]
 #![feature(try_blocks)]
+#![feature(if_let_guard)]
 mod chunks;
 mod connection;
 mod metrics;
@@ -127,6 +128,11 @@ use crate::{
     },
 };
 
+// Vitess limits query results to 64MiB.
+// As documents can be up to 1MiB (plus some overhead), we may need to fall back
+// to a much smaller page size if we hit the limit while loading documents.
+const FALLBACK_PAGE_SIZE: u32 = 10;
+
 #[derive(Clone, Debug)]
 pub struct MySqlInstanceName {
     raw: String,
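A quick sanity check on the constant: with documents up to ~1MiB each, a page of `page_size` documents can approach `page_size` MiB on the wire, so page sizes much above 64 can trip Vitess's 64MiB cap. A fallback of 10 keeps a worst-case page around 10MiB plus overhead, comfortably under the limit.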
@@ -669,7 +675,7 @@ impl<RT: Runtime> MySqlReader<RT> {
         include_prev_rev: bool,
         range: TimestampRange,
         order: Order,
-        page_size: u32,
+        mut page_size: u32,
         retention_validator: Arc<dyn RetentionValidator>,
     ) {
         anyhow::ensure!(page_size > 0); // 0 size pages loop forever.
@@ -717,11 +723,33 @@ impl<RT: Runtime> MySqlReader<RT> {
             params.push(self.instance_name.to_string().into());
         }
         params.push((page_size as i64).into());
-        let rows: Vec<_> = client
-            .query_stream(query, params, page_size as usize)
-            .await?
-            .try_collect()
-            .await?;
+        let stream_result = match client.query_stream(query, params, page_size as usize).await {
+            Ok(stream) => Ok(stream),
+            Err(ref e)
+                if let Some(db_err) = e.downcast_ref::<mysql_async::ServerError>()
+                    && db_err.state == "HY000"
+                    && db_err.code == 1105
+                    && db_err
+                        .message
+                        .contains("trying to send message larger than max") =>
+            {
+                if page_size == FALLBACK_PAGE_SIZE {
+                    anyhow::bail!(
+                        "Failed to load documents with fallback page size \
+                         `{FALLBACK_PAGE_SIZE}`: {}",
+                        db_err.message
+                    );
+                }
+                tracing::warn!(
+                    "Falling back to page size `{FALLBACK_PAGE_SIZE}` due to server error: {}",
+                    db_err.message
+                );
+                page_size = FALLBACK_PAGE_SIZE;
+                continue;
+            },
+            Err(e) => Err(e),
+        }?;
+        let rows: Vec<_> = stream_result.try_collect().await?;
         drop(client);
 
         retention_validator
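The guard in this arm reappears verbatim in the index-query path further down. If it were ever factored out, a predicate along these lines would do; `is_result_too_large` is a hypothetical helper name, not something introduced by this change:

```rust
use anyhow::Error;

// Hypothetical extraction of the duplicated match guard. MySQL error 1105
// (ER_UNKNOWN_ERROR) with SQLSTATE HY000 is a generic server error, so the
// message substring is what actually identifies the oversized-result case.
fn is_result_too_large(e: &Error) -> bool {
    e.downcast_ref::<mysql_async::ServerError>()
        .is_some_and(|db_err| {
            db_err.state == "HY000"
                && db_err.code == 1105
                && db_err
                    .message
                    .contains("trying to send message larger than max")
        })
}
```

Even with such a helper, the arms would still need `if let` guards (or a restructure) to keep a borrow of `db_err.message` for the log and bail messages, which is presumably why the change enables `if_let_guard` at the top of the file.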
@@ -823,7 +851,7 @@ impl<RT: Runtime> MySqlReader<RT> {
         // common case we should do a single query. Exceptions are if the size_hint
         // is wrong or if we truncate it or if we observe too many deletes.
         let mut batch_size =
-            size_hint.clamp(*MYSQL_MIN_QUERY_BATCH_SIZE, *MYSQL_MAX_QUERY_BATCH_SIZE);
+            size_hint.clamp(*MYSQL_MIN_QUERY_BATCH_SIZE, *MYSQL_MAX_QUERY_BATCH_SIZE) as u32;
@@ -832,6 +860,7 @@ impl<RT: Runtime> MySqlReader<RT> {
         let mut result_buffer: Vec<(IndexKeyBytes, Timestamp, ConvexValue, Option<Timestamp>)> =
             Vec::new();
         let mut has_more = true;
+        let mut fallback = false;
         while has_more {
             let page = {
                 let mut to_yield = vec![];
@@ -845,7 +874,7 @@ impl<RT: Runtime> MySqlReader<RT> {
                     lower.clone(),
                     upper.clone(),
                     order,
-                    batch_size,
+                    batch_size as usize,
                     self.multitenant,
                     &self.instance_name,
                 );
@@ -856,7 +885,37 @@ impl<RT: Runtime> MySqlReader<RT> {
 
                 let execute_timer =
                     metrics::query_index_sql_execute_timer(self.read_pool.cluster_name());
-                let row_stream = client.query_stream(query, params, batch_size).await?;
+                let row_stream = match client
+                    .query_stream(query, params, batch_size as usize)
+                    .await
+                {
+                    Ok(stream) => Ok(stream),
+                    Err(ref e)
+                        if let Some(db_err) = e.downcast_ref::<mysql_async::ServerError>()
+                            && db_err.state == "HY000"
+                            && db_err.code == 1105
+                            && db_err
+                                .message
+                                .contains("trying to send message larger than max") =>
+                    {
+                        if batch_size == FALLBACK_PAGE_SIZE {
+                            anyhow::bail!(
+                                "Failed to load documents with fallback page size \
+                                 `{FALLBACK_PAGE_SIZE}`: {}",
+                                db_err.message
+                            );
+                        }
+                        tracing::warn!(
+                            "Falling back to page size `{FALLBACK_PAGE_SIZE}` due to server \
+                             error: {}",
+                            db_err.message
+                        );
+                        batch_size = FALLBACK_PAGE_SIZE;
+                        fallback = true;
+                        continue;
+                    },
+                    Err(e) => Err(e),
+                }?;
                 execute_timer.finish();
 
                 let retention_validate_timer =
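One difference from the document-loading path above: this reader grows its batch size again after every page (see the next hunk), so a bare `continue` after shrinking would just double back into the 64MiB wall. The `fallback` flag records that the cap was hit so the doubling stays disabled for the rest of the query.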
@@ -972,9 +1031,11 @@ impl<RT: Runtime> MySqlReader<RT> {
             // Double the batch size every iteration until we max dynamic batch size. This
             // helps correct for tombstones, long prefixes and wrong client
             // size estimates.
-            // TODO: Take size into consideration and increase the max dynamic batch size.
-            if batch_size < *MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE {
-                batch_size = (batch_size * 2).min(*MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE);
+            // If we've had to fall back to the fallback page size, stay there without
+            // doubling. TODO: Take size into consideration and increase the max
+            // dynamic batch size.
+            if batch_size < *MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE as u32 && !fallback {
+                batch_size = (batch_size * 2).min(*MYSQL_MAX_QUERY_DYNAMIC_BATCH_SIZE as u32);
             }
         }
     }
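Taken together, the resulting batch-size schedule is: start from the clamped `size_hint`, double until the dynamic max, and pin to the fallback once the server rejects a result as too large. A minimal sketch of that schedule, with made-up constants standing in for the real `MYSQL_*` knobs:

```rust
// Stand-in values; the real limits live behind knobs in the actual crate.
const MAX_DYNAMIC_BATCH_SIZE: u32 = 1024;
const FALLBACK_PAGE_SIZE: u32 = 10;

/// Next page size: double up to the dynamic max, unless an oversized-result
/// error has already forced us down to the fallback, which then sticks.
fn next_batch_size(current: u32, fallback: bool) -> u32 {
    if fallback {
        FALLBACK_PAGE_SIZE
    } else if current < MAX_DYNAMIC_BATCH_SIZE {
        (current * 2).min(MAX_DYNAMIC_BATCH_SIZE)
    } else {
        current
    }
}

fn main() {
    // 16 -> 32 -> 64 -> ... -> 1024, then capped at the dynamic max.
    let mut size = 16;
    for _ in 0..8 {
        size = next_batch_size(size, false);
    }
    assert_eq!(size, MAX_DYNAMIC_BATCH_SIZE);
    // After a "message larger than max" error, the size pins to the fallback.
    assert_eq!(next_batch_size(size, true), FALLBACK_PAGE_SIZE);
}
```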