@@ -84,9 +84,10 @@ PartialResultSetSource::PartialResultSetSource(
8484 std::unique_ptr<PartialResultSetReader> reader)
8585 : options_(internal::CurrentOptions()),
8686 reader_ (std::move(reader)),
87- values_(absl::make_optional<
87+ values_(absl::make_optional(
88+ google::protobuf::Arena::Create<
8889 google::protobuf::RepeatedPtrField<google::protobuf::Value>>(
89- &arena_)) {
90+ &arena_) )) {
9091 if (options_.has <spanner::StreamingResumabilityBufferSizeOption>()) {
9192 values_space_limit_ =
9293 options_.get <spanner::StreamingResumabilityBufferSizeOption>();
@@ -119,19 +120,24 @@ StatusOr<spanner::Row> PartialResultSetSource::NextRow() {
119120 // There may be complete or partial rows in values_ that haven't been
120121 // returned to the clients yet. Let's copy it over before we reset
121122 // the arena.
123+ auto * values = *values_;
122124 int partial_size =
123- static_cast <int >(values_ ->size () - rows_returned_ * columns_->size ());
125+ static_cast <int >(values ->size () - rows_returned_ * columns_->size ());
124126 absl::FixedArray<google::protobuf::Value*> tmp (partial_size);
125127 if (!tmp.empty ()) {
126- values_ ->ExtractSubrange (values_ ->size () - partial_size, partial_size,
127- tmp.data ());
128+ values ->ExtractSubrange (values ->size () - partial_size, partial_size,
129+ tmp.data ());
128130 }
129131 values_.reset ();
130132 values_space_.Clear ();
131133 arena_.Reset ();
132- values_.emplace (&arena_);
134+ values_.emplace (
135+ google::protobuf::Arena::Create<
136+ google::protobuf::RepeatedPtrField<google::protobuf::Value>>(
137+ &arena_));
138+ values = *values_;
133139 for (auto * elem : tmp) {
134- values_ ->Add (std::move (*elem));
140+ values ->Add (std::move (*elem));
135141 delete elem;
136142 }
137143 rows_returned_ = 0 ;
@@ -142,7 +148,7 @@ StatusOr<spanner::Row> PartialResultSetSource::NextRow() {
142148 auto status = ReadFromStream ();
143149 if (!status.ok ()) return status;
144150 }
145- auto value_it = values_->begin () + rows_returned_ * columns_->size ();
151+ auto value_it = (* values_) ->begin () + rows_returned_ * columns_->size ();
146152 ++rows_returned_;
147153 --usable_rows_;
148154 std::vector<spanner::Value> values;
@@ -159,6 +165,7 @@ Status PartialResultSetSource::ReadFromStream() {
159165 return internal::InternalError (" PartialResultSetSource state error" ,
160166 GCP_ERROR_INFO ());
161167 }
168+ auto * values = *values_;
162169 auto * raw_result_set =
163170 google::protobuf::Arena::Create<google::spanner::v1::PartialResultSet>(
164171 &arena_);
@@ -169,7 +176,7 @@ Status PartialResultSetSource::ReadFromStream() {
169176 }
170177 if (state_ == kEndOfStream ) {
171178 // If we have no buffered data, we're done.
172- if (values_ ->empty ()) {
179+ if (values ->empty ()) {
173180 state_ = kFinished ;
174181 return reader_->Finish ();
175182 }
@@ -219,30 +226,30 @@ Status PartialResultSetSource::ReadFromStream() {
219226 GCP_ERROR_INFO ());
220227 }
221228 values_back_incomplete_ = false ;
222- values_ ->Clear ();
229+ values ->Clear ();
223230 values_space_.Clear ();
224231 }
225232
226233 // If the final value in the previous `PartialResultSet` was incomplete,
227234 // it must be combined with the first value from the new set. And then
228- // we move everything remaining from the new set to the end of `values_ `.
235+ // we move everything remaining from the new set to the end of `values `.
229236 if (!result_set.result .values ().empty ()) {
230237 auto & new_values = *result_set.result .mutable_values ();
231238 int append_start = 0 ;
232239 if (values_back_incomplete_) {
233240 auto & first = *new_values.Mutable (append_start++);
234- auto status = MergeChunk (*values_ ->rbegin (), std::move (first));
241+ auto status = MergeChunk (*values ->rbegin (), std::move (first));
235242 if (!status.ok ()) return status;
236243 }
237- ExtractSubrangeAndAppend (new_values, append_start, *values_ );
244+ ExtractSubrangeAndAppend (new_values, append_start, *values );
238245 values_back_incomplete_ = result_set.result .chunked_value ();
239246 }
240247
241248 // Deliver whatever rows we can muster.
242- auto const n_values = values_ ->size () - (values_back_incomplete_ ? 1 : 0 );
249+ auto const n_values = values ->size () - (values_back_incomplete_ ? 1 : 0 );
243250 auto const n_columns = columns_ ? static_cast <int >(columns_->size ()) : 0 ;
244251 auto n_rows = n_columns ? n_values / n_columns : 0 ;
245- if (n_columns == 0 && !values_ ->empty ()) {
252+ if (n_columns == 0 && !values ->empty ()) {
246253 return internal::InternalError (
247254 " PartialResultSetSource metadata is missing row type" ,
248255 GCP_ERROR_INFO ());
@@ -251,11 +258,11 @@ Status PartialResultSetSource::ReadFromStream() {
251258 // If we didn't receive a resume token, and have not exceeded our buffer
252259 // limit, then we choose to `Read()` again so as to maintain resumability.
253260 if (result_set.result .resume_token ().empty () && values_space_limit_ > 0 ) {
254- for (auto it = values_ ->begin () + values_space_.index ; it != values_ ->end ();
261+ for (auto it = values ->begin () + values_space_.index ; it != values ->end ();
255262 ++it) {
256263 values_space_.space_used += it->SpaceUsedLong ();
257264 }
258- values_space_.index = values_ ->size ();
265+ values_space_.index = values ->size ();
259266 if (values_space_.space_used < values_space_limit_) {
260267 return {}; // OK
261268 }
@@ -266,7 +273,7 @@ Status PartialResultSetSource::ReadFromStream() {
266273 // Otherwise, if we deliver anything at all, we must disable resumability.
267274 if (!result_set.result .resume_token ().empty ()) {
268275 resume_token_ = result_set.result .resume_token ();
269- if (n_rows * n_columns != values_ ->size ()) {
276+ if (n_rows * n_columns != values ->size ()) {
270277 if (state_ != kEndOfStream ) {
271278 return internal::InternalError (
272279 " PartialResultSetSource reader produced a resume token"
0 commit comments