Skip to content

Commit 640d927

Browse files
committed
fix linter
1 parent 8df806b commit 640d927

File tree

4 files changed

+21
-18
lines changed

4 files changed

+21
-18
lines changed

tests/topics/test_topic_reader.py

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def decode(b: bytes):
168168
@pytest.mark.asyncio
169169
class TestBugFixesAsync:
170170
async def test_issue_297_bad_handle_stop_partition(
171-
self, driver, topic_consumer, topic_with_two_partitions_path: str
171+
self, driver, topic_consumer, topic_with_two_partitions_path: str
172172
):
173173
async def wait(fut):
174174
return await asyncio.wait_for(fut, timeout=10)
@@ -189,29 +189,19 @@ async def wait(fut):
189189
# Start second reader for same topic, same consumer, partition 1
190190
reader1 = driver.topic_client.reader(topic, consumer=topic_consumer)
191191

192-
await asyncio.sleep(1)
193-
194-
async with driver.topic_client.writer(topic, partition_id=0) as writer:
195-
await writer.write_with_ack("--")
196-
async with driver.topic_client.writer(topic, partition_id=1) as writer:
197-
await writer.write_with_ack("--")
198-
199-
await reader0.receive_message()
200-
await reader0.receive_message()
201-
202192
# receive uncommited message
203193
await reader1.receive_message()
204194

205195
# write one message for every partition
206196
async with driver.topic_client.writer(topic, partition_id=0) as writer:
207197
await writer.write_with_ack("10")
208-
async with driver.topic_client.writer(topic, partition_id=0) as writer:
198+
async with driver.topic_client.writer(topic, partition_id=1) as writer:
209199
await writer.write_with_ack("11")
210200

211201
msg0 = await wait(reader0.receive_message())
212202
msg1 = await wait(reader1.receive_message())
213203

214-
datas = [msg0.data.decode(), msg1.data.decode]
204+
datas = [msg0.data.decode(), msg1.data.decode()]
215205
datas.sort()
216206

217207
assert datas == ["10", "11"]

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,9 +591,14 @@ def from_proto(
591591
)
592592

593593
@dataclass
594-
class PartitionSessionStatusRequest:
594+
class PartitionSessionStatusRequest(IToProto):
595595
partition_session_id: int
596596

597+
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest:
598+
return ydb_topic_pb2.StreamReadMessage.PartitionSessionStatusRequest(
599+
partition_session_id=self.partition_session_id
600+
)
601+
597602
@dataclass
598603
class PartitionSessionStatusResponse(IFromProto):
599604
partition_session_id: int
@@ -662,9 +667,14 @@ def from_proto(
662667
)
663668

664669
@dataclass
665-
class StopPartitionSessionResponse:
670+
class StopPartitionSessionResponse(IToProto):
666671
partition_session_id: int
667672

673+
def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse:
674+
return ydb_topic_pb2.StreamReadMessage.StopPartitionSessionResponse(
675+
partition_session_id=self.partition_session_id,
676+
)
677+
668678
@dataclass
669679
class FromClient(IToProto):
670680
client_message: "ReaderMessagesFromClientToServer"
@@ -684,6 +694,10 @@ def to_proto(self) -> ydb_topic_pb2.StreamReadMessage.FromClient:
684694
res.update_token_request.CopyFrom(self.client_message.to_proto())
685695
elif isinstance(self.client_message, StreamReadMessage.StartPartitionSessionResponse):
686696
res.start_partition_session_response.CopyFrom(self.client_message.to_proto())
697+
elif isinstance(self.client_message, StreamReadMessage.StopPartitionSessionResponse):
698+
res.stop_partition_session_response.CopyFrom(self.client_message.to_proto())
699+
elif isinstance(self.client_message, StreamReadMessage.PartitionSessionStatusRequest):
700+
res.start_partition_session_response.CopyFrom(self.client_message.to_proto())
687701
else:
688702
raise NotImplementedError("Unknown message type: %s" % type(self.client_message))
689703
return res

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import asyncio
44
import concurrent.futures
5-
import copy
65
import gzip
76
import typing
87
from asyncio import Task

ydb/topic.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def reader(
168168
if not decoder_executor:
169169
decoder_executor = self._executor
170170

171-
args = locals()
171+
args = locals().copy()
172172
del args["self"]
173173

174174
settings = TopicReaderSettings(**args)
@@ -188,7 +188,7 @@ def writer(
188188
encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None,
189189
encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool
190190
) -> TopicWriterAsyncIO:
191-
args = locals()
191+
args = locals().copy()
192192
del args["self"]
193193

194194
settings = TopicWriterSettings(**args)

0 commit comments

Comments
 (0)