File tree Expand file tree Collapse file tree 2 files changed +34
-3
lines changed
Expand file tree Collapse file tree 2 files changed +34
-3
lines changed Original file line number Diff line number Diff line change @@ -249,15 +249,23 @@ impl InsertFormatted {
249249 }
250250
251251 async fn send_inner ( & mut self , mut data : Bytes ) -> Result < ( ) > {
252+ if self . state . is_not_started ( ) {
253+ self . init_request ( ) ?;
254+ }
255+
252256 std:: future:: poll_fn ( move |cx| {
253257 loop {
254- ready ! ( self . poll_ready( cx) ) ?;
255-
256258 // Potentially cheaper than cloning `data` which touches the refcount
257259 match self . try_send ( mem:: take ( & mut data) ) {
258- ControlFlow :: Break ( res) => return Poll :: Ready ( res) ,
260+ ControlFlow :: Break ( Ok ( ( ) ) ) => return Poll :: Ready ( Ok ( ( ) ) ) ,
261+ ControlFlow :: Break ( Err ( _) ) => {
262+ // If the channel is closed, we should return the actual error
263+ return self . poll_wait_handle ( cx) ;
264+ }
259265 ControlFlow :: Continue ( unsent) => {
260266 data = unsent;
267+ // Shorter code-path if we just try to send the data first
268+ ready ! ( self . poll_ready( cx) ) ?;
261269 }
262270 }
263271 }
Original file line number Diff line number Diff line change @@ -48,6 +48,29 @@ async fn insert() {
4848 verify_insert ( & client) . await ;
4949}
5050
51+ #[ tokio:: test]
52+ async fn insert_small_chunks ( ) {
53+ let client = prepare_database ! ( )
54+ // Separate test for compression
55+ . with_compression ( Compression :: None ) ;
56+
57+ create_table ( & client) . await ;
58+
59+ let mut bytes = Bytes :: copy_from_slice ( TAXI_DATA_TSV ) ;
60+
61+ let mut insert =
62+ client. insert_formatted_with ( "INSERT INTO nyc_taxi_trips_small FORMAT TabSeparated" ) ;
63+
64+ while !bytes. is_empty ( ) {
65+ let chunk = bytes. split_to ( cmp:: min ( 16 , bytes. len ( ) ) ) ;
66+ insert. send ( chunk) . await . unwrap ( ) ;
67+ }
68+
69+ insert. end ( ) . await . unwrap ( ) ;
70+
71+ verify_insert ( & client) . await ;
72+ }
73+
5174#[ tokio:: test]
5275#[ cfg( feature = "lz4" ) ]
5376async fn insert_compressed ( ) {
You can’t perform that action at this time.
0 commit comments