Skip to content

Commit 3e35d4a

Browse files
authored
perf: improve streaming performance (#240)
* perf: improve streaming performance by using raw pbs * refactor: remove unused import Co-authored-by: larkee <[email protected]>
1 parent 434967e commit 3e35d4a

File tree

5 files changed

+191
-365
lines changed

5 files changed

+191
-365
lines changed

google/cloud/spanner_v1/_helpers.py

Lines changed: 33 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -161,41 +161,6 @@ def _make_list_value_pbs(values):
161161

162162

163163
# pylint: disable=too-many-branches
164-
def _parse_value(value, field_type):
165-
if value is None:
166-
return None
167-
if field_type.code == TypeCode.STRING:
168-
result = value
169-
elif field_type.code == TypeCode.BYTES:
170-
result = value.encode("utf8")
171-
elif field_type.code == TypeCode.BOOL:
172-
result = value
173-
elif field_type.code == TypeCode.INT64:
174-
result = int(value)
175-
elif field_type.code == TypeCode.FLOAT64:
176-
if isinstance(value, str):
177-
result = float(value)
178-
else:
179-
result = value
180-
elif field_type.code == TypeCode.DATE:
181-
result = _date_from_iso8601_date(value)
182-
elif field_type.code == TypeCode.TIMESTAMP:
183-
DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
184-
result = DatetimeWithNanoseconds.from_rfc3339(value)
185-
elif field_type.code == TypeCode.ARRAY:
186-
result = [_parse_value(item, field_type.array_element_type) for item in value]
187-
elif field_type.code == TypeCode.STRUCT:
188-
result = [
189-
_parse_value(item, field_type.struct_type.fields[i].type_)
190-
for (i, item) in enumerate(value)
191-
]
192-
elif field_type.code == TypeCode.NUMERIC:
193-
result = decimal.Decimal(value)
194-
else:
195-
raise ValueError("Unknown type: %s" % (field_type,))
196-
return result
197-
198-
199164
def _parse_value_pb(value_pb, field_type):
200165
"""Convert a Value protobuf to cell data.
201166
@@ -209,17 +174,41 @@ def _parse_value_pb(value_pb, field_type):
209174
:returns: value extracted from value_pb
210175
:raises ValueError: if unknown type is passed
211176
"""
177+
type_code = field_type.code
212178
if value_pb.HasField("null_value"):
213179
return None
214-
if value_pb.HasField("string_value"):
215-
return _parse_value(value_pb.string_value, field_type)
216-
if value_pb.HasField("bool_value"):
217-
return _parse_value(value_pb.bool_value, field_type)
218-
if value_pb.HasField("number_value"):
219-
return _parse_value(value_pb.number_value, field_type)
220-
if value_pb.HasField("list_value"):
221-
return _parse_value(value_pb.list_value, field_type)
222-
raise ValueError("No value set in Value: %s" % (value_pb,))
180+
if type_code == TypeCode.STRING:
181+
return value_pb.string_value
182+
elif type_code == TypeCode.BYTES:
183+
return value_pb.string_value.encode("utf8")
184+
elif type_code == TypeCode.BOOL:
185+
return value_pb.bool_value
186+
elif type_code == TypeCode.INT64:
187+
return int(value_pb.string_value)
188+
elif type_code == TypeCode.FLOAT64:
189+
if value_pb.HasField("string_value"):
190+
return float(value_pb.string_value)
191+
else:
192+
return value_pb.number_value
193+
elif type_code == TypeCode.DATE:
194+
return _date_from_iso8601_date(value_pb.string_value)
195+
elif type_code == TypeCode.TIMESTAMP:
196+
DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
197+
return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
198+
elif type_code == TypeCode.ARRAY:
199+
return [
200+
_parse_value_pb(item_pb, field_type.array_element_type)
201+
for item_pb in value_pb.list_value.values
202+
]
203+
elif type_code == TypeCode.STRUCT:
204+
return [
205+
_parse_value_pb(item_pb, field_type.struct_type.fields[i].type_)
206+
for (i, item_pb) in enumerate(value_pb.list_value.values)
207+
]
208+
elif field_type.code == TypeCode.NUMERIC:
209+
return decimal.Decimal(value_pb.string_value)
210+
else:
211+
raise ValueError("Unknown type: %s" % (field_type,))
223212

224213

225214
# pylint: enable=too-many-branches

google/cloud/spanner_v1/streamed.py

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
"""Wrapper for streaming results."""
1616

17+
from google.protobuf.struct_pb2 import ListValue
18+
from google.protobuf.struct_pb2 import Value
1719
from google.cloud import exceptions
20+
from google.cloud.spanner_v1 import PartialResultSet
1821
from google.cloud.spanner_v1 import TypeCode
1922
import six
2023

2124
# pylint: disable=ungrouped-imports
22-
from google.cloud.spanner_v1._helpers import _parse_value
25+
from google.cloud.spanner_v1._helpers import _parse_value_pb
2326

2427
# pylint: enable=ungrouped-imports
2528

@@ -88,29 +91,33 @@ def _merge_chunk(self, value):
8891
field = self.fields[current_column]
8992
merged = _merge_by_type(self._pending_chunk, value, field.type_)
9093
self._pending_chunk = None
91-
return _parse_value(merged, field.type_)
94+
return merged
9295

9396
def _merge_values(self, values):
9497
"""Merge values into rows.
9598
9699
:type values: list of :class:`~google.protobuf.struct_pb2.Value`
97100
:param values: non-chunked values from partial result set.
98101
"""
99-
width = len(self.fields)
102+
print(self.fields)
103+
field_types = [field.type_ for field in self.fields]
104+
width = len(field_types)
105+
index = len(self._current_row)
100106
for value in values:
101-
index = len(self._current_row)
102-
field = self.fields[index]
103-
self._current_row.append(_parse_value(value, field.type_))
104-
if len(self._current_row) == width:
107+
self._current_row.append(_parse_value_pb(value, field_types[index]))
108+
index += 1
109+
if index == width:
105110
self._rows.append(self._current_row)
106111
self._current_row = []
112+
index = 0
107113

108114
def _consume_next(self):
109115
"""Consume the next partial result set from the stream.
110116
111117
Parse the result set into new/existing rows in :attr:`_rows`
112118
"""
113119
response = six.next(self._response_iterator)
120+
response_pb = PartialResultSet.pb(response)
114121

115122
if self._metadata is None: # first response
116123
metadata = self._metadata = response.metadata
@@ -119,29 +126,27 @@ def _consume_next(self):
119126
if source is not None and source._transaction_id is None:
120127
source._transaction_id = metadata.transaction.id
121128

122-
if "stats" in response: # last response
129+
if response_pb.HasField("stats"): # last response
123130
self._stats = response.stats
124131

125-
values = list(response.values)
132+
values = list(response_pb.values)
126133
if self._pending_chunk is not None:
127134
values[0] = self._merge_chunk(values[0])
128135

129-
if response.chunked_value:
136+
if response_pb.chunked_value:
130137
self._pending_chunk = values.pop()
131138

132139
self._merge_values(values)
133140

134141
def __iter__(self):
135-
iter_rows, self._rows[:] = self._rows[:], ()
136142
while True:
137-
if not iter_rows:
138-
try:
139-
self._consume_next()
140-
except StopIteration:
141-
return
142-
iter_rows, self._rows[:] = self._rows[:], ()
143+
iter_rows, self._rows[:] = self._rows[:], ()
143144
while iter_rows:
144145
yield iter_rows.pop(0)
146+
try:
147+
self._consume_next()
148+
except StopIteration:
149+
return
145150

146151
def one(self):
147152
"""Return exactly one result, or raise an exception.
@@ -213,17 +218,23 @@ def _unmergeable(lhs, rhs, type_):
213218

214219
def _merge_float64(lhs, rhs, type_): # pylint: disable=unused-argument
215220
"""Helper for '_merge_by_type'."""
216-
if type(lhs) == str:
217-
return float(lhs + rhs)
218-
array_continuation = type(lhs) == float and type(rhs) == str and rhs == ""
221+
lhs_kind = lhs.WhichOneof("kind")
222+
if lhs_kind == "string_value":
223+
return Value(string_value=lhs.string_value + rhs.string_value)
224+
rhs_kind = rhs.WhichOneof("kind")
225+
array_continuation = (
226+
lhs_kind == "number_value"
227+
and rhs_kind == "string_value"
228+
and rhs.string_value == ""
229+
)
219230
if array_continuation:
220231
return lhs
221232
raise Unmergeable(lhs, rhs, type_)
222233

223234

224235
def _merge_string(lhs, rhs, type_): # pylint: disable=unused-argument
225236
"""Helper for '_merge_by_type'."""
226-
return str(lhs) + str(rhs)
237+
return Value(string_value=lhs.string_value + rhs.string_value)
227238

228239

229240
_UNMERGEABLE_TYPES = (TypeCode.BOOL,)
@@ -234,17 +245,17 @@ def _merge_array(lhs, rhs, type_):
234245
element_type = type_.array_element_type
235246
if element_type.code in _UNMERGEABLE_TYPES:
236247
# Individual values cannot be merged, just concatenate
237-
lhs.extend(rhs)
248+
lhs.list_value.values.extend(rhs.list_value.values)
238249
return lhs
250+
lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values)
239251

240252
# Sanity check: If either list is empty, short-circuit.
241253
# This is effectively a no-op.
242254
if not len(lhs) or not len(rhs):
243-
lhs.extend(rhs)
244-
return lhs
255+
return Value(list_value=ListValue(values=(lhs + rhs)))
245256

246257
first = rhs.pop(0)
247-
if first is None: # can't merge
258+
if first.HasField("null_value"): # can't merge
248259
lhs.append(first)
249260
else:
250261
last = lhs.pop()
@@ -255,23 +266,22 @@ def _merge_array(lhs, rhs, type_):
255266
lhs.append(first)
256267
else:
257268
lhs.append(merged)
258-
lhs.extend(rhs)
259-
return lhs
269+
return Value(list_value=ListValue(values=(lhs + rhs)))
260270

261271

262272
def _merge_struct(lhs, rhs, type_):
263273
"""Helper for '_merge_by_type'."""
264274
fields = type_.struct_type.fields
275+
lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values)
265276

266277
# Sanity check: If either list is empty, short-circuit.
267278
# This is effectively a no-op.
268279
if not len(lhs) or not len(rhs):
269-
lhs.extend(rhs)
270-
return lhs
280+
return Value(list_value=ListValue(values=(lhs + rhs)))
271281

272282
candidate_type = fields[len(lhs) - 1].type_
273283
first = rhs.pop(0)
274-
if first is None or candidate_type.code in _UNMERGEABLE_TYPES:
284+
if first.HasField("null_value") or candidate_type.code in _UNMERGEABLE_TYPES:
275285
lhs.append(first)
276286
else:
277287
last = lhs.pop()
@@ -282,8 +292,7 @@ def _merge_struct(lhs, rhs, type_):
282292
lhs.append(first)
283293
else:
284294
lhs.append(merged)
285-
lhs.extend(rhs)
286-
return lhs
295+
return Value(list_value=ListValue(values=lhs + rhs))
287296

288297

289298
_MERGE_BY_TYPE = {

0 commit comments

Comments
 (0)