Skip to content

Commit 8e85546

Browse files
authored
Threaded iterable fix (#227)
* Unlock thread after closing when queue is full * No test defaults bcause it's env related * reuseport do not work for unix sockets since 6.12
1 parent b06cf3a commit 8e85546

File tree

5 files changed

+18
-5
lines changed

5 files changed

+18
-5
lines changed

aiomisc/iterator_wrapper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,12 @@ def _in_thread(self) -> None:
213213

214214
def close(self) -> Awaitable[None]:
215215
self.__channel.close()
216+
# if the iterator inside thread is blocked on `.put()`
217+
# we need to wake it up to signal that it is closed.
218+
try:
219+
self.__channel.queue.get()
220+
except QueueEmpty:
221+
pass
216222
return asyncio.ensure_future(self.wait_closed())
217223

218224
async def wait_closed(self) -> None:

aiomisc/utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ def bind_socket(
8888
if not args and ":" in address:
8989
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
9090

91+
unix_address_family = getattr(socket, "AF_UNIX", None)
92+
if sock.family == unix_address_family:
93+
reuse_port = False
94+
9195
if reuse_addr:
9296
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
9397

@@ -99,7 +103,6 @@ def bind_socket(
99103
for level, option, value in options:
100104
sock.setsockopt(level, option, value)
101105

102-
unix_address_family = getattr(socket, "AF_UNIX", None)
103106
if sock.family == unix_address_family:
104107
proto_name = proto_name or "unix"
105108
sock.bind(address)

aiomisc/worker_pool.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ def _create_socket(self) -> None:
6060
socket.AF_UNIX,
6161
socket.SOCK_STREAM,
6262
address=path,
63+
reuse_port=False,
6364
)
6465
self.address = path
6566
chmod(path, 0o600)

tests/test_entrypoint.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ def unix_socket_udp():
4949

5050
# Behaviour like in the bind_socket
5151
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
52-
if hasattr(socket, "SO_REUSEPORT"):
53-
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
5452

5553
try:
5654
sock.bind(socket_path)
@@ -959,10 +957,14 @@ async def test_add_remove_service(entrypoint: aiomisc.Entrypoint):
959957
@pytest.mark.parametrize(
960958
"entrypoint_logging_kwargs,basic_config_kwargs", [
961959
(
962-
{},
960+
{
961+
"log_level": LogLevel.info.name,
962+
"log_format": LogFormat.plain,
963+
"log_date_format": None,
964+
},
963965
{
964966
"level": LogLevel.info.name,
965-
"log_format": LogFormat.default(),
967+
"log_format": LogFormat.plain,
966968
"date_format": None,
967969
},
968970
),

tests/test_sdwatchdog_service.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def handle_datagram(
3838

3939
with bind_socket(
4040
socket.AF_UNIX, socket.SOCK_DGRAM, address=sock_path,
41+
reuse_port=False,
4142
) as sock:
4243
try:
4344
os.environ["NOTIFY_SOCKET"] = sock_path

0 commit comments

Comments
 (0)