Skip to content

Commit 5e21bf2

Browse files
committed
Merge pull request #581 from sodabrew/streaming_procedure_specs
Fixes for streaming and stored procedures with multiple result sets
2 parents 41ab20a + ba5c5a0 commit 5e21bf2

File tree

4 files changed

+60
-57
lines changed

4 files changed

+60
-57
lines changed

ext/mysql2/result.c

Lines changed: 37 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
446446
mysql2_result_wrapper * wrapper;
447447
unsigned long i;
448448
const char * errstr;
449-
int symbolizeKeys = 0, asArray = 0, castBool = 0, cacheRows = 1, cast = 1, streaming = 0;
449+
int symbolizeKeys, asArray, castBool, cacheRows, cast;
450450
MYSQL_FIELD * fields = NULL;
451451

452452
GetMysql2Result(self, wrapper);
@@ -459,31 +459,13 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
459459
opts = defaults;
460460
}
461461

462-
if (rb_hash_aref(opts, sym_symbolize_keys) == Qtrue) {
463-
symbolizeKeys = 1;
464-
}
465-
466-
if (rb_hash_aref(opts, sym_as) == sym_array) {
467-
asArray = 1;
468-
}
469-
470-
if (rb_hash_aref(opts, sym_cast_booleans) == Qtrue) {
471-
castBool = 1;
472-
}
473-
474-
if (rb_hash_aref(opts, sym_cache_rows) == Qfalse) {
475-
cacheRows = 0;
476-
}
477-
478-
if (rb_hash_aref(opts, sym_cast) == Qfalse) {
479-
cast = 0;
480-
}
462+
symbolizeKeys = RTEST(rb_hash_aref(opts, sym_symbolize_keys));
463+
asArray = rb_hash_aref(opts, sym_as) == sym_array;
464+
castBool = RTEST(rb_hash_aref(opts, sym_cast_booleans));
465+
cacheRows = RTEST(rb_hash_aref(opts, sym_cache_rows));
466+
cast = RTEST(rb_hash_aref(opts, sym_cast));
481467

482-
if (rb_hash_aref(opts, sym_stream) == Qtrue) {
483-
streaming = 1;
484-
}
485-
486-
if (streaming && cacheRows) {
468+
if (wrapper->is_streaming && cacheRows) {
487469
rb_warn("cacheRows is ignored if streaming is true");
488470
}
489471

@@ -508,40 +490,28 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
508490
app_timezone = Qnil;
509491
}
510492

511-
if (wrapper->lastRowProcessed == 0) {
512-
if (streaming) {
513-
/* We can't get number of rows if we're streaming, */
514-
/* until we've finished fetching all rows */
515-
wrapper->numberOfRows = 0;
493+
if (wrapper->is_streaming) {
494+
/* When streaming, we will only yield rows, not return them. */
495+
if (wrapper->rows == Qnil) {
516496
wrapper->rows = rb_ary_new();
517-
} else {
518-
wrapper->numberOfRows = mysql_num_rows(wrapper->result);
519-
if (wrapper->numberOfRows == 0) {
520-
wrapper->rows = rb_ary_new();
521-
return wrapper->rows;
522-
}
523-
wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
524497
}
525-
}
526498

527-
if (streaming) {
528499
if (!wrapper->streamingComplete) {
529500
VALUE row;
530501

531502
fields = mysql_fetch_fields(wrapper->result);
532503

533504
do {
534505
row = rb_mysql_result_fetch_row(self, db_timezone, app_timezone, symbolizeKeys, asArray, castBool, cast, fields);
535-
536-
if (block != Qnil && row != Qnil) {
537-
rb_yield(row);
538-
wrapper->lastRowProcessed++;
506+
if (row != Qnil) {
507+
wrapper->numberOfRows++;
508+
if (block != Qnil) {
509+
rb_yield(row);
510+
}
539511
}
540512
} while(row != Qnil);
541513

542514
rb_mysql_result_free_result(wrapper);
543-
544-
wrapper->numberOfRows = wrapper->lastRowProcessed;
545515
wrapper->streamingComplete = 1;
546516

547517
// Check for errors, the connection might have gone out from under us
@@ -554,6 +524,15 @@ static VALUE rb_mysql_result_each(int argc, VALUE * argv, VALUE self) {
554524
rb_raise(cMysql2Error, "You have already fetched all the rows for this query and streaming is true. (to reiterate you must requery).");
555525
}
556526
} else {
527+
if (wrapper->lastRowProcessed == 0) {
528+
wrapper->numberOfRows = mysql_num_rows(wrapper->result);
529+
if (wrapper->numberOfRows == 0) {
530+
wrapper->rows = rb_ary_new();
531+
return wrapper->rows;
532+
}
533+
wrapper->rows = rb_ary_new2(wrapper->numberOfRows);
534+
}
535+
557536
if (cacheRows && wrapper->lastRowProcessed == wrapper->numberOfRows) {
558537
/* we've already read the entire dataset from the C result into our */
559538
/* internal array. Lets hand that over to the user since it's ready to go */
@@ -601,14 +580,17 @@ static VALUE rb_mysql_result_count(VALUE self) {
601580
mysql2_result_wrapper *wrapper;
602581

603582
GetMysql2Result(self, wrapper);
583+
if (wrapper->is_streaming) {
584+
/* This is an unsigned long per result.h */
585+
return ULONG2NUM(wrapper->numberOfRows);
586+
}
587+
604588
if (wrapper->resultFreed) {
605-
if (wrapper->streamingComplete){
606-
return LONG2NUM(wrapper->numberOfRows);
607-
} else {
608-
return LONG2NUM(RARRAY_LEN(wrapper->rows));
609-
}
589+
/* Ruby arrays have platform signed long length */
590+
return LONG2NUM(RARRAY_LEN(wrapper->rows));
610591
} else {
611-
return INT2FIX(mysql_num_rows(wrapper->result));
592+
/* MySQL returns an unsigned 64-bit long here */
593+
return ULL2NUM(mysql_num_rows(wrapper->result));
612594
}
613595
}
614596

@@ -634,6 +616,10 @@ VALUE rb_mysql_result_to_obj(VALUE client, VALUE encoding, VALUE options, MYSQL_
634616

635617
rb_iv_set(obj, "@query_options", options);
636618

619+
/* Options that cannot be changed in results.each(...) { |row| }
620+
* should be processed here. */
621+
wrapper->is_streaming = (rb_hash_aref(options, sym_stream) == Qtrue ? 1 : 0);
622+
637623
return obj;
638624
}
639625

ext/mysql2/result.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ typedef struct {
1212
unsigned int numberOfFields;
1313
unsigned long numberOfRows;
1414
unsigned long lastRowProcessed;
15+
char is_streaming;
1516
char streamingComplete;
1617
char resultFreed;
1718
MYSQL_RES *result;

spec/mysql2/client_spec.rb

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,21 @@ def connect *args
614614

615615
@multi_client.more_results?.should be_false
616616
end
617+
618+
it "#more_results? should work with stored procedures" do
619+
@multi_client.query("DROP PROCEDURE IF EXISTS test_proc")
620+
@multi_client.query("CREATE PROCEDURE test_proc() BEGIN SELECT 1 AS 'set_1'; SELECT 2 AS 'set_2'; END")
621+
@multi_client.query("CALL test_proc()").first.should eql({ 'set_1' => 1 })
622+
@multi_client.more_results?.should be_true
623+
624+
@multi_client.next_result
625+
@multi_client.store_result.first.should eql({ 'set_2' => 2 })
626+
627+
@multi_client.next_result
628+
@multi_client.store_result.should be_nil # this is the result from CALL itself
629+
630+
@multi_client.more_results?.should be_false
631+
end
617632
end
618633
end
619634

spec/mysql2/result_spec.rb

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,18 +107,19 @@
107107

108108
context "streaming" do
109109
it "should maintain a count while streaming" do
110-
result = @client.query('SELECT 1')
111-
112-
result.count.should eql(1)
110+
result = @client.query('SELECT 1', :stream => true, :cache_rows => false)
111+
result.count.should eql(0)
113112
result.each.to_a
114113
result.count.should eql(1)
115114
end
116115

117-
it "should set the actual count of rows after streaming" do
118-
result = @client.query("SELECT * FROM mysql2_test", :stream => true, :cache_rows => false)
116+
it "should retain the count when mixing first and each" do
117+
result = @client.query("SELECT 1 UNION SELECT 2", :stream => true, :cache_rows => false)
119118
result.count.should eql(0)
120-
result.each {|r| }
119+
result.first
121120
result.count.should eql(1)
121+
result.each.to_a
122+
result.count.should eql(2)
122123
end
123124

124125
it "should not yield nil at the end of streaming" do

0 commit comments

Comments
 (0)