@@ -21,7 +21,7 @@ class ChannelClosed(Exception):
21
21
22
22
class ChannelDone (Exception ):
23
23
"""
24
- An exception raised on an attempt to send recieve from a channel that is both closed
24
+ An exception raised on an attempt to send receive from a channel that is both closed
25
25
and empty.
26
26
"""
27
27
@@ -32,49 +32,49 @@ class AsyncChannel(AsyncIterable[T]):
32
32
"""
33
33
A buffered async channel for sending items between coroutines with FIFO ordering.
34
34
35
- This makes decoupled bidirection steaming gRPC requests easy if used like:
35
+ This makes decoupled bidirectional steaming gRPC requests easy if used like:
36
36
37
37
.. code-block:: python
38
38
client = GeneratedStub(grpclib_chan)
39
- request_chan = await AsyncChannel()
39
+ request_channel = await AsyncChannel()
40
40
# We can start be sending all the requests we already have
41
- await request_chan .send_from([ReqestObject (...), ReqestObject (...)])
42
- async for response in client.rpc_call(request_chan ):
41
+ await request_channel .send_from([RequestObject (...), RequestObject (...)])
42
+ async for response in client.rpc_call(request_channel ):
43
43
# The response iterator will remain active until the connection is closed
44
44
...
45
45
# More items can be sent at any time
46
- await request_chan .send(ReqestObject (...))
46
+ await request_channel .send(RequestObject (...))
47
47
...
48
48
# The channel must be closed to complete the gRPC connection
49
- request_chan .close()
49
+ request_channel .close()
50
50
51
51
Items can be sent through the channel by either:
52
52
- providing an iterable to the send_from method
53
53
- passing them to the send method one at a time
54
54
55
- Items can be recieved from the channel by either:
55
+ Items can be received from the channel by either:
56
56
- iterating over the channel with a for loop to get all items
57
- - calling the recieve method to get one item at a time
57
+ - calling the receive method to get one item at a time
58
58
59
- If the channel is empty then recievers will wait until either an item appears or the
59
+ If the channel is empty then receivers will wait until either an item appears or the
60
60
channel is closed.
61
61
62
62
Once the channel is closed then subsequent attempt to send through the channel will
63
63
fail with a ChannelClosed exception.
64
64
65
- When th channel is closed and empty then it is done, and further attempts to recieve
65
+ When th channel is closed and empty then it is done, and further attempts to receive
66
66
from it will fail with a ChannelDone exception
67
67
68
- If multiple coroutines recieve from the channel concurrently, each item sent will be
69
- recieved by only one of the recievers .
68
+ If multiple coroutines receive from the channel concurrently, each item sent will be
69
+ received by only one of the receivers .
70
70
71
71
:param source:
72
72
An optional iterable will items that should be sent through the channel
73
73
immediately.
74
74
:param buffer_limit:
75
75
Limit the number of items that can be buffered in the channel, A value less than
76
76
1 implies no limit. If the channel is full then attempts to send more items will
77
- result in the sender waiting until an item is recieved from the channel.
77
+ result in the sender waiting until an item is received from the channel.
78
78
:param close:
79
79
If set to True then the channel will automatically close after exhausting source
80
80
or immediately if no source is provided.
@@ -85,7 +85,7 @@ def __init__(
85
85
):
86
86
self ._queue : asyncio .Queue [Union [T , object ]] = asyncio .Queue (buffer_limit )
87
87
self ._closed = False
88
- self ._waiting_recievers : int = 0
88
+ self ._waiting_receivers : int = 0
89
89
# Track whether flush has been invoked so it can only happen once
90
90
self ._flushed = False
91
91
@@ -95,14 +95,14 @@ def __aiter__(self) -> AsyncIterator[T]:
95
95
async def __anext__ (self ) -> T :
96
96
if self .done ():
97
97
raise StopAsyncIteration
98
- self ._waiting_recievers += 1
98
+ self ._waiting_receivers += 1
99
99
try :
100
100
result = await self ._queue .get ()
101
101
if result is self .__flush :
102
102
raise StopAsyncIteration
103
103
return result
104
104
finally :
105
- self ._waiting_recievers -= 1
105
+ self ._waiting_receivers -= 1
106
106
self ._queue .task_done ()
107
107
108
108
def closed (self ) -> bool :
@@ -116,12 +116,12 @@ def done(self) -> bool:
116
116
Check if this channel is done.
117
117
118
118
:return: True if this channel is closed and and has been drained of items in
119
- which case any further attempts to recieve an item from this channel will raise
119
+ which case any further attempts to receive an item from this channel will raise
120
120
a ChannelDone exception.
121
121
"""
122
122
# After close the channel is not yet done until there is at least one waiting
123
- # reciever per enqueued item.
124
- return self ._closed and self ._queue .qsize () <= self ._waiting_recievers
123
+ # receiver per enqueued item.
124
+ return self ._closed and self ._queue .qsize () <= self ._waiting_receivers
125
125
126
126
async def send_from (
127
127
self , source : Union [Iterable [T ], AsyncIterable [T ]], close : bool = False
@@ -158,22 +158,22 @@ async def send(self, item: T) -> "AsyncChannel[T]":
158
158
await self ._queue .put (item )
159
159
return self
160
160
161
- async def recieve (self ) -> Optional [T ]:
161
+ async def receive (self ) -> Optional [T ]:
162
162
"""
163
163
Returns the next item from this channel when it becomes available,
164
164
or None if the channel is closed before another item is sent.
165
165
:return: An item from the channel
166
166
"""
167
167
if self .done ():
168
- raise ChannelDone ("Cannot recieve from a closed channel" )
169
- self ._waiting_recievers += 1
168
+ raise ChannelDone ("Cannot receive from a closed channel" )
169
+ self ._waiting_receivers += 1
170
170
try :
171
171
result = await self ._queue .get ()
172
172
if result is self .__flush :
173
173
return None
174
174
return result
175
175
finally :
176
- self ._waiting_recievers -= 1
176
+ self ._waiting_receivers -= 1
177
177
self ._queue .task_done ()
178
178
179
179
def close (self ):
@@ -190,8 +190,8 @@ async def _flush_queue(self):
190
190
"""
191
191
if not self ._flushed :
192
192
self ._flushed = True
193
- deadlocked_recievers = max (0 , self ._waiting_recievers - self ._queue .qsize ())
194
- for _ in range (deadlocked_recievers ):
193
+ deadlocked_receivers = max (0 , self ._waiting_receivers - self ._queue .qsize ())
194
+ for _ in range (deadlocked_receivers ):
195
195
await self ._queue .put (self .__flush )
196
196
197
197
# A special signal object for flushing the queue when the channel is closed
0 commit comments