Skip to content

Commit cb802a3

Browse files
committed
bug
1 parent 1708272 commit cb802a3

File tree

4 files changed

+33
-25
lines changed

4 files changed

+33
-25
lines changed

examples/topic/topic_transactions_async_example.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import logging
44
import ydb
55

6+
# logging.basicConfig(level=logging.DEBUG)
7+
68

79
async def connect(endpoint: str, database: str) -> ydb.aio.Driver:
810
config = ydb.DriverConfig(endpoint=endpoint, database=database)
@@ -26,13 +28,19 @@ async def write_with_tx_example(driver: ydb.aio.Driver, topic: str, message_coun
2628

2729
async def callee(tx: ydb.aio.QueryTxContext):
2830
tx_writer: ydb.TopicTxWriterAsyncIO = driver.topic_client.tx_writer(tx, topic)
29-
for i in range(message_count):
30-
result_stream = await tx.execute(query=f"select {i} as res")
31-
messages = [result_set.rows[0]["res"] async for result_set in result_stream]
3231

33-
await tx_writer.write([ydb.TopicWriterMessage(data=str(message)) for message in messages])
34-
35-
print(f"Messages {messages} were written with tx.")
32+
for i in range(message_count):
33+
try:
34+
print("try to execute")
35+
async with await tx.execute(query=f"select {i} as res") as result_stream:
36+
async for result_set in result_stream:
37+
print("try to write msg")
38+
await tx_writer.write(ydb.TopicWriterMessage(data=str(result_set.rows[0]["res"])))
39+
print(f"Messages {result_set.rows[0]['res']} were written with tx.")
40+
except BaseException as e:
41+
print("Caught unexpected exception")
42+
print(e)
43+
raise e
3644

3745
await session_pool.retry_tx_async(callee)
3846

@@ -70,7 +78,7 @@ async def main():
7078
args = parser.parse_args()
7179

7280
if args.verbose:
73-
logger = logging.getLogger("topicexample")
81+
logger = logging.getLogger(__name__)
7482
logger.setLevel(logging.DEBUG)
7583
logger.addHandler(logging.StreamHandler())
7684

@@ -79,7 +87,7 @@ async def main():
7987
await create_topic(driver, args.path, args.consumer)
8088

8189
await write_with_tx_example(driver, args.path)
82-
await read_with_tx_example(driver, args.path, args.consumer)
90+
# await read_with_tx_example(driver, args.path, args.consumer)
8391

8492

8593
if __name__ == "__main__":

examples/topic/topic_transactions_example.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,10 @@ def write_with_tx_example(driver: ydb.Driver, topic: str, message_count: int = 1
2626
def callee(tx: ydb.QueryTxContext):
2727
tx_writer: ydb.TopicTxWriter = driver.topic_client.tx_writer(tx, topic)
2828
for i in range(message_count):
29-
result_stream = tx.execute(query=f"select {i} as res")
30-
messages = [result_set.rows[0]["res"] for result_set in result_stream]
31-
32-
tx_writer.write([ydb.TopicWriterMessage(data=str(message)) for message in messages])
33-
34-
print(f"Messages {messages} were written with tx.")
29+
result_stream = tx.execute(query=f"select {i} as res;")
30+
for result_set in result_stream:
31+
tx_writer.write(ydb.TopicWriterMessage(data=str(result_set.rows[0]["res"])))
32+
print(f"Messages {result_set.rows[0]['res']} were written with tx.")
3533

3634
session_pool.retry_tx_sync(callee)
3735

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def __del__(self):
8383
logger.warning("Topic writer was not closed properly. Consider using method close().")
8484

8585
async def close(self, *, flush: bool = True):
86+
print("closing writer")
8687
if self._closed:
8788
return
8889

@@ -188,24 +189,24 @@ def __init__(
188189
tx._add_callback(TxEvent.BEFORE_COMMIT, self._on_before_commit, self._loop)
189190
tx._add_callback(TxEvent.BEFORE_ROLLBACK, self._on_before_rollback, self._loop)
190191

191-
async def write(
192-
self,
193-
messages: Union[Message, List[Message]],
194-
):
195-
"""
196-
send one or number of messages to server.
197-
it put message to internal buffer
192+
# async def write(
193+
# self,
194+
# messages: Union[Message, List[Message]],
195+
# ):
196+
# """
197+
# send one or number of messages to server.
198+
# it put message to internal buffer
198199

199-
For wait with timeout use asyncio.wait_for.
200-
"""
201-
await self.write_with_ack(messages)
200+
# For wait with timeout use asyncio.wait_for.
201+
# """
202+
# await self.write_with_ack(messages)
202203

203204
async def _on_before_commit(self, tx: "BaseQueryTxContext"):
204205
if self._is_implicit:
205206
return
206-
await self.flush()
207207
await self.close()
208208

209+
209210
async def _on_before_rollback(self, tx: "BaseQueryTxContext"):
210211
if self._is_implicit:
211212
return

ydb/aio/query/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
99
# To close stream on YDB it is necessary to scroll through it to the end
1010
async for _ in self:
1111
pass
12+
print("exit from execute context iterator")

0 commit comments

Comments
 (0)