88import requests_mock
99from lightning .app import LightningFlow
1010from lightning .app .core import queues
11- from lightning .app .core .constants import HTTP_QUEUE_URL
12- from lightning .app .core .queues import READINESS_QUEUE_CONSTANT , BaseQueue , QueuingSystem , RedisQueue
11+ from lightning .app .core .constants import HTTP_QUEUE_URL , STATE_UPDATE_TIMEOUT
12+ from lightning .app .core .queues import (
13+ READINESS_QUEUE_CONSTANT ,
14+ BaseQueue ,
15+ HTTPQueue ,
16+ QueuingSystem ,
17+ RateLimitedQueue ,
18+ RedisQueue ,
19+ )
1320from lightning .app .utilities .imports import _is_redis_available
1421from lightning .app .utilities .redis import check_if_redis_running
1522
@@ -162,7 +169,7 @@ def test_redis_raises_error_if_failing(redis_mock):
162169
163170class TestHTTPQueue :
164171 def test_http_queue_failure_on_queue_name (self ):
165- test_queue = QueuingSystem . HTTP . get_queue ( queue_name = "test" )
172+ test_queue = HTTPQueue ( "test" , STATE_UPDATE_TIMEOUT )
166173 with pytest .raises (ValueError , match = "App ID couldn't be extracted" ):
167174 test_queue .put ("test" )
168175
@@ -174,7 +181,7 @@ def test_http_queue_failure_on_queue_name(self):
174181
175182 def test_http_queue_put (self , monkeypatch ):
176183 monkeypatch .setattr (queues , "HTTP_QUEUE_TOKEN" , "test-token" )
177- test_queue = QueuingSystem . HTTP . get_queue ( queue_name = "test_http_queue" )
184+ test_queue = HTTPQueue ( "test_http_queue" , STATE_UPDATE_TIMEOUT )
178185 test_obj = LightningFlow ()
179186
180187 # mocking requests and responses
@@ -200,8 +207,7 @@ def test_http_queue_put(self, monkeypatch):
200207
201208 def test_http_queue_get (self , monkeypatch ):
202209 monkeypatch .setattr (queues , "HTTP_QUEUE_TOKEN" , "test-token" )
203- test_queue = QueuingSystem .HTTP .get_queue (queue_name = "test_http_queue" )
204-
210+ test_queue = HTTPQueue ("test_http_queue" , STATE_UPDATE_TIMEOUT )
205211 adapter = requests_mock .Adapter ()
206212 test_queue .client .session .mount ("http://" , adapter )
207213
@@ -218,7 +224,7 @@ def test_http_queue_get(self, monkeypatch):
218224def test_unreachable_queue (monkeypatch ):
219225 monkeypatch .setattr (queues , "HTTP_QUEUE_TOKEN" , "test-token" )
220226
221- test_queue = QueuingSystem . HTTP . get_queue ( queue_name = "test_http_queue" )
227+ test_queue = HTTPQueue ( "test_http_queue" , STATE_UPDATE_TIMEOUT )
222228
223229 resp1 = mock .MagicMock ()
224230 resp1 .status_code = 204
@@ -235,3 +241,25 @@ def test_unreachable_queue(monkeypatch):
235241 # Test backoff on queue.put
236242 test_queue .put ("foo" )
237243 assert test_queue .client .post .call_count == 3
244+
245+
246+ @mock .patch ("lightning.app.core.queues.time.sleep" )
247+ def test_rate_limited_queue (mock_sleep ):
248+ sleeps = []
249+ mock_sleep .side_effect = lambda sleep_time : sleeps .append (sleep_time )
250+
251+ mock_queue = mock .MagicMock ()
252+
253+ mock_queue .name = "inner_queue"
254+ mock_queue .default_timeout = 10.0
255+
256+ rate_limited_queue = RateLimitedQueue (mock_queue , requests_per_second = 1 )
257+
258+ assert rate_limited_queue .name == "inner_queue"
259+ assert rate_limited_queue .default_timeout == 10.0
260+
261+ timeout = time .perf_counter () + 1
262+ while time .perf_counter () + sum (sleeps ) < timeout :
263+ rate_limited_queue .get ()
264+
265+ assert mock_queue .get .call_count == 2
0 commit comments