Skip to content

Commit a834795

Browse files
authored
Reapply "ref(producer): Simplify typing, add more tests (#441)" (#448)
This reverts commit 2713592.
1 parent b0406cf commit a834795

File tree

4 files changed

+22
-54
lines changed

4 files changed

+22
-54
lines changed

.github/workflows/docs-pr.yml

Lines changed: 0 additions & 20 deletions
This file was deleted.

.github/workflows/docs.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
name: Arroyo Docs
22

33
on:
4+
pull_request:
45
push:
56
branches:
67
- main
@@ -14,13 +15,14 @@ jobs:
1415
- name: Setup Python
1516
uses: actions/setup-python@v2
1617
with:
17-
python-version: '3.8'
18+
python-version: '3.13'
1819

1920
- name: Build docs
2021
run: |
2122
pip install virtualenv
2223
make docs
2324
- uses: peaceiris/[email protected]
25+
if: ${{ (github.ref_name == 'main') }}
2426
name: Publish to GitHub Pages
2527
with:
2628
github_token: ${{ secrets.GITHUB_TOKEN }}

arroyo/backends/abstract.py

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,7 @@
44
import time
55
from abc import ABC, abstractmethod, abstractproperty
66
from concurrent.futures import Future
7-
from typing import (
8-
Callable,
9-
Generic,
10-
Mapping,
11-
Optional,
12-
Protocol,
13-
Sequence,
14-
TypeVar,
15-
Union,
16-
)
7+
from typing import Callable, Generic, Mapping, Optional, Sequence, TypeVar, Union
178

189
from arroyo.types import BrokerValue, Partition, Topic, TStrategyPayload
1910

@@ -187,32 +178,11 @@ def member_id(self) -> str:
187178
raise NotImplementedError
188179

189180

190-
class ProducerFuture(Protocol, Generic[T]):
191-
"""
192-
An abstract interface for a kind of Future. Stdlib futures are too slow to
193-
construct, so we use these.
194-
"""
195-
196-
def done(self) -> bool:
197-
...
198-
199-
def result(self, timeout: float | None = None) -> T:
200-
"""
201-
Return result or raise exception. May block, but does not have to.
202-
"""
203-
...
204-
205-
def set_result(self, result: T) -> None:
206-
...
207-
208-
def set_exception(self, exception: Exception) -> None:
209-
...
210-
211-
212181
class SimpleProducerFuture(Generic[T]):
213182
"""
214183
A stub for concurrent.futures.Future that does not construct any Condition
215-
variables, therefore is faster to construct.
184+
variables, therefore is faster to construct. However, some methods are
185+
missing, and result() in particular is not efficient with timeout > 0.
216186
"""
217187

218188
def __init__(self) -> None:
@@ -232,6 +202,10 @@ def result(self, timeout: float | None = None) -> T:
232202
# only in tests at most. It is only here for the sake of implementing
233203
# the contract. If you really need result with timeout>0, you should
234204
# use the stdlib future.
205+
#
206+
# If this becomes performance sensitive, we can potentially implement
207+
# something more sophisticated such as lazily creating the condition
208+
# variable, and synchronizing the creation of that using a global lock.
235209
while deadline is None or time.time() < deadline:
236210
if self.result_exception is not None:
237211
raise self.result_exception
@@ -248,6 +222,9 @@ def set_exception(self, exception: Exception) -> None:
248222
self.result_exception = exception
249223

250224

225+
ProducerFuture = Union[SimpleProducerFuture[T], Future[T]]
226+
227+
251228
class Producer(Generic[TStrategyPayload], ABC):
252229
@abstractmethod
253230
def produce(

tests/backends/test_kafka.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,15 @@ def test_auto_offset_reset_latest(self, use_simple_futures: bool) -> None:
167167
else:
168168
raise AssertionError("expected EndOfPartition error")
169169

170+
@pytest.mark.parametrize("use_simple_futures", [True, False])
171+
def test_producer_future_behavior(self, use_simple_futures: bool) -> None:
172+
with self.get_topic() as topic:
173+
with closing(self.get_producer(use_simple_futures)) as producer:
174+
future = producer.produce(topic, next(self.get_payloads()))
175+
assert not future.done()
176+
assert future.result(5.0)
177+
assert future.done()
178+
170179
def test_lenient_offset_reset_latest(self) -> None:
171180
payload = KafkaPayload(b"a", b"0", [])
172181
with self.get_topic() as topic:

0 commit comments

Comments
 (0)