Skip to content

Commit 5f583ea

Browse files
authored
Create a varint32 coder and use it for RowCoder (#34354)
* Create a varint32 coder and use it for RowCoder * Fix checks * Handle a race when multiple processes download the jar at the same time
1 parent 10f7f0a commit 5f583ea

File tree

13 files changed

+165
-12
lines changed

13 files changed

+165
-12
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 5
3+
"modification": 4
44
}

sdks/python/apache_beam/coders/coder_impl.pxd

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ cdef class VarIntCoderImpl(StreamCoderImpl):
130130
cpdef bytes encode(self, value)
131131

132132

133+
# Cython declaration for the VarInt32CoderImpl class.
# The `ivalue` local used by encode() is declared as a C int32_t here, so the
# `ivalue = value` assignment in the implementation acts as a conversion to a
# 32-bit integer when the module is compiled.
cdef class VarInt32CoderImpl(StreamCoderImpl):
134+
@cython.locals(ivalue=libc.stdint.int32_t)
135+
cpdef bytes encode(self, value)
136+
137+
133138
cdef class SingletonCoderImpl(CoderImpl):
134139
cdef object _value
135140

sdks/python/apache_beam/coders/coder_impl.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -974,6 +974,37 @@ def estimate_size(self, value, nested=False):
974974
return get_varint_size(value)
975975

976976

977+
class VarInt32CoderImpl(StreamCoderImpl):
  """For internal use only; no backwards-compatibility guarantees.

  A coder for int32 objects.

  Values are written with the stream's ``write_var_int32`` and read back with
  ``read_var_int32``. ``encode`` and ``decode`` additionally short-circuit
  small non-negative values so that no stream has to be created for them."""
  def encode_to_stream(self, value, out, nested):
    # type: (int, create_OutputStream, bool) -> None
    out.write_var_int32(value)

  def decode_from_stream(self, in_stream, nested):
    # type: (create_InputStream, bool) -> int
    return in_stream.read_var_int32()

  def encode(self, value):
    """Returns the encoded bytes for ``value``.

    Values that index into the module-level ``small_ints`` table are returned
    straight from that table; everything else goes through the generic
    stream-based encoding.
    """
    ivalue = value  # type cast
    if 0 <= ivalue < len(small_ints):
      return small_ints[ivalue]
    return StreamCoderImpl.encode(self, value)

  def decode(self, encoded):
    """Returns the int stored in ``encoded``.

    A single byte below 128 has no continuation bit set, so it is the value
    itself; anything else goes through the generic stream-based decoding.
    """
    if len(encoded) == 1:
      i = ord(encoded)
      if 0 <= i < 128:
        return i
    return StreamCoderImpl.decode(self, encoded)

  def estimate_size(self, value, nested=False):
    # type: (Any, bool) -> int
    # Note that VarInts are encoded the same way regardless of nesting.
    # Coerce to a Python int before masking, like the pure-Python stream's
    # write_var_int32 does: fixed-width integer scalars (e.g. numpy.int32)
    # may refuse to be combined with the out-of-range 0xFFFFFFFF mask.
    return get_varint_size(int(value) & 0xFFFFFFFF)
1006+
1007+
9771008
class SingletonCoderImpl(CoderImpl):
9781009
"""For internal use only; no backwards-compatibility guarantees.
9791010

sdks/python/apache_beam/coders/coders.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ def __repr__(self):
629629

630630

631631
class VarIntCoder(FastCoder):
632-
"""Variable-length integer coder."""
632+
"""Variable-length integer coder matches Java SDK's VarLongCoder."""
633633
def _create_impl(self):
634634
return coder_impl.VarIntCoderImpl()
635635

@@ -650,6 +650,25 @@ def __hash__(self):
650650
Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)
651651

652652

653+
class VarInt32Coder(FastCoder):
  """Variable-length coder for 32-bit integers.

  This is the counterpart of the Java SDK's VarIntCoder.
  """
  def _create_impl(self):
    # The actual encoding/decoding lives in the coder_impl module.
    return coder_impl.VarInt32CoderImpl()

  def is_deterministic(self):
    # type: () -> bool
    # The coder declares itself deterministic unconditionally.
    return True

  def to_type_hint(self):
    # Decoded values are plain Python ints.
    return int

  def __eq__(self, other):
    # The coder carries no state: equality is decided by the type alone.
    return type(self) == type(other)

  def __hash__(self):
    # Kept consistent with __eq__: hash on the type only.
    return hash(type(self))
670+
671+
653672
class BigEndianShortCoder(FastCoder):
654673
"""A coder used for big-endian int16 values."""
655674
def _create_impl(self):

sdks/python/apache_beam/coders/coders_test_common.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,20 @@ def test_varint_coder(self):
318318
for k in range(0, int(math.log(MAX_64_BIT_INT)))
319319
])
320320

321+
def test_varint32_coder(self):
  """Checks VarInt32Coder on small, multi-byte and large int32 values."""
  # Small ints.
  self.check_coder(coders.VarInt32Coder(), *range(-10, 10))
  # Multi-byte encoding starts at 128
  self.check_coder(coders.VarInt32Coder(), *range(120, 140))
  # Large values: alternating sign with exponentially growing magnitude,
  # bounded so that every value still fits in 32 bits.
  MAX_32_BIT_INT = 0x7fffffff
  self.check_coder(
      # Must be the 32-bit coder: using VarIntCoder here would leave
      # VarInt32Coder untested for large values.
      coders.VarInt32Coder(),
      *[
          int(math.pow(-1, k) * math.exp(k))
          for k in range(0, int(math.log(MAX_32_BIT_INT)))
      ])
334+
321335
def test_float_coder(self):
322336
self.check_coder(
323337
coders.FloatCoder(), *[float(0.1 * x) for x in range(-100, 100)])

sdks/python/apache_beam/coders/row_coder.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from apache_beam.coders.coders import SinglePrecisionFloatCoder
3434
from apache_beam.coders.coders import StrUtf8Coder
3535
from apache_beam.coders.coders import TimestampCoder
36+
from apache_beam.coders.coders import VarInt32Coder
3637
from apache_beam.coders.coders import VarIntCoder
3738
from apache_beam.portability import common_urns
3839
from apache_beam.portability.api import schema_pb2
@@ -142,8 +143,10 @@ def _coder_from_type(field_type):
142143
def _nonnull_coder_from_type(field_type):
143144
type_info = field_type.WhichOneof("type_info")
144145
if type_info == "atomic_type":
145-
if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
146+
if field_type.atomic_type == schema_pb2.INT64:
146147
return VarIntCoder()
148+
elif field_type.atomic_type == schema_pb2.INT32:
149+
return VarInt32Coder()
147150
if field_type.atomic_type == schema_pb2.INT16:
148151
return BigEndianShortCoder()
149152
elif field_type.atomic_type == schema_pb2.FLOAT:

sdks/python/apache_beam/coders/row_coder_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,29 @@ def test_create_row_coder_from_schema(self):
203203
for test_case in self.PEOPLE:
204204
self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
205205

206+
def test_row_coder_negative_varint(self):
  """Negative INT64 and INT32 fields must survive an encode/decode cycle."""
  i64_field = schema_pb2.Field(
      name="i64", type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64))
  i32_field = schema_pb2.Field(
      name="i32", type=schema_pb2.FieldType(atomic_type=schema_pb2.INT32))
  schema = schema_pb2.Schema(id="negative", fields=[i64_field, i32_field])
  coder = RowCoder(schema)

  Negative = typing.NamedTuple(
      "Negative", [
          ("i64", np.int64),
          ("i32", np.int32),
      ])

  # Mixed magnitudes, plus the most negative value of each field width.
  rows = (
      Negative(-1, -1023),
      Negative(-1023, -1),
      Negative(-2**63, -2**31),
  )
  for row in rows:
    self.assertEqual(row, coder.decode(coder.encode(row)))
228+
206229
@unittest.skip(
207230
"https://github.com/apache/beam/issues/19696 - Overflow behavior in "
208231
"VarIntCoder is currently inconsistent")

sdks/python/apache_beam/coders/slow_stream.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def write_var_int64(self, v: int) -> None:
5858
if not v:
5959
break
6060

61+
def write_var_int32(self, v: int) -> None:
  """Writes ``v`` as a varint after reducing it to its low 32 bits."""
  # int() first, then mask: a negative input becomes its unsigned 32-bit
  # bit pattern before being handed to the 64-bit varint writer.
  low_32_bits = int(v) & 0xFFFFFFFF
  self.write_var_int64(low_32_bits)
63+
6164
def write_bigendian_int64(self, v):
6265
self.write(struct.pack('>q', v))
6366

@@ -156,6 +159,10 @@ def read_var_int64(self):
156159
result -= 1 << 64
157160
return result
158161

162+
def read_var_int32(self):
  """Reads a varint from the stream and returns it as a signed 32-bit int.

  Only the low 32 bits of the decoded varint are kept and reinterpreted as a
  two's-complement int32. This matches the compiled stream, which narrows the
  64-bit result with a C cast, and means an over-long encoding (for example a
  sign-extended negative) no longer raises ``struct.error``.
  """
  v = self.read_var_int64()
  low_32_bits = v & 0xFFFFFFFF
  # Bit 31 set means the value is negative when viewed as int32.
  if low_32_bits & 0x80000000:
    return low_32_bits - (1 << 32)
  return low_32_bits
165+
159166
def read_bigendian_int64(self):
160167
return struct.unpack('>q', self.read(8))[0]
161168

sdks/python/apache_beam/coders/stream.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ cdef class OutputStream(object):
2626
cpdef write(self, bytes b, bint nested=*)
2727
cpdef write_byte(self, unsigned char val)
2828
cpdef write_var_int64(self, libc.stdint.int64_t v)
29+
cpdef write_var_int32(self, libc.stdint.int32_t v)
2930
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v)
3031
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v)
3132
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
@@ -43,6 +44,8 @@ cdef class ByteCountingOutputStream(OutputStream):
4344
cdef size_t count
4445

4546
cpdef write(self, bytes b, bint nested=*)
47+
cpdef write_var_int64(self, libc.stdint.int64_t val)
48+
cpdef write_var_int32(self, libc.stdint.int32_t val)
4649
cpdef write_byte(self, unsigned char val)
4750
cpdef write_bigendian_int64(self, libc.stdint.int64_t val)
4851
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t val)
@@ -61,6 +64,7 @@ cdef class InputStream(object):
6164
cpdef bytes read(self, size_t len)
6265
cpdef long read_byte(self) except? -1
6366
cpdef libc.stdint.int64_t read_var_int64(self) except? -1
67+
cpdef libc.stdint.int32_t read_var_int32(self) except? -1
6468
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1
6569
cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1
6670
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1

sdks/python/apache_beam/coders/stream.pyx

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ cdef class OutputStream(object):
7373
if not v:
7474
break
7575

76+
cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
  """Write a 32-bit int to the stream as a varint of its unsigned bits."""
  # Widen to 64 bits first; the mask below then strips the sign extension so
  # that a negative input is emitted as its unsigned 32-bit value.
  cdef libc.stdint.int64_t widened = signed_v
  self.write_var_int64(widened & <libc.stdint.int64_t>(0xFFFFFFFF))
80+
7681
cpdef write_bigendian_int64(self, libc.stdint.int64_t signed_v):
7782
self.write_bigendian_uint64(signed_v)
7883

@@ -91,7 +96,7 @@ cdef class OutputStream(object):
9196

9297
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v):
9398
cdef libc.stdint.uint32_t v = signed_v
94-
if self.buffer_size < self.pos + 4:
99+
if self.buffer_size < self.pos + 4:
95100
self.extend(4)
96101
self.data[self.pos ] = <unsigned char>(v >> 24)
97102
self.data[self.pos + 1] = <unsigned char>(v >> 16)
@@ -151,6 +156,12 @@ cdef class ByteCountingOutputStream(OutputStream):
151156
cpdef write_var_int64(self, libc.stdint.int64_t signed_v):
152157
self.count += get_varint_size(signed_v)
153158

159+
cpdef write_var_int32(self, libc.stdint.int32_t signed_v):
  """Add the encoded size of a varint32 write to the running byte count."""
  # A negative int32 is written as its unsigned 32-bit pattern, which always
  # takes five 7-bit groups.
  if signed_v < 0:
    self.count += 5
    return
  self.count += get_varint_size(signed_v)
164+
154165
cpdef write_byte(self, unsigned char _):
155166
self.count += 1
156167

@@ -225,6 +236,11 @@ cdef class InputStream(object):
225236

226237
return result
227238

239+
cpdef libc.stdint.int32_t read_var_int32(self) except? -1:
  """Decode a variable-length encoded int32 from a stream."""
  # The varint is decoded at 64-bit width and then narrowed to int32 with a
  # C cast.
  cdef libc.stdint.int64_t v = self.read_var_int64()
  return <libc.stdint.int32_t>(v)
243+
228244
cpdef libc.stdint.int64_t read_bigendian_int64(self) except? -1:
229245
return self.read_bigendian_uint64()
230246

0 commit comments

Comments
 (0)