@@ -734,16 +734,16 @@ def _create_fetch_requests(self):
734734
735735 requests = {}
736736 for node_id , partition_data in six .iteritems (fetchable ):
737+ next_partitions = {TopicPartition (topic , partition_info [0 ]): partition_info
738+ for topic , partitions in six .iteritems (partition_data )
739+ for partition_info in partitions }
737740 if version >= 7 and self .config ['enable_incremental_fetch_sessions' ]:
738741 if node_id not in self ._session_handlers :
739742 self ._session_handlers [node_id ] = FetchSessionHandler (node_id )
740- next_partitions = {TopicPartition (topic , partition_info [0 ]): partition_info
741- for topic , partitions in six .iteritems (partition_data )
742- for partition_info in partitions }
743743 session = self ._session_handlers [node_id ].build_next (next_partitions )
744744 else :
745745 # No incremental fetch support
746- session = FetchRequestData (partition_data , [] , FetchMetadata .LEGACY )
746+ session = FetchRequestData (next_partitions , None , FetchMetadata .LEGACY )
747747 # As of version == 3 partitions will be returned in order as
748748 # they are requested, so to avoid starvation with
749749 # `fetch_max_bytes` option we need this shuffle
@@ -789,10 +789,9 @@ def _create_fetch_requests(self):
789789 session .to_forget )
790790
791791 fetch_offsets = {}
792- for topic , partitions in six .iteritems (partition_data ):
793- for partition_data in partitions :
794- partition , offset = partition_data [:2 ]
795- fetch_offsets [TopicPartition (topic , partition )] = offset
792+ for tp , partition_data in six .iteritems (next_partitions ):
793+ offset = partition_data [1 ]
794+ fetch_offsets [tp ] = offset
796795
797796 requests [node_id ] = (request , fetch_offsets )
798797
@@ -997,7 +996,7 @@ def build_next(self, next_partitions):
997996 log .debug ("Built full fetch %s for node %s with %s partition(s)." ,
998997 self .next_metadata , self .node_id , len (next_partitions ))
999998 self .session_partitions = next_partitions
1000- return FetchRequestData (next_partitions , [] , self .next_metadata );
999+ return FetchRequestData (next_partitions , None , self .next_metadata );
10011000
10021001 prev_tps = set (self .session_partitions .keys ())
10031002 next_tps = set (next_partitions .keys ())
@@ -1126,8 +1125,8 @@ class FetchRequestData(object):
11261125 __slots__ = ('_to_send' , '_to_forget' , '_metadata' )
11271126
11281127 def __init__ (self , to_send , to_forget , metadata ):
1129- self ._to_send = to_send # {TopicPartition: (partition, ...)}
1130- self ._to_forget = to_forget # {TopicPartition}
1128+ self ._to_send = to_send or dict () # {TopicPartition: (partition, ...)}
1129+ self ._to_forget = to_forget or set () # {TopicPartition}
11311130 self ._metadata = metadata
11321131
11331132 @property
@@ -1156,7 +1155,7 @@ def to_forget(self):
11561155 # Return as list of [(topic, (partiiton, ...)), ...]
11571156 # so it an be passed directly to encoder
11581157 partition_data = collections .defaultdict (list )
1159- for tp in six . iteritems ( self ._to_forget ) :
1158+ for tp in self ._to_forget :
11601159 partition_data [tp .topic ].append (tp .partition )
11611160 return list (partition_data .items ())
11621161
0 commit comments