Skip to content

Commit f6fc5ef

Browse files
authored
Struct/Schema cleanups (#2704)
1 parent 46a1cc6 commit f6fc5ef

File tree

5 files changed

+19
-30
lines changed

5 files changed

+19
-30
lines changed

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
77
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
88
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
9-
from kafka.coordinator.protocol import Schema
109
from kafka.protocol.struct import Struct
11-
from kafka.protocol.types import String, Array, Int32
10+
from kafka.protocol.types import Array, Int32, Schema, String
1211
from kafka.structs import TopicPartition
1312

1413
log = logging.getLogger(__name__)
@@ -59,7 +58,10 @@ class StickyAssignorUserDataV1(Struct):
5958
"""
6059

6160
SCHEMA = Schema(
62-
("previous_assignment", Array(("topic", String("utf-8")), ("partitions", Array(Int32)))), ("generation", Int32)
61+
("previous_assignment", Array(
62+
("topic", String("utf-8")),
63+
("partitions", Array(Int32)))),
64+
("generation", Int32)
6365
)
6466

6567

kafka/protocol/abstract.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33

44
class AbstractType(object, metaclass=abc.ABCMeta):
5+
@classmethod
56
@abc.abstractmethod
67
def encode(cls, value): # pylint: disable=no-self-argument
78
pass
89

10+
@classmethod
911
@abc.abstractmethod
1012
def decode(cls, data): # pylint: disable=no-self-argument
1113
pass

kafka/protocol/api.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ def API_VERSION(self):
6060
"""Integer of api request version"""
6161
pass
6262

63-
@abc.abstractproperty
64-
def SCHEMA(self):
65-
"""An instance of Schema() representing the request structure"""
66-
pass
67-
6863
@abc.abstractproperty
6964
def RESPONSE_TYPE(self):
7065
"""The Response class associated with the api request"""
@@ -96,11 +91,6 @@ def API_VERSION(self):
9691
"""Integer of api request/response version"""
9792
pass
9893

99-
@abc.abstractproperty
100-
def SCHEMA(self):
101-
"""An instance of Schema() representing the response structure"""
102-
pass
103-
10494
def to_object(self):
10595
return _to_object(self.SCHEMA, self)
10696

kafka/protocol/group.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,7 @@ class JoinGroupRequest_v5(Request):
158158
]
159159

160160

161+
# Currently unused -- see kafka.coordinator.protocol
161162
class ProtocolMetadata(Struct):
162163
SCHEMA = Schema(
163164
('version', Int16),
@@ -250,6 +251,7 @@ class SyncGroupRequest_v3(Request):
250251
]
251252

252253

254+
# Currently unused -- see kafka.coordinator.protocol
253255
class MemberAssignment(Struct):
254256
SCHEMA = Schema(
255257
('version', Int16),

kafka/protocol/struct.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import abc
12
from io import BytesIO
23

34
from kafka.protocol.abstract import AbstractType
@@ -6,11 +7,15 @@
67
from kafka.util import WeakMethod
78

89

9-
class Struct(AbstractType):
10-
SCHEMA = Schema()
10+
class Struct(metaclass=abc.ABCMeta):
11+
12+
@abc.abstractproperty
13+
def SCHEMA(self):
14+
"""An instance of Schema() representing the structure"""
15+
pass
1116

1217
def __init__(self, *args, **kwargs):
13-
if len(args) == len(self.SCHEMA.fields):
18+
if len(args) == len(self.SCHEMA):
1419
for i, name in enumerate(self.SCHEMA.names):
1520
setattr(self, name, args[i])
1621
elif len(args) > 0:
@@ -23,19 +28,7 @@ def __init__(self, *args, **kwargs):
2328
% (list(self.SCHEMA.names),
2429
', '.join(kwargs.keys())))
2530

26-
# overloading encode() to support both class and instance
27-
# Without WeakMethod() this creates circular ref, which
28-
# causes instances to "leak" to garbage
29-
self.encode = WeakMethod(self._encode_self)
30-
31-
@classmethod
32-
def encode(cls, item): # pylint: disable=E0202
33-
bits = []
34-
for i, field in enumerate(cls.SCHEMA.fields):
35-
bits.append(field.encode(item[i]))
36-
return b''.join(bits)
37-
38-
def _encode_self(self):
31+
def encode(self):
3932
return self.SCHEMA.encode(
4033
[getattr(self, name) for name in self.SCHEMA.names]
4134
)
@@ -44,7 +37,7 @@ def _encode_self(self):
4437
def decode(cls, data):
4538
if isinstance(data, bytes):
4639
data = BytesIO(data)
47-
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
40+
return cls(*cls.SCHEMA.decode(data))
4841

4942
def get_item(self, name):
5043
if name not in self.SCHEMA.names:

0 commit comments

Comments
 (0)