Skip to content

Commit 523d833

Browse files
claudevdm and Claude authored
Cache deterministic types (#36032)
* Cache deterministically encoded types.
* Cache pickled deterministic type encodings.
* Fix lint.

---------

Co-authored-by: Claude <cvandermerwe@google.com>
1 parent 9f3b160 commit 523d833

File tree

3 files changed: 22 additions (+22) and 5 deletions (-5)

3 files changed

+22
-5
lines changed

sdks/python/apache_beam/coders/coder_impl.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
9393
cdef encode_type(self, t, OutputStream stream)
9494
cdef decode_type(self, InputStream stream)
9595

96+
cdef dict _pickled_types
97+
9698
cdef dict _unpickled_types
9799

98100

sdks/python/apache_beam/coders/coder_impl.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -550,15 +550,19 @@ def encode_type_2_67_0(self, t, stream):
550550
"""
551551
Encode special type with <=2.67.0 compatibility.
552552
"""
553-
_verify_dill_compat()
554-
stream.write(dill.dumps(t), True)
553+
if t not in _pickled_types:
554+
_verify_dill_compat()
555+
_pickled_types[t] = dill.dumps(t)
556+
stream.write(_pickled_types[t], True)
555557

556558
def encode_type(self, t, stream):
557559
if self.force_use_dill:
558560
return self.encode_type_2_67_0(t, stream)
559-
bs = cloudpickle_pickler.dumps(
560-
t, config=cloudpickle_pickler.NO_DYNAMIC_CLASS_TRACKING_CONFIG)
561-
stream.write(bs, True)
561+
562+
if t not in _pickled_types:
563+
_pickled_types[t] = cloudpickle_pickler.dumps(
564+
t, config=cloudpickle_pickler.NO_DYNAMIC_CLASS_TRACKING_CONFIG)
565+
stream.write(_pickled_types[t], True)
562566

563567
def decode_type(self, stream):
564568
if self.force_use_dill:
@@ -620,6 +624,7 @@ def decode_from_stream(self, stream, nested):
620624
raise ValueError('Unknown type tag %x' % t)
621625

622626

627+
_pickled_types = {} # type: Dict[type, bytes]
623628
_unpickled_types = {} # type: Dict[bytes, type]
624629

625630

sdks/python/apache_beam/tools/coders_microbenchmark.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
4343
from apache_beam.coders import coder_impl
4444
from apache_beam.coders import coders
45+
from apache_beam.coders import coders_test_common
4546
from apache_beam.coders import row_coder
4647
from apache_beam.coders import typecoders
4748
from apache_beam.tools import utils
@@ -249,6 +250,10 @@ def row_coder_benchmark_factory(generate_fn):
249250
return coder_benchmark_factory(get_row_coder(generate_fn()), generate_fn)
250251

251252

253+
def importable_named_tuple():
254+
return [coders_test_common.MyTypedNamedTuple('a', i) for i in range(1000)]
255+
256+
252257
def run_coder_benchmarks(
253258
num_runs, input_size, seed, verbose, filter_regex='.*'):
254259
random.seed(seed)
@@ -310,6 +315,11 @@ def run_coder_benchmarks(
310315
batch_row_coder_benchmark_factory(nullable_row, True),
311316
batch_row_coder_benchmark_factory(diverse_row, False),
312317
batch_row_coder_benchmark_factory(diverse_row, True),
318+
coder_benchmark_factory(
319+
coders.IterableCoder(
320+
coders.FastPrimitivesCoder().as_deterministic_coder(
321+
step_label="step")),
322+
importable_named_tuple),
313323
]
314324

315325
suite = [

0 commit comments

Comments
 (0)