Skip to content

Commit b4f2f23

Browse files
committed
black format
1 parent d2534c9 commit b4f2f23

File tree

15 files changed

+703
-380
lines changed

15 files changed

+703
-380
lines changed

examples/topic/reader_async_example.py

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,19 @@
66

77

88
async def connect():
9-
db = ydb.aio.Driver(connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials())
10-
reader = ydb.TopicClientAsyncIO(db).topic_reader("/local/topic", consumer="consumer")
9+
db = ydb.aio.Driver(
10+
connection_string="grpc://localhost:2135?database=/local",
11+
credentials=ydb.credentials.AnonymousCredentials(),
12+
)
13+
reader = ydb.TopicClientAsyncIO(db).topic_reader(
14+
"/local/topic", consumer="consumer"
15+
)
1116

1217

1318
async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
14-
with ydb.TopicClientAsyncIO(db).topic_reader("/database/topic/path", consumer="consumer") as reader:
19+
with ydb.TopicClientAsyncIO(db).topic_reader(
20+
"/database/topic/path", consumer="consumer"
21+
) as reader:
1522
async for message in reader.messages():
1623
pass
1724

@@ -83,9 +90,13 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
8390
async def auto_deserialize_message(db: ydb.aio.Driver):
8491
# async, batch work similar to this
8592

86-
async with ydb.TopicClientAsyncIO(db).topic_reader("/database/topic/path", consumer="asd", deserializer=json.loads) as reader:
93+
async with ydb.TopicClientAsyncIO(db).topic_reader(
94+
"/database/topic/path", consumer="asd", deserializer=json.loads
95+
) as reader:
8796
async for message in reader.messages():
88-
print(message.data.Name) # message.data replaces by json.loads(message.data) of raw message
97+
print(
98+
message.data.Name
99+
) # message.data replaces by json.loads(message.data) of raw message
89100
reader.commit(message)
90101

91102

@@ -122,7 +133,11 @@ def process_batch(batch):
122133

123134
async def connect_and_read_few_topics(db: ydb.aio.Driver):
124135
with ydb.TopicClientAsyncIO(db).topic_reader(
125-
["/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3)]) as reader:
136+
[
137+
"/database/topic/path",
138+
ydb.TopicSelector("/database/second-topic", partitions=3),
139+
]
140+
) as reader:
126141
async for message in reader.messages():
127142
await _process(message)
128143
await reader.commit(message)
@@ -140,26 +155,28 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
140155
print(event.topic)
141156
print(event.offset)
142157

143-
async with ydb.TopicClientAsyncIO(db).topic_reader("/local",
144-
consumer="consumer",
145-
commit_batch_time=4,
146-
on_commit=on_commit) as reader:
158+
async with ydb.TopicClientAsyncIO(db).topic_reader(
159+
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
160+
) as reader:
147161
async for message in reader.messages():
148162
await _process(message)
149163
await reader.commit(message)
150164

151165

152166
async def advanced_read_with_own_progress_storage(db: ydb.TopicReaderAsyncIO):
153-
async def on_get_partition_start_offset(req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest) -> \
154-
ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
167+
async def on_get_partition_start_offset(
168+
req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
169+
) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
155170
# read current progress from database
156171
resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
157172
resp.start_offset = 123
158173
return resp
159174

160-
async with ydb.TopicClient(db).topic_reader("/local/test", consumer="consumer",
161-
on_get_partition_start_offset=on_get_partition_start_offset
162-
) as reader:
175+
async with ydb.TopicClient(db).topic_reader(
176+
"/local/test",
177+
consumer="consumer",
178+
on_get_partition_start_offset=on_get_partition_start_offset,
179+
) as reader:
163180
async for mess in reader.messages():
164181
await _process(mess)
165182
# save progress to own database

examples/topic/reader_example.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,17 @@
55

66

77
def connect():
8-
db = ydb.Driver(connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials())
8+
db = ydb.Driver(
9+
connection_string="grpc://localhost:2135?database=/local",
10+
credentials=ydb.credentials.AnonymousCredentials(),
11+
)
912
reader = ydb.TopicClient(db).topic_reader("/local/topic", consumer="consumer")
1013

1114

1215
def create_reader_and_close_with_context_manager(db: ydb.Driver):
13-
with ydb.TopicClient(db).topic_reader("/database/topic/path", consumer="consumer", buffer_size_bytes=123) as reader:
16+
with ydb.TopicClient(db).topic_reader(
17+
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
18+
) as reader:
1419
for message in reader:
1520
pass
1621

@@ -75,9 +80,13 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
7580
def auto_deserialize_message(db: ydb.Driver):
7681
# async, batch work similar to this
7782

78-
reader = ydb.TopicClient(db).topic_reader("/database/topic/path", consumer="asd", deserializer=json.loads)
83+
reader = ydb.TopicClient(db).topic_reader(
84+
"/database/topic/path", consumer="asd", deserializer=json.loads
85+
)
7986
for message in reader.messages():
80-
print(message.data.Name) # message.data replaces by json.loads(message.data) of raw message
87+
print(
88+
message.data.Name
89+
) # message.data replaces by json.loads(message.data) of raw message
8190
reader.commit(message)
8291

8392

@@ -113,7 +122,12 @@ def process_batch(batch):
113122

114123

115124
def connect_and_read_few_topics(db: ydb.Driver):
116-
with ydb.TopicClient(db).topic_reader(["/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3)]) as reader:
125+
with ydb.TopicClient(db).topic_reader(
126+
[
127+
"/database/topic/path",
128+
ydb.TopicSelector("/database/second-topic", partitions=3),
129+
]
130+
) as reader:
117131
for message in reader:
118132
_process(message)
119133
reader.commit(message)
@@ -131,24 +145,29 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
131145
print(event.topic)
132146
print(event.offset)
133147

134-
with ydb.TopicClient(db).topic_reader("/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit) as reader:
148+
with ydb.TopicClient(db).topic_reader(
149+
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
150+
) as reader:
135151
for message in reader:
136152
with reader.commit_on_exit(message):
137153
_process(message)
138154

139155

140156
def advanced_read_with_own_progress_storage(db: ydb.TopicReader):
141-
def on_get_partition_start_offset(req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest) -> \
142-
ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
157+
def on_get_partition_start_offset(
158+
req: ydb.TopicReaderEvents.OnPartitionGetStartOffsetRequest,
159+
) -> ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse:
143160

144161
# read current progress from database
145162
resp = ydb.TopicReaderEvents.OnPartitionGetStartOffsetResponse()
146163
resp.start_offset = 123
147164
return resp
148165

149-
with ydb.TopicClient(db).topic_reader("/local/test", consumer="consumer",
150-
on_get_partition_start_offset=on_get_partition_start_offset
151-
) as reader:
166+
with ydb.TopicClient(db).topic_reader(
167+
"/local/test",
168+
consumer="consumer",
169+
on_get_partition_start_offset=on_get_partition_start_offset,
170+
) as reader:
152171
for mess in reader:
153172
_process(mess)
154173
# save progress to own database
@@ -170,4 +189,3 @@ def get_current_statistics(reader: ydb.TopicReader):
170189

171190
def _process(msg):
172191
raise NotImplementedError()
173-

examples/topic/writer_async_example.py

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,23 +7,26 @@
77

88

99
async def create_writer(db: ydb.aio.Driver):
10-
async with ydb.TopicClientAsyncIO(db).topic_writer("/database/topic/path",
11-
producer_and_message_group_id="producer-id",
12-
) as writer:
10+
async with ydb.TopicClientAsyncIO(db).topic_writer(
11+
"/database/topic/path",
12+
producer_and_message_group_id="producer-id",
13+
) as writer:
1314
pass
1415

1516

1617
async def connect_and_wait(db: ydb.aio.Driver):
17-
async with ydb.TopicClientAsyncIO(db).topic_writer("/database/topic/path",
18-
producer_and_message_group_id="producer-id",
19-
) as writer:
18+
async with ydb.TopicClientAsyncIO(db).topic_writer(
19+
"/database/topic/path",
20+
producer_and_message_group_id="producer-id",
21+
) as writer:
2022
writer.wait_init()
2123

2224

2325
async def connect_without_context_manager(db: ydb.aio.Driver):
24-
writer = ydb.TopicClientAsyncIO(db).topic_writer("/database/topic/path",
25-
producer_and_message_group_id="producer-id",
26-
)
26+
writer = ydb.TopicClientAsyncIO(db).topic_writer(
27+
"/database/topic/path",
28+
producer_and_message_group_id="producer-id",
29+
)
2730
try:
2831
pass # some code
2932
finally:
@@ -39,14 +42,19 @@ async def send_messages(writer: ydb.TopicWriterAsyncIO):
3942
# full forms
4043
await writer.write(ydb.TopicWriterMessage("mess")) # send text
4144
await writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
42-
await writer.write(ydb.TopicWriterMessage("mess-1"),
43-
ydb.TopicWriterMessage("mess-2")) # send few messages by one call
45+
await writer.write(
46+
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
47+
) # send few messages by one call
4448

4549
# with meta
46-
await writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
50+
await writer.write(
51+
ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns())
52+
)
4753

4854

49-
async def send_message_without_block_if_internal_buffer_is_full(writer: ydb.TopicWriterAsyncIO, msg) -> bool:
55+
async def send_message_without_block_if_internal_buffer_is_full(
56+
writer: ydb.TopicWriterAsyncIO, msg
57+
) -> bool:
5058
try:
5159
# put message to internal queue for send, but if buffer is full - fast return
5260
# without wait
@@ -62,15 +70,19 @@ def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
6270

6371
async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):
6472
# future wait
65-
await writer.write_with_result(ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2))
73+
await writer.write_with_result(
74+
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
75+
)
6676

6777
# send with flush
6878
await writer.write("1", "2", "3")
6979
await writer.flush()
7080

7181

7282
async def send_json_message(db: ydb.aio.Driver):
73-
async with ydb.TopicClientAsyncIO(db).topic_writer("/database/path/topic", serializer=json.dumps) as writer:
83+
async with ydb.TopicClientAsyncIO(db).topic_writer(
84+
"/database/path/topic", serializer=json.dumps
85+
) as writer:
7486
writer.write({"a": 123})
7587

7688

@@ -80,7 +92,9 @@ async def send_messages_and_wait_all_commit_with_flush(writer: ydb.TopicWriterAs
8092
await writer.flush()
8193

8294

83-
async def send_messages_and_wait_all_commit_with_results(writer: ydb.TopicWriterAsyncIO):
95+
async def send_messages_and_wait_all_commit_with_results(
96+
writer: ydb.TopicWriterAsyncIO,
97+
):
8498
last_future = None
8599
for i in range(10):
86100
content = "%s" % i
@@ -91,7 +105,9 @@ async def send_messages_and_wait_all_commit_with_results(writer: ydb.TopicWriter
91105
raise last_future.exception()
92106

93107

94-
async def switch_messages_with_many_producers(writers: Dict[str, ydb.TopicWriterAsyncIO], messages: List[str]):
108+
async def switch_messages_with_many_producers(
109+
writers: Dict[str, ydb.TopicWriterAsyncIO], messages: List[str]
110+
):
95111
futures = [] # type: List[asyncio.Future]
96112

97113
for msg in messages:

examples/topic/writer_example.py

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,28 +8,37 @@
88

99

1010
async def connect():
11-
db = ydb.aio.Driver(connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials())
12-
reader = ydb.TopicClientAsyncIO(db).topic_writer("/local/topic", producer_and_message_group_id="producer-id", )
11+
db = ydb.aio.Driver(
12+
connection_string="grpc://localhost:2135?database=/local",
13+
credentials=ydb.credentials.AnonymousCredentials(),
14+
)
15+
reader = ydb.TopicClientAsyncIO(db).topic_writer(
16+
"/local/topic",
17+
producer_and_message_group_id="producer-id",
18+
)
1319

1420

1521
def create_writer(db: ydb.Driver):
16-
with ydb.TopicClient(db).topic_writer("/database/topic/path",
17-
producer_and_message_group_id="producer-id",
18-
) as writer:
22+
with ydb.TopicClient(db).topic_writer(
23+
"/database/topic/path",
24+
producer_and_message_group_id="producer-id",
25+
) as writer:
1926
pass
2027

2128

2229
def connect_and_wait(db: ydb.Driver):
23-
with ydb.TopicClient(db).topic_writer("/database/topic/path",
24-
producer_and_message_group_id="producer-id",
25-
) as writer:
30+
with ydb.TopicClient(db).topic_writer(
31+
"/database/topic/path",
32+
producer_and_message_group_id="producer-id",
33+
) as writer:
2634
writer.wait()
2735

2836

2937
def connect_without_context_manager(db: ydb.Driver):
30-
writer = ydb.TopicClient(db).topic_writer("/database/topic/path",
31-
producer_and_message_group_id="producer-id",
32-
)
38+
writer = ydb.TopicClient(db).topic_writer(
39+
"/database/topic/path",
40+
producer_and_message_group_id="producer-id",
41+
)
3342
try:
3443
pass # some code
3544
finally:
@@ -45,13 +54,17 @@ def send_messages(writer: ydb.TopicWriter):
4554
# full forms
4655
writer.write(ydb.TopicWriterMessage("mess")) # send text
4756
writer.write(ydb.TopicWriterMessage(bytes([1, 2, 3]))) # send bytes
48-
writer.write(ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")) # send few messages by one call
57+
writer.write(
58+
ydb.TopicWriterMessage("mess-1"), ydb.TopicWriterMessage("mess-2")
59+
) # send few messages by one call
4960

5061
# with meta
5162
writer.write(ydb.TopicWriterMessage("asd", seqno=123, created_at_ns=time.time_ns()))
5263

5364

54-
def send_message_without_block_if_internal_buffer_is_full(writer: ydb.TopicWriter, msg) -> bool:
65+
def send_message_without_block_if_internal_buffer_is_full(
66+
writer: ydb.TopicWriter, msg
67+
) -> bool:
5568
try:
5669
# put message to internal queue for send, but if buffer is full - fast return
5770
# without wait
@@ -67,10 +80,14 @@ def send_messages_with_manual_seqno(writer: ydb.TopicWriter):
6780

6881
def send_messages_with_wait_ack(writer: ydb.TopicWriter):
6982
# Explicit future wait
70-
writer.async_write_with_ack(ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)).result()
83+
writer.async_write_with_ack(
84+
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
85+
).result()
7186

7287
# implicit, by sync call
73-
writer.write_with_ack(ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2))
88+
writer.write_with_ack(
89+
ydb.TopicWriterMessage("mess", seqno=1), ydb.TopicWriterMessage("mess", seqno=2)
90+
)
7491
# write_with_ack
7592

7693
# send with flush
@@ -79,7 +96,9 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):
7996

8097

8198
def send_json_message(db: ydb.Driver):
82-
with ydb.TopicClient(db).topic_writer("/database/path/topic", serializer=json.dumps) as writer:
99+
with ydb.TopicClient(db).topic_writer(
100+
"/database/path/topic", serializer=json.dumps
101+
) as writer:
83102
writer.write({"a": 123})
84103

85104

@@ -102,7 +121,9 @@ def send_messages_and_wait_all_commit_with_results(writer: ydb.TopicWriter):
102121
raise future.exception()
103122

104123

105-
def switch_messages_with_many_producers(writers: Dict[str, ydb.TopicWriter], messages: List[str]):
124+
def switch_messages_with_many_producers(
125+
writers: Dict[str, ydb.TopicWriter], messages: List[str]
126+
):
106127
futures = [] # type: List[Future]
107128

108129
for msg in messages:

0 commit comments

Comments
 (0)