Skip to content

Commit e8fab26

Browse files
claudevdmClaudetvalentyn
authored
Pass update compat through as_deterministic_coder and use cloudpickle for deterministic special types. (#35725)
* Pass update compat through as_deterministic_coder. * Coder changes. * Pass update compat through pipeline options. * Add tests. * asd * Fix test. * Trigger tests. * Undo disable tests. * Fix test. * Fix tests and lint. * Refactor. * Rebase and comments. * Lint fix. * Update the base message to include possible lower values for the flag. * Update sdks/python/apache_beam/coders/coders.py --------- Co-authored-by: Claude <[email protected]> Co-authored-by: tvalentyn <[email protected]>
1 parent 6bdfcb3 commit e8fab26

File tree

11 files changed

+363
-78
lines changed

11 files changed

+363
-78
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
{
2-
"https://github.com/apache/beam/pull/35951": "triggering sideinput test"
2+
"comment": "Modify this file in a trivial way to cause this test suite to run",
3+
"modification": 1
34
}

sdks/python/apache_beam/coders/coder_impl.pxd

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,15 @@ cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
8181
cdef CoderImpl iterable_coder_impl
8282
cdef object requires_deterministic_step_label
8383
cdef bint warn_deterministic_fallback
84+
cdef bint force_use_dill
8485

8586
@cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
8687
unicode_value=unicode)
8788
cpdef encode_to_stream(self, value, OutputStream stream, bint nested)
8889
@cython.locals(t=int)
8990
cpdef decode_from_stream(self, InputStream stream, bint nested)
9091
cdef encode_special_deterministic(self, value, OutputStream stream)
92+
cdef encode_type_2_67_0(self, t, OutputStream stream)
9193
cdef encode_type(self, t, OutputStream stream)
9294
cdef decode_type(self, InputStream stream)
9395

sdks/python/apache_beam/coders/coder_impl.py

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@
5050
from typing import Tuple
5151
from typing import Type
5252

53-
import dill
5453
import numpy as np
5554
from fastavro import parse_schema
5655
from fastavro import schemaless_reader
5756
from fastavro import schemaless_writer
5857

5958
from apache_beam.coders import observable
6059
from apache_beam.coders.avro_record import AvroRecord
60+
from apache_beam.internal import cloudpickle_pickler
6161
from apache_beam.typehints.schemas import named_tuple_from_schema
6262
from apache_beam.utils import proto_utils
6363
from apache_beam.utils import windowed_value
@@ -71,6 +71,11 @@
7171
except ImportError:
7272
dataclasses = None # type: ignore
7373

74+
try:
75+
import dill
76+
except ImportError:
77+
dill = None
78+
7479
if TYPE_CHECKING:
7580
import proto
7681
from apache_beam.transforms import userstate
@@ -354,14 +359,30 @@ def decode(self, value):
354359
_ITERABLE_LIKE_TYPES = set() # type: Set[Type]
355360

356361

362+
def _verify_dill_compat():
363+
base_error = (
364+
"This pipeline runs with the pipeline option "
365+
"--update_compatibility_version=2.67.0 or earlier. "
366+
"When running with this option on SDKs 2.68.0 or "
367+
"later, you must ensure dill==0.3.1.1 is installed.")
368+
if not dill:
369+
raise RuntimeError(base_error + ". Dill is not installed.")
370+
if dill.__version__ != "0.3.1.1":
371+
raise RuntimeError(base_error + f". Found dill version '{dill.__version__}")
372+
373+
357374
class FastPrimitivesCoderImpl(StreamCoderImpl):
358375
"""For internal use only; no backwards-compatibility guarantees."""
359376
def __init__(
360-
self, fallback_coder_impl, requires_deterministic_step_label=None):
377+
self,
378+
fallback_coder_impl,
379+
requires_deterministic_step_label=None,
380+
force_use_dill=False):
361381
self.fallback_coder_impl = fallback_coder_impl
362382
self.iterable_coder_impl = IterableCoderImpl(self)
363383
self.requires_deterministic_step_label = requires_deterministic_step_label
364384
self.warn_deterministic_fallback = True
385+
self.force_use_dill = force_use_dill
365386

366387
@staticmethod
367388
def register_iterable_like_type(t):
@@ -525,10 +546,23 @@ def _deterministic_encoding_error_msg(self, value):
525546
"please provide a type hint for the input of '%s'" %
526547
(value, type(value), self.requires_deterministic_step_label))
527548

528-
def encode_type(self, t, stream):
549+
def encode_type_2_67_0(self, t, stream):
550+
"""
551+
Encode special type with <=2.67.0 compatibility.
552+
"""
553+
_verify_dill_compat()
529554
stream.write(dill.dumps(t), True)
530555

556+
def encode_type(self, t, stream):
557+
if self.force_use_dill:
558+
return self.encode_type_2_67_0(t, stream)
559+
bs = cloudpickle_pickler.dumps(
560+
t, config=cloudpickle_pickler.NO_DYNAMIC_CLASS_TRACKING_CONFIG)
561+
stream.write(bs, True)
562+
531563
def decode_type(self, stream):
564+
if self.force_use_dill:
565+
return _unpickle_type_2_67_0(stream.read_all(True))
532566
return _unpickle_type(stream.read_all(True))
533567

534568
def decode_from_stream(self, stream, nested):
@@ -589,19 +623,35 @@ def decode_from_stream(self, stream, nested):
589623
_unpickled_types = {} # type: Dict[bytes, type]
590624

591625

592-
def _unpickle_type(bs):
626+
def _unpickle_type_2_67_0(bs):
627+
"""
628+
Decode special type with <=2.67.0 compatibility.
629+
"""
593630
t = _unpickled_types.get(bs, None)
594631
if t is None:
632+
_verify_dill_compat()
595633
t = _unpickled_types[bs] = dill.loads(bs)
596634
# Fix unpicklable anonymous named tuples for Python 3.6.
597635
if t.__base__ is tuple and hasattr(t, '_fields'):
598636
try:
599637
pickle.loads(pickle.dumps(t))
600638
except pickle.PicklingError:
601-
t.__reduce__ = lambda self: (_unpickle_named_tuple, (bs, tuple(self)))
639+
t.__reduce__ = lambda self: (
640+
_unpickle_named_tuple_2_67_0, (bs, tuple(self)))
602641
return t
603642

604643

644+
def _unpickle_named_tuple_2_67_0(bs, items):
645+
return _unpickle_type_2_67_0(bs)(*items)
646+
647+
648+
def _unpickle_type(bs):
649+
if not _unpickled_types.get(bs, None):
650+
_unpickled_types[bs] = cloudpickle_pickler.loads(bs)
651+
652+
return _unpickled_types[bs]
653+
654+
605655
def _unpickle_named_tuple(bs, items):
606656
return _unpickle_type(bs)(*items)
607657

@@ -837,6 +887,7 @@ def decode_from_stream(self, in_, nested):
837887
if IntervalWindow is None:
838888
from apache_beam.transforms.window import IntervalWindow
839889
# instantiating with None is not part of the public interface
890+
# pylint: disable=too-many-function-args
840891
typed_value = IntervalWindow(None, None) # type: ignore[arg-type]
841892
typed_value._end_micros = (
842893
1000 * self._to_normal_time(in_.read_bigendian_uint64()))

sdks/python/apache_beam/coders/coders.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -911,6 +911,44 @@ def _create_impl(self):
911911
cloudpickle_pickler.dumps, cloudpickle_pickler.loads)
912912

913913

914+
class DeterministicFastPrimitivesCoderV2(FastCoder):
915+
"""Throws runtime errors when encoding non-deterministic values."""
916+
def __init__(self, coder, step_label):
917+
self._underlying_coder = coder
918+
self._step_label = step_label
919+
920+
def _create_impl(self):
921+
922+
return coder_impl.FastPrimitivesCoderImpl(
923+
self._underlying_coder.get_impl(),
924+
requires_deterministic_step_label=self._step_label,
925+
force_use_dill=False)
926+
927+
def is_deterministic(self):
928+
# type: () -> bool
929+
return True
930+
931+
def is_kv_coder(self):
932+
# type: () -> bool
933+
return True
934+
935+
def key_coder(self):
936+
return self
937+
938+
def value_coder(self):
939+
return self
940+
941+
def to_type_hint(self):
942+
return Any
943+
944+
def to_runner_api_parameter(self, context):
945+
# type: (Optional[PipelineContext]) -> Tuple[str, Any, Sequence[Coder]]
946+
return (
947+
python_urns.PICKLED_CODER,
948+
google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)),
949+
())
950+
951+
914952
class DeterministicFastPrimitivesCoder(FastCoder):
915953
"""Throws runtime errors when encoding non-deterministic values."""
916954
def __init__(self, coder, step_label):
@@ -920,7 +958,8 @@ def __init__(self, coder, step_label):
920958
def _create_impl(self):
921959
return coder_impl.FastPrimitivesCoderImpl(
922960
self._underlying_coder.get_impl(),
923-
requires_deterministic_step_label=self._step_label)
961+
requires_deterministic_step_label=self._step_label,
962+
force_use_dill=True)
924963

925964
def is_deterministic(self):
926965
# type: () -> bool
@@ -940,6 +979,34 @@ def to_type_hint(self):
940979
return Any
941980

942981

982+
def _should_force_use_dill():
983+
from apache_beam.coders import typecoders
984+
from apache_beam.transforms.util import is_v1_prior_to_v2
985+
update_compat_version = typecoders.registry.update_compatibility_version
986+
987+
if not update_compat_version:
988+
return False
989+
990+
if not is_v1_prior_to_v2(v1=update_compat_version, v2="2.68.0"):
991+
return False
992+
993+
try:
994+
import dill
995+
assert dill.__version__ == "0.3.1.1"
996+
except Exception as e:
997+
raise RuntimeError("This pipeline runs with the pipeline option " \
998+
"--update_compatibility_version=2.67.0 or earlier. When running with " \
999+
"this option on SDKs 2.68.0 or later, you must ensure dill==0.3.1.1 " \
1000+
f"is installed. Error {e}")
1001+
return True
1002+
1003+
1004+
def _update_compatible_deterministic_fast_primitives_coder(coder, step_label):
1005+
if _should_force_use_dill():
1006+
return DeterministicFastPrimitivesCoder(coder, step_label)
1007+
return DeterministicFastPrimitivesCoderV2(coder, step_label)
1008+
1009+
9431010
class FastPrimitivesCoder(FastCoder):
9441011
"""Encodes simple primitives (e.g. str, int) efficiently.
9451012
@@ -960,7 +1027,8 @@ def as_deterministic_coder(self, step_label, error_message=None):
9601027
if self.is_deterministic():
9611028
return self
9621029
else:
963-
return DeterministicFastPrimitivesCoder(self, step_label)
1030+
return _update_compatible_deterministic_fast_primitives_coder(
1031+
self, step_label)
9641032

9651033
def to_type_hint(self):
9661034
return Any

0 commit comments

Comments
 (0)