@@ -45,7 +45,6 @@ class WriterAsyncIO:
4545 _loop : asyncio .AbstractEventLoop
4646 _reconnector : "WriterAsyncIOReconnector"
4747 _closed : bool
48- _compressor_thread_pool : concurrent .futures .Executor
4948
5049 @property
5150 def last_seqno (self ) -> int :
@@ -112,13 +111,14 @@ async def write_with_ack_future(
112111 For wait with timeout use asyncio.wait_for.
113112 """
114113 input_single_message = not isinstance (messages , list )
114+ converted_messages = []
115115 if isinstance (messages , list ):
116- for index , m in enumerate ( messages ) :
117- messages [ index ] = PublicMessage ._create_message (m )
116+ for m in messages :
117+ converted_messages . append ( PublicMessage ._create_message (m ) )
118118 else :
119- messages = [PublicMessage ._create_message (messages )]
119+ converted_messages = [PublicMessage ._create_message (messages )]
120120
121- futures = await self ._reconnector .write_with_ack_future (messages )
121+ futures = await self ._reconnector .write_with_ack_future (converted_messages )
122122 if input_single_message :
123123 return futures [0 ]
124124 else :
@@ -200,8 +200,7 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
200200 }
201201
202202 if settings .encoders :
203- for codec , encoder in settings .encoders .items ():
204- self ._codec_functions [codec ] = encoder
203+ self ._codec_functions .update (settings .encoders )
205204
206205 self ._encode_executor = settings .encoder_executor
207206
@@ -211,8 +210,7 @@ def __init__(self, driver: SupportedDriverType, settings: WriterSettings):
211210
212211 self ._codec = self ._settings .codec
213212 if self ._codec and self ._codec not in self ._codec_functions :
214- known_codecs = [key for key in self ._codec_functions ]
215- known_codecs .sort ()
213+ known_codecs = sorted (self ._codec_functions .keys ())
216214 raise ValueError (
217215 "Unknown codec for writer: %s, supported codecs: %s"
218216 % (self ._codec , known_codecs )
@@ -445,8 +443,7 @@ async def _codec_selector(self, messages: List[InternalMessage]) -> PublicCodec:
445443 # use every of available encoders at start for prevent problems
446444 # with rare used encoders (on writer or reader side)
447445 if self ._codec_selector_batch_num < len (available_codecs ):
448- codec_index = self ._codec_selector_batch_num % len (available_codecs )
449- codec = available_codecs [codec_index ]
446+ codec = available_codecs [self ._codec_selector_batch_num ]
450447 else :
451448 codec = await self ._codec_selector_by_check_compress (messages )
452449 self ._codec_selector_last_codec = codec
@@ -488,9 +485,7 @@ async def _codec_selector_by_check_compress(
488485 Try to compress messages and choose codec with the smallest result size.
489486 """
490487
491- test_messages = messages
492- if len (test_messages ) > 10 :
493- test_messages = test_messages [:10 ]
488+ test_messages = messages [:10 ]
494489
495490 available_codecs = await self ._get_available_codecs ()
496491 if len (available_codecs ) == 1 :
0 commit comments