Skip to content

Commit f90da3e

Browse files
committed
Implement DroppingPriorityQueue class
1 parent c01e9ec commit f90da3e

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

plugwise_usb/connection/queue.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
from __future__ import annotations
44

5-
from asyncio import PriorityQueue, Task, get_running_loop, sleep
5+
from asyncio import Queue, Task, get_running_loop, sleep
66
from collections.abc import Callable
77
from dataclasses import dataclass
8+
from sortedcontainers import SortedList
89
import logging
910

1011
from ..api import StickEvent
@@ -25,14 +26,47 @@ class RequestState:
2526
zigbee_address: int
2627

2728

29+
class DroppingPriorityQueue(Queue):
30+
def _init(self, maxsize):
31+
# called by asyncio.Queue.__init__
32+
self._queue = SortedList()
33+
34+
def _put(self, item):
35+
# called by asyncio.Queue.put_nowait
36+
self._queue.add(item)
37+
38+
def _get(self):
39+
# called by asyncio.Queue.get_nowait
40+
# pop the first (most important) item off the queue
41+
return self._queue.pop(0)
42+
43+
def __drop(self):
44+
# drop the last (least important) item from the queue
45+
self._queue.pop()
46+
# no consumer will get a chance to process this item, so
47+
# we must decrement the unfinished count ourselves
48+
self.task_done()
49+
50+
def put_nowait(self, item):
51+
if self.full():
52+
self.__drop()
53+
super().put_nowait(item)
54+
55+
async def put(self, item):
56+
# Queue.put blocks when full, so we must override it.
57+
# Since our put_nowait never raises QueueFull, we can just
58+
# call it directly
59+
self.put_nowait(item)
60+
61+
2862
class StickQueue:
2963
"""Manage queue of all request sessions."""
3064

3165
def __init__(self) -> None:
3266
"""Initialize the message session controller."""
3367
self._stick: StickConnectionManager | None = None
3468
self._loop = get_running_loop()
35-
self._submit_queue: PriorityQueue[PlugwiseRequest] = PriorityQueue()
69+
self._submit_queue: DroppingPriorityQueue[PlugwiseRequest] = DroppingPriorityQueue(maxsize=85)
3670
self._submit_worker_task: Task[None] | None = None
3771
self._unsubscribe_connection_events: Callable[[], None] | None = None
3872
self._running = False

0 commit comments

Comments
 (0)