Skip to content

Commit 06eadd6

Browse files
committed
refactor of s3select response handler, adding continuation-response to parquet flow, refactor of error response
bug fix (init_success_response). s3select submodule fix for json-error-flow Signed-off-by: Gal Salomon <[email protected]>
1 parent 0e3c55c commit 06eadd6

File tree

3 files changed

+122
-80
lines changed

3 files changed

+122
-80
lines changed

src/rgw/rgw_s3select.cc

Lines changed: 87 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,13 @@ void aws_response_handler::push_header(const char* header_name, const char* head
4646
char x;
4747
short s;
4848
x = char(strlen(header_name));
49-
m_buff_header.append(&x, sizeof(x));
50-
m_buff_header.append(header_name);
49+
get_buffer()->append(&x, sizeof(x));
50+
get_buffer()->append(header_name);
5151
x = char(7);
52-
m_buff_header.append(&x, sizeof(x));
52+
get_buffer()->append(&x, sizeof(x));
5353
s = htons(uint16_t(strlen(header_value)));
54-
m_buff_header.append(reinterpret_cast<char*>(&s), sizeof(s));
55-
m_buff_header.append(header_value);
54+
get_buffer()->append(reinterpret_cast<char*>(&s), sizeof(s));
55+
get_buffer()->append(header_value);
5656
}
5757

5858
#define IDX( x ) static_cast<int>( x )
@@ -67,7 +67,7 @@ int aws_response_handler::create_header_records()
6767
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::OCTET_STREAM)]);
6868
//3
6969
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
70-
return m_buff_header.size();
70+
return get_buffer()->size();
7171
}
7272

7373
int aws_response_handler::create_header_continuation()
@@ -77,7 +77,7 @@ int aws_response_handler::create_header_continuation()
7777
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::CONT)]);
7878
//2
7979
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
80-
return m_buff_header.size();
80+
return get_buffer()->size();
8181
}
8282

8383
int aws_response_handler::create_header_progress()
@@ -89,7 +89,7 @@ int aws_response_handler::create_header_progress()
8989
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
9090
//3
9191
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
92-
return m_buff_header.size();
92+
return get_buffer()->size();
9393
}
9494

9595
int aws_response_handler::create_header_stats()
@@ -101,7 +101,7 @@ int aws_response_handler::create_header_stats()
101101
push_header(header_name_str[IDX(header_name_En::CONTENT_TYPE)], header_value_str[IDX(header_value_En::XML)]);
102102
//3
103103
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
104-
return m_buff_header.size();
104+
return get_buffer()->size();
105105
}
106106

107107
int aws_response_handler::create_header_end()
@@ -111,7 +111,7 @@ int aws_response_handler::create_header_end()
111111
push_header(header_name_str[IDX(header_name_En::EVENT_TYPE)], header_value_str[IDX(header_value_En::END)]);
112112
//2
113113
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::EVENT)]);
114-
return m_buff_header.size();
114+
return get_buffer()->size();
115115
}
116116

117117
int aws_response_handler::create_error_header_records(const char* error_message)
@@ -124,35 +124,38 @@ int aws_response_handler::create_error_header_records(const char* error_message)
124124
push_header(header_name_str[IDX(header_name_En::ERROR_MESSAGE)], error_message);
125125
//3
126126
push_header(header_name_str[IDX(header_name_En::MESSAGE_TYPE)], header_value_str[IDX(header_value_En::ERROR_TYPE)]);
127-
return m_buff_header.size();
127+
return get_buffer()->size();
128128
}
129129

130-
int aws_response_handler::create_message(u_int32_t header_len)
130+
int aws_response_handler::create_message(u_int32_t header_len,std::string *msg_string = nullptr)
131131
{
132132
//message description(AWS):
133133
//[total-byte-length:4][header-byte-length:4][crc:4][headers:variable-length][payload:variable-length][crc:4]
134134
//s3select result is produced into sql_result, the sql_result is also the response-message, thus the attach headers and CRC
135135
//are created later to the produced SQL result, and actually wrapping the payload.
136136
auto push_encode_int = [&](u_int32_t s, int pos) {
137137
u_int32_t x = htonl(s);
138-
sql_result.replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
138+
msg_string->replace(pos, sizeof(x), reinterpret_cast<char*>(&x), sizeof(x));
139139
};
140+
141+
msg_string = (msg_string == nullptr) ? &sql_result : msg_string;
142+
140143
u_int32_t total_byte_len = 0;
141144
u_int32_t preload_crc = 0;
142145
u_int32_t message_crc = 0;
143-
total_byte_len = sql_result.size() + 4; //the total is greater in 4 bytes than current size
146+
total_byte_len = msg_string->size() + 4; //the total is greater in 4 bytes than current size
144147
push_encode_int(total_byte_len, 0);
145148
push_encode_int(header_len, 4);
146149
crc32.reset();
147-
crc32 = std::for_each(sql_result.data(), sql_result.data() + 8, crc32); //crc for starting 8 bytes
150+
crc32 = std::for_each(msg_string->data(), msg_string->data() + 8, crc32); //crc for starting 8 bytes
148151
preload_crc = crc32();
149152
push_encode_int(preload_crc, 8);
150153
crc32.reset();
151-
crc32 = std::for_each(sql_result.begin(), sql_result.end(), crc32); //crc for payload + checksum
154+
crc32 = std::for_each(msg_string->begin(), msg_string->end(), crc32); //crc for payload + checksum
152155
message_crc = crc32();
153156
u_int32_t x = htonl(message_crc);
154-
sql_result.append(reinterpret_cast<char*>(&x), sizeof(x));
155-
return sql_result.size();
157+
msg_string->append(reinterpret_cast<char*>(&x), sizeof(x));
158+
return msg_string->size();
156159
}
157160

158161
void aws_response_handler::init_response()
@@ -163,71 +166,76 @@ void aws_response_handler::init_response()
163166

164167
void aws_response_handler::init_success_response()
165168
{
166-
m_buff_header.clear();
167-
header_size = create_header_records();
168-
sql_result.append(m_buff_header.c_str(), header_size);
169-
#ifdef PAYLOAD_TAG
170-
sql_result.append(PAYLOAD_LINE);
171-
#endif
169+
get_buffer()->clear();
170+
m_success_header_size = create_header_records();
171+
sql_result.append(get_buffer()->c_str(), m_success_header_size);
172172
}
173173

174174
void aws_response_handler::send_continuation_response()
175175
{
176-
sql_result.resize(header_crc_size, '\0');
177-
m_buff_header.clear();
176+
set_continue_buffer();
177+
continue_result.resize(header_crc_size, '\0');
178+
get_buffer()->clear();
178179
header_size = create_header_continuation();
179-
sql_result.append(m_buff_header.c_str(), header_size);
180-
int buff_len = create_message(header_size);
181-
s->formatter->write_bin_data(sql_result.data(), buff_len);
180+
continue_result.append(get_buffer()->c_str(), header_size);
181+
int buff_len = create_message(header_size,&continue_result);
182+
s->formatter->write_bin_data(continue_result.data(), buff_len);
182183
rgw_flush_formatter_and_reset(s, s->formatter);
184+
get_buffer()->clear();
185+
set_main_buffer();
183186
}
184187

185188
void aws_response_handler::init_progress_response()
186189
{
187190
sql_result.resize(header_crc_size, '\0');
188-
m_buff_header.clear();
191+
get_buffer()->clear();
189192
header_size = create_header_progress();
190-
sql_result.append(m_buff_header.c_str(), header_size);
193+
sql_result.append(get_buffer()->c_str(), header_size);
191194
}
192195

193196
void aws_response_handler::init_stats_response()
194197
{
195198
sql_result.resize(header_crc_size, '\0');
196-
m_buff_header.clear();
199+
get_buffer()->clear();
197200
header_size = create_header_stats();
198-
sql_result.append(m_buff_header.c_str(), header_size);
201+
sql_result.append(get_buffer()->c_str(), header_size);
199202
}
200203

201204
void aws_response_handler::init_end_response()
202205
{
203206
sql_result.resize(header_crc_size, '\0');
204-
m_buff_header.clear();
207+
get_buffer()->clear();
205208
header_size = create_header_end();
206-
sql_result.append(m_buff_header.c_str(), header_size);
209+
sql_result.append(get_buffer()->c_str(), header_size);
207210
int buff_len = create_message(header_size);
208211
s->formatter->write_bin_data(sql_result.data(), buff_len);
209212
rgw_flush_formatter_and_reset(s, s->formatter);
210213
}
211214

212-
void aws_response_handler::init_error_response(const char* error_message)
215+
void aws_response_handler::send_error_response(const char* error_message)
213216
{
214-
//currently not in use. the headers in the case of error, are not extracted by AWS-cli.
215-
m_buff_header.clear();
217+
//currently not in use. need to change the s3-test, this error-response raises a boto3 exception
218+
error_result.resize(header_crc_size, '\0');
219+
get_buffer()->clear();
216220
header_size = create_error_header_records(error_message);
217-
sql_result.append(m_buff_header.c_str(), header_size);
221+
error_result.append(get_buffer()->c_str(), header_size);
222+
223+
int buff_len = create_message(header_size,&error_result);
224+
s->formatter->write_bin_data(error_result.data(), buff_len);
225+
rgw_flush_formatter_and_reset(s, s->formatter);
218226
}
219227

220228
void aws_response_handler::send_success_response()
221229
{
222230
#ifdef PAYLOAD_TAG
223231
sql_result.append(END_PAYLOAD_LINE);
224232
#endif
225-
int buff_len = create_message(header_size);
233+
int buff_len = create_message(m_success_header_size);
226234
s->formatter->write_bin_data(sql_result.data(), buff_len);
227235
rgw_flush_formatter_and_reset(s, s->formatter);
228236
}
229237

230-
void aws_response_handler::send_error_response(const char* error_code,
238+
void aws_response_handler::send_error_response_rgw_formatter(const char* error_code,
231239
const char* error_message,
232240
const char* resource_id)
233241
{
@@ -265,7 +273,6 @@ void aws_response_handler::send_stats_response()
265273
}
266274

267275
RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
268-
m_buff_header(std::make_unique<char[]>(1000)),
269276
m_scan_range_ind(false),
270277
m_start_scan_sz(0),
271278
m_end_scan_sz(0),
@@ -301,6 +308,11 @@ RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
301308
m_aws_response_handler.send_success_response();
302309
return 0;
303310
};
311+
fp_s3select_continue = [this](std::string& result) {
312+
fp_chunked_transfer_encoding();
313+
m_aws_response_handler.send_continuation_response();
314+
return 0;
315+
};
304316

305317
fp_debug_mesg = [&](const char* mesg){
306318
ldpp_dout(this, 10) << mesg << dendl;
@@ -371,9 +383,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
371383
int status = 0;
372384
uint32_t length_before_processing, length_post_processing;
373385
csv_object::csv_defintions csv;
374-
const char* s3select_syntax_error = "s3select-Syntax-Error";
375-
const char* s3select_resource_id = "resource-id";
376-
const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
377386

378387
s3select_syntax.parse_query(query);
379388
if (m_row_delimiter.size()) {
@@ -410,14 +419,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
410419
} else if(m_header_info.compare("USE")==0) {
411420
csv.use_header_info=true;
412421
}
413-
m_s3_csv_object.set_external_debug_system(fp_debug_mesg);
414-
m_s3_csv_object.set_result_formatters(fp_s3select_result_format,fp_result_header_format);
422+
415423
m_s3_csv_object.set_csv_query(&s3select_syntax, csv);
424+
425+
m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
426+
fp_s3select_result_format,
427+
fp_result_header_format,
428+
fp_debug_mesg);
429+
416430
if (s3select_syntax.get_error_description().empty() == false) {
417431
//error-flow (syntax-error)
418-
m_aws_response_handler.send_error_response(s3select_syntax_error,
419-
s3select_syntax.get_error_description().c_str(),
420-
s3select_resource_id);
432+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,s3select_syntax.get_error_description().c_str(),s3select_resource_id);
421433
ldpp_dout(this, 10) << "s3-select query: failed to prase the following query {" << query << "}" << dendl;
422434
ldpp_dout(this, 10) << "s3-select query: syntax-error {" << s3select_syntax.get_error_description() << "}" << dendl;
423435
return -1;
@@ -434,18 +446,15 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_csv(const char* query, const char*
434446

435447
if (status < 0) {
436448
//error flow(processing-time)
437-
m_aws_response_handler.send_error_response(s3select_processTime_error,
438-
m_s3_csv_object.get_error_description().c_str(),
439-
s3select_resource_id);
449+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,m_s3_csv_object.get_error_description().c_str(),s3select_resource_id);
450+
440451
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_csv_object.get_error_description() << "}" << dendl;
441452
return -1;
442453
}
443454

444455
}
445456
if ((length_post_processing-length_before_processing) != 0) {
446457
ldpp_dout(this, 10) << "s3-select: sql-result-size = " << m_aws_response_handler.get_sql_result().size() << dendl;
447-
} else {
448-
m_aws_response_handler.send_continuation_response();
449458
}
450459
ldpp_dout(this, 10) << "s3-select: complete chunk processing : chunk length = " << input_length << dendl;
451460
if (enable_progress == true) {
@@ -463,7 +472,12 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
463472
if (!m_s3_parquet_object.is_set()) {
464473
//parsing the SQL statement.
465474
s3select_syntax.parse_query(m_sql_query.c_str());
466-
//m_s3_parquet_object.set_external_debug_system(fp_debug_mesg);
475+
476+
m_s3_parquet_object.set_external_system_functions(fp_s3select_continue,
477+
fp_s3select_result_format,
478+
fp_result_header_format,
479+
fp_debug_mesg);
480+
467481
try {
468482
//at this stage the Parquet-processing requires for the meta-data that reside on Parquet object
469483
m_s3_parquet_object.set_parquet_object(std::string("s3object"), &s3select_syntax, &m_rgw_api);
@@ -477,19 +491,21 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_parquet(const char* query)
477491
}
478492
if (s3select_syntax.get_error_description().empty() == false) {
479493
//the SQL statement failed the syntax parser
480-
fp_result_header_format(m_aws_response_handler.get_sql_result());
481-
m_aws_response_handler.get_sql_result().append(s3select_syntax.get_error_description().data());
482-
fp_s3select_result_format(m_aws_response_handler.get_sql_result());
494+
fp_chunked_transfer_encoding();
495+
m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
496+
483497
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
484498
status = -1;
485499
} else {
486500
fp_result_header_format(m_aws_response_handler.get_sql_result());
487501
//at this stage the Parquet-processing "takes control", it keep calling to s3-range-request according to the SQL statement.
488-
status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result(), fp_s3select_result_format, fp_result_header_format);
502+
status = m_s3_parquet_object.run_s3select_on_object(m_aws_response_handler.get_sql_result());
489503
if (status < 0) {
490-
m_aws_response_handler.get_sql_result().append(m_s3_parquet_object.get_error_description());
491-
fp_s3select_result_format(m_aws_response_handler.get_sql_result());
492-
ldout(s->cct, 10) << "S3select: failure while execution" << m_s3_parquet_object.get_error_description() << dendl;
504+
505+
fp_chunked_transfer_encoding();
506+
m_aws_response_handler.send_error_response(m_s3_parquet_object.get_error_description().c_str());
507+
508+
return -1;
493509
}
494510
}
495511
#endif
@@ -500,17 +516,17 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
500516
{
501517
int status = 0;
502518

503-
const char* s3select_processTime_error = "s3select-ProcessingTime-Error";
504-
const char* s3select_syntax_error = "s3select-Syntax-Error";
505-
const char* s3select_resource_id = "resourcse-id";
506-
const char* s3select_json_error = "json-Format-Error";
519+
m_s3_csv_object.set_external_system_functions(fp_s3select_continue,
520+
fp_s3select_result_format,
521+
fp_result_header_format,
522+
fp_debug_mesg);
507523

508524
m_aws_response_handler.init_response();
509525

510526
//the JSON data-type should be(currently) only DOCUMENT
511527
if (m_json_datatype.compare("DOCUMENT") != 0) {
512528
const char* s3select_json_error_msg = "s3-select query: wrong json dataType should use DOCUMENT; ";
513-
m_aws_response_handler.send_error_response(s3select_json_error,
529+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_json_error,
514530
s3select_json_error_msg,
515531
s3select_resource_id);
516532
ldpp_dout(this, 10) << s3select_json_error_msg << dendl;
@@ -521,7 +537,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
521537
s3select_syntax.parse_query(m_sql_query.c_str());
522538
if (s3select_syntax.get_error_description().empty() == false) {
523539
//SQL statement is wrong(syntax).
524-
m_aws_response_handler.send_error_response(s3select_syntax_error,
540+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_syntax_error,
525541
s3select_syntax.get_error_description().c_str(),
526542
s3select_resource_id);
527543
ldpp_dout(this, 10) << "s3-select query: failed to prase query; {" << s3select_syntax.get_error_description() << "}" << dendl;
@@ -543,7 +559,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
543559
} catch(base_s3select_exception& e) {
544560
ldpp_dout(this, 10) << "S3select: failed to process JSON object: " << e.what() << dendl;
545561
m_aws_response_handler.get_sql_result().append(e.what());
546-
m_aws_response_handler.send_error_response(s3select_processTime_error,
562+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
547563
e.what(),
548564
s3select_resource_id);
549565
return -EINVAL;
@@ -552,7 +568,7 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
552568
m_aws_response_handler.update_total_bytes_returned(length_post_processing - length_before_processing);
553569
if (status < 0) {
554570
//error flow(processing-time)
555-
m_aws_response_handler.send_error_response(s3select_processTime_error,
571+
m_aws_response_handler.send_error_response_rgw_formatter(s3select_processTime_error,
556572
m_s3_json_object.get_error_description().c_str(),
557573
s3select_resource_id);
558574
ldpp_dout(this, 10) << "s3-select query: failed to process query; {" << m_s3_json_object.get_error_description() << "}" << dendl;
@@ -562,8 +578,6 @@ int RGWSelectObj_ObjStore_S3::run_s3select_on_json(const char* query, const char
562578

563579
if (length_post_processing-length_before_processing != 0) {
564580
m_aws_response_handler.send_success_response();
565-
} else {
566-
m_aws_response_handler.send_continuation_response();
567581
}
568582
if (enable_progress == true) {
569583
m_aws_response_handler.init_progress_response();

0 commit comments

Comments
 (0)