Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
1 change: 1 addition & 0 deletions sdks/python/apache_beam/coders/avro_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

class AvroRecord(object):
"""Simple wrapper class for dictionary records."""

def __init__(self, value):
self.record = value

Expand Down
48 changes: 42 additions & 6 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@

class CoderImpl(object):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, stream, nested):
# type: (Any, create_OutputStream, bool) -> None

Expand Down Expand Up @@ -211,6 +212,7 @@ class SimpleCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Subclass of CoderImpl implementing stream methods using encode/decode."""

def encode_to_stream(self, value, stream, nested):
# type: (Any, create_OutputStream, bool) -> None

Expand All @@ -228,6 +230,7 @@ class StreamCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Subclass of CoderImpl implementing encode/decode using stream methods."""

def encode(self, value):
# type: (Any) -> bytes
out = create_OutputStream()
Expand Down Expand Up @@ -255,6 +258,7 @@ class CallbackCoderImpl(CoderImpl):
This is the default implementation used if Coder._get_impl()
is not overwritten.
"""

def __init__(self, encoder, decoder, size_estimator=None):
self._encoder = encoder
self._decoder = decoder
Expand Down Expand Up @@ -297,6 +301,7 @@ def __repr__(self):

class ProtoCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, proto_message_type):
self.proto_message_type = proto_message_type

Expand All @@ -311,12 +316,14 @@ def decode(self, encoded):

class DeterministicProtoCoderImpl(ProtoCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode(self, value):
return value.SerializePartialToString(deterministic=True)


class ProtoPlusCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, proto_plus_type):
# type: (Type[proto.Message]) -> None
self.proto_plus_type = proto_plus_type
Expand Down Expand Up @@ -356,6 +363,7 @@ def decode(self, value):

class FastPrimitivesCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(
self, fallback_coder_impl, requires_deterministic_step_label=None):
self.fallback_coder_impl = fallback_coder_impl
Expand Down Expand Up @@ -610,6 +618,7 @@ class BytesCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for bytes/str objects."""

def encode_to_stream(self, value, out, nested):
# type: (bytes, create_OutputStream, bool) -> None

Expand All @@ -636,6 +645,7 @@ class BooleanCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for bool objects."""

def encode_to_stream(self, value, out, nested):
out.write_byte(1 if value else 0)

Expand Down Expand Up @@ -675,12 +685,12 @@ class MapCoderImpl(StreamCoderImpl):
attribute values.

A coder for typing.Mapping objects."""

def __init__(
self,
key_coder, # type: CoderImpl
value_coder, # type: CoderImpl
is_deterministic = False
):
is_deterministic=False):
self._key_coder = key_coder
self._value_coder = value_coder
self._is_deterministic = is_deterministic
Expand Down Expand Up @@ -760,6 +770,7 @@ def estimate_size(self, unused_value, nested=False):

class BigEndianShortCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
out.write_bigendian_int16(value)
Expand All @@ -776,6 +787,7 @@ def estimate_size(self, unused_value, nested=False):

class SinglePrecisionFloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (float, create_OutputStream, bool) -> None
out.write_bigendian_float(value)
Expand All @@ -792,6 +804,7 @@ def estimate_size(self, unused_value, nested=False):

class FloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def encode_to_stream(self, value, out, nested):
# type: (float, create_OutputStream, bool) -> None
out.write_bigendian_double(value)
Expand Down Expand Up @@ -863,6 +876,7 @@ class TimestampCoderImpl(StreamCoderImpl):
that of the Java SDK InstantCoder.
https://github.com/apache/beam/blob/f5029b4f0dfff404310b2ef55e2632bbacc7b04f/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/InstantCoder.java#L79
"""

def encode_to_stream(self, value, out, nested):
# type: (Timestamp, create_OutputStream, bool) -> None
millis = value.micros // 1000
Expand All @@ -889,6 +903,7 @@ def estimate_size(self, unused_value, nested=False):

class TimerCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, key_coder_impl, window_coder_impl):
self._timestamp_coder_impl = TimestampCoderImpl()
self._boolean_coder_impl = BooleanCoderImpl()
Expand Down Expand Up @@ -947,6 +962,7 @@ class VarIntCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for int objects."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
out.write_var_int64(value)
Expand Down Expand Up @@ -978,6 +994,7 @@ class SingletonCoderImpl(CoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder that always encodes exactly one value."""

def __init__(self, value):
self._value = value

Expand Down Expand Up @@ -1005,6 +1022,7 @@ class AbstractComponentCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

CoderImpl for coders that are comprised of several component coders."""

def __init__(self, coder_impls):
for c in coder_impls:
assert isinstance(c, CoderImpl), c
Expand All @@ -1030,8 +1048,8 @@ def decode_from_stream(self, in_stream, nested):
# type: (create_InputStream, bool) -> Any
return self._construct_from_components([
c.decode_from_stream(
in_stream, nested or i + 1 < len(self._coder_impls)) for i,
c in enumerate(self._coder_impls)
in_stream, nested or i + 1 < len(self._coder_impls))
for i, c in enumerate(self._coder_impls)
])

def estimate_size(self, value, nested=False):
Expand Down Expand Up @@ -1061,6 +1079,7 @@ def get_estimated_size_and_observables(self, value, nested=False):

class AvroCoderImpl(SimpleCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, schema):
self.parsed_schema = parse_schema(json.loads(schema))

Expand All @@ -1077,6 +1096,7 @@ def decode(self, encoded):

class TupleCoderImpl(AbstractComponentCoderImpl):
"""A coder for tuple objects."""

def _extract_components(self, value):
return tuple(value)

Expand All @@ -1085,6 +1105,7 @@ def _construct_from_components(self, components):


class _ConcatSequence(object):

def __init__(self, head, tail):
# type: (Iterable[Any], Iterable[Any]) -> None
self._head = head
Expand Down Expand Up @@ -1277,12 +1298,14 @@ class TupleSequenceCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for homogeneous tuple objects."""

def _construct_from_sequence(self, components):
return tuple(components)


class _AbstractIterable(object):
"""Wraps an iterable hiding methods that might not always be available."""

def __init__(self, contents):
self._contents = contents

Expand Down Expand Up @@ -1315,6 +1338,7 @@ class IterableCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for homogeneous iterable objects."""

def __init__(self, *args, use_abstract_iterable=None, **kwargs):
super().__init__(*args, **kwargs)
if use_abstract_iterable is None:
Expand All @@ -1332,6 +1356,7 @@ class ListCoderImpl(SequenceCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

A coder for homogeneous list objects."""

def _construct_from_sequence(self, components):
return components if isinstance(components, list) else list(components)

Expand Down Expand Up @@ -1360,6 +1385,7 @@ class PaneInfoCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Coder for a PaneInfo descriptor."""

def _choose_encoding(self, value):
if ((value._index == 0 and value._nonspeculative_index == 0) or
value._timing == PaneInfoTiming_UNKNOWN):
Expand Down Expand Up @@ -1422,6 +1448,7 @@ def estimate_size(self, value, nested=False):


class _OrderedUnionCoderImpl(StreamCoderImpl):

def __init__(self, coder_impl_types, fallback_coder_impl):
assert len(coder_impl_types) < 128
self._types, self._coder_impls = zip(*coder_impl_types)
Expand Down Expand Up @@ -1555,6 +1582,7 @@ class ParamWindowedValueCoderImpl(WindowedValueCoderImpl):
encoding, and uses the supplied parameterized timestamp, windows
and pane info values during decoding when reconstructing the windowed
value."""

def __init__(self, value_coder, window_coder, payload):
super().__init__(value_coder, TimestampCoderImpl(), window_coder)
self._timestamp, self._windows, self._pane_info = self._from_proto(
Expand Down Expand Up @@ -1595,6 +1623,7 @@ class LengthPrefixCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees.

Coder which prefixes the length of the encoded object in the stream."""

def __init__(self, value_coder):
# type: (CoderImpl) -> None
self._value_coder = value_coder
Expand Down Expand Up @@ -1625,6 +1654,7 @@ class ShardedKeyCoderImpl(StreamCoderImpl):
shard id byte string
encoded user key
"""

def __init__(self, key_coder_impl):
self._shard_id_coder_impl = BytesCoderImpl()
self._key_coder_impl = key_coder_impl
Expand Down Expand Up @@ -1660,6 +1690,7 @@ class TimestampPrefixingWindowCoderImpl(StreamCoderImpl):
window's max_timestamp()
encoded window using it's own coder.
"""

def __init__(self, window_coder_impl: CoderImpl) -> None:
self._window_coder_impl = window_coder_impl

Expand Down Expand Up @@ -1687,6 +1718,7 @@ def _create_opaque_window(end, encoded_window):
from apache_beam.transforms.window import BoundedWindow

class _OpaqueWindow(BoundedWindow):

def __init__(self, end, encoded_window):
super().__init__(end)
self.encoded_window = encoded_window
Expand Down Expand Up @@ -1715,6 +1747,7 @@ class TimestampPrefixingOpaqueWindowCoderImpl(StreamCoderImpl):
window's max_timestamp()
length prefixed encoded window
"""

def __init__(self) -> None:
pass

Expand Down Expand Up @@ -1770,6 +1803,7 @@ def finalize_write(self):


class GenericRowColumnEncoder(RowColumnEncoder):

def __init__(self, coder_impl, column):
self.coder_impl = coder_impl
self.column = column
Expand All @@ -1790,6 +1824,7 @@ def finalize_write(self):

class RowCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""

def __init__(self, schema, components):
self.schema = schema
self.num_fields = len(self.schema.fields)
Expand Down Expand Up @@ -1859,8 +1894,7 @@ def _row_column_encoders(self, columns):
RowColumnEncoder.create(
self.schema.fields[i].type.atomic_type,
self.components[i],
columns[name]) for i,
name in enumerate(self.field_names)
columns[name]) for i, name in enumerate(self.field_names)
]

def encode_batch_to_stream(self, columns: Dict[str, np.ndarray], out):
Expand Down Expand Up @@ -1964,6 +1998,7 @@ def decode_batch_from_stream(self, dest: Dict[str, np.ndarray], in_stream):


class LogicalTypeCoderImpl(StreamCoderImpl):

def __init__(self, logical_type, representation_coder):
self.logical_type = logical_type
self.representation_coder = representation_coder.get_impl()
Expand All @@ -1982,6 +2017,7 @@ class BigIntegerCoderImpl(StreamCoderImpl):

For interoperability with Java SDK, encoding needs to match that of the Java
SDK BigIntegerCoder."""

def encode_to_stream(self, value, out, nested):
# type: (int, create_OutputStream, bool) -> None
if value < 0:
Expand Down
Loading
Loading