@@ -344,7 +344,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
344344
345345int RGWSelectObj_ObjStore_S3::get_params (optional_yield y)
346346{
347- if (m_s3select_query.empty () == false ) {
347+ if (m_s3select_query.empty () == false ) {
348348 return 0 ;
349349 }
350350#ifndef _ARROW_EXIST
@@ -416,14 +416,14 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
416416 if (output_escape_char.size ()) {
417417 csv.output_escape_char = *output_escape_char.c_str ();
418418 }
419- if (output_quote_fields.compare (" ALWAYS" ) == 0 ) {
419+ if (output_quote_fields.compare (" ALWAYS" ) == 0 ) {
420420 csv.quote_fields_always = true ;
421- } else if (output_quote_fields.compare (" ASNEEDED" ) == 0 ) {
421+ } else if (output_quote_fields.compare (" ASNEEDED" ) == 0 ) {
422422 csv.quote_fields_asneeded = true ;
423423 }
424- if (m_header_info.compare (" IGNORE" )==0 ) {
424+ if (m_header_info.compare (" IGNORE" )==0 ) {
425425 csv.ignore_header_info =true ;
426- } else if (m_header_info.compare (" USE" )==0 ) {
426+ } else if (m_header_info.compare (" USE" )==0 ) {
427427 csv.use_header_info =true ;
428428 }
429429
@@ -478,15 +478,18 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
478478 if (!m_s3_parquet_object.is_set ()) {
479479 // parsing the SQL statement.
480480 s3select_syntax.parse_query (m_sql_query.c_str ());
481+ parquet_object::csv_definitions parquet;
481482
482483 m_s3_parquet_object.set_external_system_functions (fp_s3select_continue,
483484 fp_s3select_result_format,
484485 fp_result_header_format,
485486 fp_debug_mesg);
486487
487488 try {
489+ // setting the Parquet-reader properties. i.e. the buffer-size for the Parquet-reader
490+ parquet::ceph::S3select_Config::getInstance ().set_s3select_reader_properties (s->cct ->_conf ->rgw_parquet_buffer_size );
488491 // at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
489- m_s3_parquet_object.set_parquet_object (std::string (" s3object" ), &s3select_syntax, &m_rgw_api);
492+ m_s3_parquet_object.set_parquet_object (std::string (" s3object" ), &s3select_syntax, &m_rgw_api, parquet );
490493 } catch (base_s3select_exception& e) {
491494 ldpp_dout (this , 10 ) << " S3select: failed upon parquet-reader construction: " << e.what () << dendl;
492495 fp_result_header_format (m_aws_response_handler.get_sql_result ());
@@ -524,6 +527,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
524527 fp_s3select_result_format,
525528 fp_result_header_format,
526529 fp_debug_mesg);
530+ json_object::csv_definitions json;
527531
528532 m_aws_response_handler.init_response ();
529533
@@ -547,8 +551,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
547551 }
548552
549553 // initializing json processor
550- json_object::csv_definitions output_definition;
551- m_s3_json_object.set_json_query (&s3select_syntax,output_definition);
554+ m_s3_json_object.set_json_query (&s3select_syntax, json);
552555
553556 if (input == nullptr ) {
554557 input = " " ;
@@ -706,6 +709,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
706709 RGWGetObj::parse_range ();
707710 requested_buffer.clear ();
708711 m_request_range = len;
712+ m_aws_response_handler.update_processed_size (len);
709713 ldout (s->cct , 10 ) << " S3select: calling execute(async):" << " request-offset :" << ofs << " request-length :" << len << " buffer size : " << requested_buffer.size () << dendl;
710714 RGWGetObj::execute (y);
711715 if (buff) {
@@ -730,7 +734,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
730734 m_aws_response_handler.set (s, this , fp_chunked_transfer_encoding);
731735 }
732736
733- if (s->cct ->_conf ->rgw_disable_s3select == true )
737+ if (s->cct ->_conf ->rgw_disable_s3select == true )
734738 {
735739 std::string error_msg=" s3select : is disabled by rgw_disable_s3select configuration parameter" ;
736740 ldpp_dout (this , 10 ) << error_msg << dendl;
@@ -749,20 +753,32 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
749753 return ;
750754 }
751755 s3select_syntax.parse_query (m_sql_query.c_str ());
756+ // run_s3select_on_parquet() calls the s3select query engine, which reads and processes the Parquet object via RGW::range_request.
757+ // Once the query engine finishes processing, control returns to execute();
758+ // the Parquet reader indicates the end of the Parquet object.
752759 status = run_s3select_on_parquet (m_sql_query.c_str ());
753760 if (status) {
754761 ldout (s->cct , 10 ) << " S3select: failed to process query <" << m_sql_query << " > on object " << s->object ->get_name () << dendl;
755762 op_ret = -ERR_INVALID_REQUEST;
756763 } else {
757- ldout (s->cct , 10 ) << " S3select: complete query with success " << dendl;
764+ // status per amount of processed data
765+ m_aws_response_handler.update_total_bytes_returned (m_s3_parquet_object.get_return_result_size ());
766+ m_aws_response_handler.init_stats_response ();
767+ m_aws_response_handler.send_stats_response ();
768+ m_aws_response_handler.init_end_response ();
769+ ldpp_dout (this , 10 ) << " s3select : reached the end of parquet query request : aws_response_handler.get_processed_size() "
770+ << m_aws_response_handler.get_processed_size ()
771+ << " m_object_size_for_processing : " << m_object_size_for_processing << dendl;
772+
773+ ldout (s->cct , 10 ) << " S3select: complete parquet query with success " << dendl;
758774 }
759775 } else {
760776 // CSV or JSON processing
761777 if (m_scan_range_ind) {
762778
763779 m_requested_range = (m_end_scan_sz - m_start_scan_sz);
764780
765- if (m_is_trino_request){
781+ if (m_is_trino_request){
766782 // fetch more than requested(m_scan_offset), that additional bytes are scanned for end of row,
767783 // thus the additional length will be processed, and no broken row for Trino.
768784 // assumption: row is smaller than m_scan_offset. (a different approach is to request for additional range)
@@ -778,7 +794,8 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
778794}
779795
780796int RGWSelectObj_ObjStore_S3::parquet_processing (bufferlist& bl, off_t ofs, off_t len)
781- {
797+ {// purpose: to process the returned buffer from range-request, and to send it to the Parquet-reader.
798+ // range_request() is called by arrow::ReadAt; upon request completion, control returns to RGWSelectObj_ObjStore_S3::execute()
782799 fp_chunked_transfer_encoding ();
783800 size_t append_in_callback = 0 ;
784801 int part_no = 1 ;
@@ -789,6 +806,11 @@ int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_
789806 }
790807 append_in_callback += it.length ();
791808 ldout (s->cct , 10 ) << " S3select: part " << part_no++ << " it.length() = " << it.length () << dendl;
809+ if ((ofs + len) > it.length ()){
810+ ldpp_dout (this , 10 ) << " s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length () << dendl;
811+ ofs = 0 ;
812+ len = it.length ();
813+ }
792814 requested_buffer.append (&(it)[0 ]+ofs, len);
793815 }
794816 ldout (s->cct , 10 ) << " S3select:append_in_callback = " << append_in_callback << dendl;
@@ -809,18 +831,18 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
809831// the purpose is to return "perfect" results, with no broken or missing lines.
810832
811833 off_t new_offset = 0 ;
812- if (m_scan_range_ind){// only upon range-scan
834+ if (m_scan_range_ind){// only upon range-scan
813835 int64_t sc=0 ;
814836 int64_t start =0 ;
815837 const char * row_delimiter = m_row_delimiter.c_str ();
816838
817839 ldpp_dout (this , 10 ) << " s3select query: per Trino request the first and last chunk should modified." << dendl;
818840
819841 // chop the head of the first chunk and only upon the slice does not include the head of the object.
820- if (m_start_scan_sz && (m_aws_response_handler.get_processed_size ()==0 )){
842+ if (m_start_scan_sz && (m_aws_response_handler.get_processed_size ()==0 )){
821843 char * p = const_cast <char *>(it_cp+ofs);
822844 while (strncmp (row_delimiter,p,1 ) && (p - (it_cp+ofs)) < len)p++;
823- if (!strncmp (row_delimiter,p,1 )){
845+ if (!strncmp (row_delimiter,p,1 )){
824846 new_offset += (p - (it_cp+ofs))+1 ;
825847 }
826848 }
@@ -831,14 +853,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
831853
832854 // chop the end of the last chunk for this request
833855 // if it's the last chunk, search for first row-delimiter for the following different use-cases
834- if ((m_aws_response_handler.get_processed_size ()+len) >= m_requested_range){
856+ if ((m_aws_response_handler.get_processed_size ()+len) >= m_requested_range){
835857 // had pass the requested range, start to search for first delimiter
836- if (m_aws_response_handler.get_processed_size ()>m_requested_range){
858+ if (m_aws_response_handler.get_processed_size ()>m_requested_range){
837859 // the previous chunk contain the complete request(all data) and an extra bytes.
838860 // thus, search for the first row-delimiter
839861 // [:previous (RR) ... ][:current (RD) ]
840862 start = 0 ;
841- } else if (m_aws_response_handler.get_processed_size ()){
863+ } else if (m_aws_response_handler.get_processed_size ()){
842864 // the *current* chunk contain the complete request in the middle of the chunk.
843865 // thus, search for the first row-delimiter after the complete request position
844866 // [:current (RR) .... (RD) ]
@@ -852,7 +874,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
852874 for (sc=start;sc<len;sc++)// assumption : row-delimiter must exist or its end ebject
853875 {
854876 char * p = const_cast <char *>(it_cp) + ofs + sc;
855- if (!strncmp (row_delimiter,p,1 )){
877+ if (!strncmp (row_delimiter,p,1 )){
856878 ldout (s->cct , 10 ) << " S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size () << dendl;
857879 len = sc + 1 ;// +1 is for delimiter. TODO what about m_object_size_for_processing (to update according to len)
858880 // the end of row exist in current chunk.
@@ -872,7 +894,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
872894int RGWSelectObj_ObjStore_S3::csv_processing (bufferlist& bl, off_t ofs, off_t len)
873895{
874896 int status = 0 ;
875- if (m_skip_next_chunk == true ){
897+ if (m_skip_next_chunk == true ){
876898 return status;
877899 }
878900
@@ -894,13 +916,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
894916 }
895917
896918
897- if (ofs > it.length ()){
919+ if (ofs > it.length ()){
898920 // safety check
899921 ldpp_dout (this , 10 ) << " offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length () << dendl;
900922 ofs = 0 ;
901923 }
902924
903- if (m_is_trino_request){
925+ if (m_is_trino_request){
904926 // TODO replace len with it.length() ? ; test Trino flow with compressed objects.
905927 // is it possible to send get-by-ranges? in parallel?
906928 shape_chunk_per_trino_requests (&(it)[0 ], ofs, len);
@@ -964,7 +986,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
964986 continue ;
965987 }
966988
967- if ((ofs + len) > it.length ()){
989+ if ((ofs + len) > it.length ()){
968990 ldpp_dout (this , 10 ) << " s3select: offset and length may cause invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length () << dendl;
969991 ofs = 0 ;
970992 len = it.length ();
@@ -1025,7 +1047,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
10251047 if (len == 0 && s->obj_size != 0 ) {
10261048 return 0 ;
10271049 }
1028- if (m_parquet_type) {
1050+ if (m_parquet_type) {// bufferlist sendback upon range-request
10291051 return parquet_processing (bl,ofs,len);
10301052 }
10311053 if (m_json_type) {
0 commit comments