@@ -741,83 +741,104 @@ void progress(int p, int eta)
741741}
742742// # nocov end
743743
744- // Spill connection contents to a tempfile so R-level fread can treat it like a filename
745- SEXP spillConnectionToFile (SEXP connection , SEXP tempfile_path , SEXP nrows_limit ) {
746- #if R_CONNECTIONS_VERSION != 1
747- INTERNAL_STOP (_ ("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d" , R_CONNECTIONS_VERSION )); // # nocov
748- #else
749- if (!isString (tempfile_path ) || LENGTH (tempfile_path ) != 1 ) {
750- INTERNAL_STOP (_ ("spillConnectionToFile: tempfile_path must be a single string" )); // # nocov
751- }
752-
753- if (!isReal (nrows_limit ) || LENGTH (nrows_limit ) != 1 ) {
754- INTERNAL_STOP (_ ("spillConnectionToFile: nrows_limit must be a single numeric value" )); // # nocov
744+ typedef struct {
745+ Rconnection con ;
746+ const char * filepath ;
747+ size_t row_limit ;
748+ FILE * outfile ;
749+ char * buffer ;
750+ } SpillState ;
751+
752+ static void spill_cleanup (void * data )
753+ {
754+ SpillState * state = (SpillState * )data ;
755+ if (!state ) return ;
756+ free (state -> buffer ); // free(NULL) is safe no-op
757+ if (state -> outfile ) {
758+ fclose (state -> outfile );
755759 }
760+ }
756761
757- Rconnection con = R_GetConnection (connection );
758- const char * filepath = CHAR (STRING_ELT (tempfile_path , 0 ));
759- const double nrows_max = REAL_RO (nrows_limit )[0 ];
760- const bool limit_rows = R_FINITE (nrows_max ) && nrows_max >= 0.0 ;
761- size_t row_limit = 0 ;
762- if (limit_rows ) {
763- row_limit = (size_t )nrows_max ;
764- if (row_limit == 0 ) row_limit = 100 ; // read at least 100 rows if nrows==0
765- row_limit ++ ; // cater for potential header row
766- }
762+ static SEXP do_spill (void * data )
763+ {
764+ SpillState * state = (SpillState * )data ;
765+ const size_t chunk_size = 256 * 1024 ; // TODO tune chunk size
767766
768- FILE * outfile = fopen (filepath , "wb" );
769- if (outfile == NULL ) {
770- STOP (_ ("spillConnectionToFile: failed to open temp file '%s' for writing: %s" ), filepath , strerror (errno )); // # nocov
767+ state -> outfile = fopen (state -> filepath , "wb" );
768+ if (state -> outfile == NULL ) {
769+ STOP (_ ("spillConnectionToFile: failed to open temp file '%s' for writing: %s" ), state -> filepath , strerror (errno )); // # nocov
771770 }
772771
773- // Read and write in chunks // TODO tune chunk size
774- size_t chunk_size = 256 * 1024 ;
775- char * buffer = malloc (chunk_size );
776- if (!buffer ) {
777- fclose (outfile ); // # nocov
772+ state -> buffer = malloc (chunk_size );
773+ if (!state -> buffer ) {
778774 STOP (_ ("spillConnectionToFile: failed to allocate buffer" )); // # nocov
779775 }
780-
776+ const bool limit_rows = R_FINITE ( state -> row_limit ) && ( state -> row_limit > 0 );
781777 size_t total_read = 0 ;
782778 size_t nrows_seen = 0 ;
783779
784780 while (true) {
785- size_t nread = R_ReadConnection (con , buffer , chunk_size );
781+ size_t nread = R_ReadConnection (state -> con , state -> buffer , chunk_size );
786782 if (nread == 0 ) {
787783 break ; // EOF
788784 }
789785
790786 size_t bytes_to_write = nread ;
791- if (limit_rows && nrows_seen < row_limit ) {
787+ if (limit_rows && nrows_seen < state -> row_limit ) {
792788 for (size_t i = 0 ; i < nread ; i ++ ) {
793- if (buffer [i ] == '\n' ) {
789+ if (state -> buffer [i ] == '\n' ) {
794790 nrows_seen ++ ;
795- if (nrows_seen >= row_limit ) {
791+ if (nrows_seen >= state -> row_limit ) {
796792 bytes_to_write = i + 1 ;
797793 break ;
798794 }
799795 }
800796 }
801797 }
802798
803- size_t nwritten = fwrite (buffer , 1 , bytes_to_write , outfile );
799+ size_t nwritten = fwrite (state -> buffer , 1 , bytes_to_write , state -> outfile );
804800 if (nwritten != bytes_to_write ) {
805- // # nocov start
806- free (buffer );
807- fclose (outfile );
808- STOP (_ ("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)" ), strerror (errno ), nwritten , bytes_to_write );
809- // # nocov end
801+ STOP (_ ("spillConnectionToFile: write error %s (wrote %zu of %zu bytes)" ), strerror (errno ), nwritten , bytes_to_write ); // # nocov
810802 }
811803 total_read += bytes_to_write ;
812804
813- if (limit_rows && nrows_seen >= row_limit ) {
805+ if (limit_rows && nrows_seen >= state -> row_limit ) {
814806 break ;
815807 }
816808 }
817809
818- free (buffer );
819- fclose (outfile );
820810 return ScalarReal ((double )total_read );
811+ }
812+
813+ // Spill connection contents to a tempfile so R-level fread can treat it like a filename
814+ SEXP spillConnectionToFile (SEXP connection , SEXP tempfile_path , SEXP nrows_limit ) {
815+ #if R_CONNECTIONS_VERSION != 1
816+ INTERNAL_STOP (_ ("spillConnectionToFile: unexpected R_CONNECTIONS_VERSION = %d" , R_CONNECTIONS_VERSION )); // # nocov
817+ #else
818+ if (!isString (tempfile_path ) || LENGTH (tempfile_path ) != 1 ) {
819+ INTERNAL_STOP (_ ("spillConnectionToFile: tempfile_path must be a single string" )); // # nocov
820+ }
821+
822+ if (!isReal (nrows_limit ) || LENGTH (nrows_limit ) != 1 ) {
823+ INTERNAL_STOP (_ ("spillConnectionToFile: nrows_limit must be a single numeric value" )); // # nocov
824+ }
825+
826+ SpillState state = {
827+ .con = R_GetConnection (connection ),
828+ .filepath = CHAR (STRING_ELT (tempfile_path , 0 )),
829+ .row_limit = 0 ,
830+ .outfile = NULL ,
831+ .buffer = NULL
832+ };
833+
834+ const double nrows_max = REAL_RO (nrows_limit )[0 ];
835+ if (R_FINITE (nrows_max ) && nrows_max >= 0.0 ) {
836+ state .row_limit = (size_t )nrows_max ;
837+ if (state .row_limit == 0 ) state .row_limit = 100 ; // read at least 100 rows if nrows==0
838+ state .row_limit ++ ; // cater for potential header row
839+ }
840+
841+ return R_ExecWithCleanup (do_spill , & state , spill_cleanup , & state );
821842#endif // was R_CONNECTIONS_VERSION not != 1?
822843}
823844
0 commit comments