@@ -344,7 +344,7 @@ RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
 
 int RGWSelectObj_ObjStore_S3::get_params(optional_yield y)
 {
-  if(m_s3select_query.empty() == false) {
+  if (m_s3select_query.empty() == false) {
     return 0;
   }
 #ifndef _ARROW_EXIST
@@ -416,14 +416,14 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
   if (output_escape_char.size()) {
     csv.output_escape_char = *output_escape_char.c_str();
   }
-  if(output_quote_fields.compare("ALWAYS") == 0) {
+  if (output_quote_fields.compare("ALWAYS") == 0) {
     csv.quote_fields_always = true;
-  } else if(output_quote_fields.compare("ASNEEDED") == 0) {
+  } else if (output_quote_fields.compare("ASNEEDED") == 0) {
     csv.quote_fields_asneeded = true;
   }
-  if(m_header_info.compare("IGNORE") == 0) {
+  if (m_header_info.compare("IGNORE") == 0) {
     csv.ignore_header_info = true;
-  } else if(m_header_info.compare("USE") == 0) {
+  } else if (m_header_info.compare("USE") == 0) {
     csv.use_header_info = true;
   }
 
@@ -478,15 +478,18 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
   if (!m_s3_parquet_object.is_set()) {
     // parsing the SQL statement
     s3select_syntax.parse_query(m_sql_query.c_str());
+    parquet_object::csv_definitions parquet;
 
     m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
       fp_s3select_result_format,
       fp_result_header_format,
       fp_debug_mesg);
 
     try {
+      // set the Parquet-reader properties, i.e. the buffer size used by the Parquet reader
+      parquet::ceph::S3select_Config::getInstance().set_s3select_reader_properties(s->cct->_conf->rgw_parquet_buffer_size);
       // at this stage Parquet processing requires the metadata that resides in the Parquet object
-      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
+      m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api, parquet);
     } catch (base_s3select_exception& e) {
       ldpp_dout(this, 10) << "S3select: failed upon parquet-reader construction: " << e.what() << dendl;
       fp_result_header_format(m_aws_response_handler.get_sql_result());
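The new `set_s3select_reader_properties()` call pushes the `rgw_parquet_buffer_size` configurable into a process-wide config object that the Parquet reader consults when constructing its input buffers. A minimal sketch of that singleton pattern, assuming a simple setter/getter pair; the real `parquet::ceph::S3select_Config` may carry additional reader properties:

```cpp
#include <cstdint>

// Sketch in the spirit of parquet::ceph::S3select_Config (assumption: the real
// class may expose more properties than the reader buffer size shown here).
class S3selectConfigSketch {
public:
  static S3selectConfigSketch& getInstance() {
    static S3selectConfigSketch instance;  // constructed once; thread-safe since C++11
    return instance;
  }
  void set_s3select_reader_properties(uint64_t buffer_size) {
    m_reader_buffer_size = buffer_size;
  }
  uint64_t get_s3select_reader_properties() const {
    return m_reader_buffer_size;
  }
private:
  S3selectConfigSketch() = default;
  uint64_t m_reader_buffer_size = 4u << 20;  // hypothetical 4 MiB default
};

// Usage, mirroring the hunk above:
//   S3selectConfigSketch::getInstance().set_s3select_reader_properties(conf_value);
```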
@@ -524,6 +527,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
       fp_s3select_result_format,
       fp_result_header_format,
       fp_debug_mesg);
+  json_object::csv_definitions json;
 
   m_aws_response_handler.init_response();
 
@@ -547,8 +551,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
   }
 
   // initializing the json processor
-  json_object::csv_definitions output_definition;
-  m_s3_json_object.set_json_query(&s3select_syntax, output_definition);
+  m_s3_json_object.set_json_query(&s3select_syntax, json);
 
   if (input == nullptr) {
     input = "";
@@ -706,6 +709,7 @@ int RGWSelectObj_ObjStore_S3::range_request(int64_t ofs, int64_t len, void* buff
   RGWGetObj::parse_range();
   requested_buffer.clear();
   m_request_range = len;
+  m_aws_response_handler.update_processed_size(len);
   ldout(s->cct, 10) << "S3select: calling execute(async):" << " request-offset: " << ofs << " request-length: " << len << " buffer size: " << requested_buffer.size() << dendl;
   RGWGetObj::execute(y);
   if (buff) {
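range_request() now records the requested length via update_processed_size() before issuing the read, so the final stats response can report how much of the object was scanned. A self-contained sketch of that bookkeeping, with hypothetical names mirroring the response handler's counters:

```cpp
#include <cstdint>

// Hypothetical mirror of the response handler's byte accounting.
struct SelectStats {
  uint64_t processed_bytes = 0;  // bytes fetched from the object via range requests
  uint64_t returned_bytes = 0;   // bytes emitted as SQL results

  void update_processed_size(uint64_t len) { processed_bytes += len; }
  void update_total_bytes_returned(uint64_t len) { returned_bytes += len; }
};

// Each range request adds its length up front, so the stats response reflects
// the scanned range even when the reader fetches the object in several slices:
//   stats.update_processed_size(len);
```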
@@ -730,7 +734,7 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
     m_aws_response_handler.set(s, this, fp_chunked_transfer_encoding);
   }
 
-  if(s->cct->_conf->rgw_disable_s3select == true)
+  if (s->cct->_conf->rgw_disable_s3select == true)
   {
     std::string error_msg = "s3select : is disabled by the rgw_disable_s3select configuration parameter";
     ldpp_dout(this, 10) << error_msg << dendl;
@@ -749,20 +753,32 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
       return;
     }
     s3select_syntax.parse_query(m_sql_query.c_str());
+    // run_s3select_on_parquet() calls the s3select query engine, which reads and processes
+    // the parquet object with RGW::range_request(); once the query engine finishes processing,
+    // control returns to execute(), and the parquet-reader indicates the end of the parquet object.
     status = run_s3select_on_parquet(m_sql_query.c_str());
     if (status) {
       ldout(s->cct, 10) << "S3select: failed to process query <" << m_sql_query << "> on object " << s->object->get_name() << dendl;
       op_ret = -ERR_INVALID_REQUEST;
     } else {
-      ldout(s->cct, 10) << "S3select: complete query with success" << dendl;
+      // report statistics according to the amount of processed data
+      m_aws_response_handler.update_total_bytes_returned(m_s3_parquet_object.get_return_result_size());
+      m_aws_response_handler.init_stats_response();
+      m_aws_response_handler.send_stats_response();
+      m_aws_response_handler.init_end_response();
+      ldpp_dout(this, 10) << "s3select: reached the end of the parquet query request: aws_response_handler.get_processed_size() "
+                          << m_aws_response_handler.get_processed_size()
+                          << " m_object_size_for_processing: " << m_object_size_for_processing << dendl;
+
+      ldout(s->cct, 10) << "S3select: complete parquet query with success" << dendl;
     }
   } else {
     // CSV or JSON processing
     if (m_scan_range_ind) {
 
       m_requested_range = (m_end_scan_sz - m_start_scan_sz);
 
-      if(m_is_trino_request){
+      if (m_is_trino_request) {
         // fetch more than requested (m_scan_offset); the additional bytes are scanned for the end of row,
         // so the additional length is processed and no row is broken for Trino.
         // assumption: a row is smaller than m_scan_offset (a different approach is to request an additional range).
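The comment spells out the Trino-specific over-fetch: the handler reads m_scan_offset bytes beyond the requested range so the trailing row can be completed. A sketch of that length computation, under the stated assumption that a single row never exceeds the scan offset; the function name and clamping are illustrative, not the handler's exact code:

```cpp
#include <algorithm>
#include <cstdint>

// Sketch: extend a scan range by scan_offset so the tail row can be completed
// (Trino use-case). Assumes a row is always smaller than scan_offset.
int64_t effective_scan_length(int64_t requested_range, int64_t scan_offset,
                              int64_t start_offset, int64_t object_size) {
  // fetch more than requested; the extra tail is only scanned for the end of row
  int64_t extended = requested_range + scan_offset;
  // never read past the end of the object
  return std::min(extended, object_size - start_offset);
}
```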
@@ -778,7 +794,8 @@ void RGWSelectObj_ObjStore_S3::execute(optional_yield y)
 }
 
 int RGWSelectObj_ObjStore_S3::parquet_processing(bufferlist& bl, off_t ofs, off_t len)
-{
+{ // purpose: process the buffer returned by the range request and hand it to the Parquet reader.
+  // range_request() is called by arrow::ReadAt; upon request completion, control returns to RGWSelectObj_ObjStore_S3::execute()
   fp_chunked_transfer_encoding();
   size_t append_in_callback = 0;
   int part_no = 1;
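parquet_processing() runs on the downlink of the call chain described in the new comment: arrow::ReadAt triggers range_request(), and the fetched bufferlist parts are copied into the contiguous buffer the Parquet reader asked for. A stub of that copy loop, using a vector of strings as a stand-in for Ceph's bufferlist parts:

```cpp
#include <cstring>
#include <string>
#include <vector>

// Stub of the copy loop in parquet_processing(): concatenate each buffer part
// returned by the range request into the reader's destination buffer.
// The caller must guarantee that dest can hold the sum of all part sizes.
size_t append_parts(const std::vector<std::string>& parts, char* dest) {
  size_t appended = 0;
  for (const auto& part : parts) {
    std::memcpy(dest + appended, part.data(), part.size());
    appended += part.size();
  }
  return appended;  // total bytes handed back to the reader's callback
}
```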
@@ -809,18 +826,18 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 // the purpose is to return "perfect" results, with no broken or missing lines.
 
   off_t new_offset = 0;
-  if(m_scan_range_ind){// only upon range-scan
+  if (m_scan_range_ind) { // only upon range-scan
     int64_t sc = 0;
     int64_t start = 0;
     const char* row_delimiter = m_row_delimiter.c_str();
 
     ldpp_dout(this, 10) << "s3select query: per Trino request, the first and last chunk should be modified." << dendl;
 
     // chop the head of the first chunk, but only when the slice does not include the head of the object.
-    if(m_start_scan_sz && (m_aws_response_handler.get_processed_size() == 0)){
+    if (m_start_scan_sz && (m_aws_response_handler.get_processed_size() == 0)) {
       char* p = const_cast<char*>(it_cp + ofs);
       while (strncmp(row_delimiter, p, 1) && (p - (it_cp + ofs)) < len) p++;
-      if(!strncmp(row_delimiter, p, 1)){
+      if (!strncmp(row_delimiter, p, 1)) {
         new_offset += (p - (it_cp + ofs)) + 1;
       }
     }
@@ -831,14 +848,14 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 
     // chop the end of the last chunk for this request
     // if it's the last chunk, search for the first row-delimiter, covering the following use-cases
-    if((m_aws_response_handler.get_processed_size() + len) >= m_requested_range){
+    if ((m_aws_response_handler.get_processed_size() + len) >= m_requested_range) {
       // we have reached the requested range; start searching for the first delimiter
-      if(m_aws_response_handler.get_processed_size() > m_requested_range){
+      if (m_aws_response_handler.get_processed_size() > m_requested_range) {
         // the previous chunk contains the complete request (all data) plus some extra bytes;
         // thus, search for the first row-delimiter
         // [:previous (RR) ... ][:current (RD) ]
         start = 0;
-      } else if(m_aws_response_handler.get_processed_size()){
+      } else if (m_aws_response_handler.get_processed_size()) {
         // the *current* chunk contains the complete request in the middle of the chunk;
         // thus, search for the first row-delimiter after the position where the request completes
         // [:current (RR) .... (RD) ]
@@ -852,7 +869,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
       for (sc = start; sc < len; sc++) // assumption: a row-delimiter must exist, or it is the end of the object
       {
         char* p = const_cast<char*>(it_cp) + ofs + sc;
-        if(!strncmp(row_delimiter, p, 1)){
+        if (!strncmp(row_delimiter, p, 1)) {
           ldout(s->cct, 10) << "S3select: found row-delimiter on " << sc << " get_processed_size = " << m_aws_response_handler.get_processed_size() << dendl;
           len = sc + 1; // +1 is for the delimiter. TODO what about m_object_size_for_processing (to update according to len)
           // the end of the row exists in the current chunk.
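The head/tail trimming that shape_chunk_per_trino_requests() performs can be summarized in a standalone sketch: drop the partial row at the start of the first chunk, then cut the chunk at the first delimiter once the requested range is covered. All names here are hypothetical; this shows the shape of the logic, not the member function itself:

```cpp
#include <cstddef>
#include <string_view>

// Offsets within the chunk that bound whole rows only.
struct TrimResult { size_t begin; size_t end; };

// Sketch of the Trino chunk shaping described above.
TrimResult trim_to_whole_rows(std::string_view chunk, char row_delim,
                              bool slice_skips_object_head,
                              bool past_requested_range) {
  size_t begin = 0;
  size_t end = chunk.size();
  // chop the head: skip up to and including the first delimiter, so processing
  // starts on a row boundary (only when the slice does not start at the object head)
  if (slice_skips_object_head) {
    size_t pos = chunk.find(row_delim);
    if (pos != std::string_view::npos) begin = pos + 1;
  }
  // chop the tail: once the requested range is covered, keep bytes only up to
  // (and including) the first delimiter, so no broken row is emitted
  if (past_requested_range) {
    size_t pos = chunk.find(row_delim, begin);
    if (pos != std::string_view::npos) end = pos + 1;
  }
  return {begin, end};
}
```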
@@ -872,7 +889,7 @@ void RGWSelectObj_ObjStore_S3::shape_chunk_per_trino_requests(const char* it_cp,
 int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t len)
 {
   int status = 0;
-  if(m_skip_next_chunk == true){
+  if (m_skip_next_chunk == true) {
     return status;
   }
 
@@ -894,13 +911,13 @@ int RGWSelectObj_ObjStore_S3::csv_processing(bufferlist& bl, off_t ofs, off_t le
   }
 
 
-  if(ofs > it.length()){
+  if (ofs > it.length()) {
     // safety check
     ldpp_dout(this, 10) << "offset and length may cause an invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
     ofs = 0;
   }
 
-  if(m_is_trino_request){
+  if (m_is_trino_request) {
     // TODO replace len with it.length()? ; test the Trino flow with compressed objects.
     // is it possible to send get-by-ranges? in parallel?
     shape_chunk_per_trino_requests(&(it)[0], ofs, len);
@@ -964,7 +981,7 @@ int RGWSelectObj_ObjStore_S3::json_processing(bufferlist& bl, off_t ofs, off_t l
       continue;
     }
 
-    if((ofs + len) > it.length()){
+    if ((ofs + len) > it.length()) {
       ldpp_dout(this, 10) << "s3select: offset and length may cause an invalid read: ofs = " << ofs << " len = " << len << " it.length() = " << it.length() << dendl;
       ofs = 0;
       len = it.length();
@@ -1025,7 +1042,7 @@ int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t ofs, off_
   if (len == 0 && s->obj_size != 0) {
     return 0;
   }
-  if (m_parquet_type) {
+  if (m_parquet_type) { // the bufferlist is sent back upon a range request
     return parquet_processing(bl, ofs, len);
   }
   if (m_json_type) {