1212# See the License for the specific language governing permissions and
1313# limitations under the License.
1414
15+ import base64
1516import multiprocessing
1617import pickle
1718import queue # needed as import instead from/import for mocking in tests
2021from abc import ABC , abstractmethod
2122from enum import Enum
2223from pathlib import Path
23- from typing import Any , Optional , Tuple
24+ from typing import Any , List , Optional , Tuple
2425from urllib .parse import urljoin
2526
2627import backoff
2728import requests
2829from requests .exceptions import ConnectionError , ConnectTimeout , ReadTimeout
2930
3031from lightning .app .core .constants import (
32+ BATCH_DELTA_COUNT ,
3133 HTTP_QUEUE_REFRESH_INTERVAL ,
3234 HTTP_QUEUE_REQUESTS_PER_SECOND ,
3335 HTTP_QUEUE_TOKEN ,
@@ -189,6 +191,20 @@ def get(self, timeout: Optional[float] = None) -> Any:
189191 """
190192 pass
191193
194+ @abstractmethod
195+ def batch_get (self , timeout : Optional [float ] = None , count : Optional [int ] = None ) -> List [Any ]:
196+ """Returns the left most elements of the queue.
197+
198+ Parameters
199+ ----------
200+ timeout:
201+ Read timeout in seconds, in case of input timeout is 0, the `self.default_timeout` is used.
202+ A timeout of None can be used to block indefinitely.
203+ count:
204+ The number of element to get from the queue
205+
206+ """
207+
192208 @property
193209 def is_running (self ) -> bool :
194210 """Returns True if the queue is running, False otherwise.
@@ -214,6 +230,12 @@ def get(self, timeout: Optional[float] = None) -> Any:
214230 timeout = self .default_timeout
215231 return self .queue .get (timeout = timeout , block = (timeout is None ))
216232
233+ def batch_get (self , timeout : Optional [float ] = None , count : Optional [int ] = None ) -> List [Any ]:
234+ if timeout == 0 :
235+ timeout = self .default_timeout
236+ # For multiprocessing, we can simply collect the latest upmost element
237+ return [self .queue .get (timeout = timeout , block = (timeout is None ))]
238+
217239
218240class RedisQueue (BaseQueue ):
219241 @requires ("redis" )
@@ -312,6 +334,9 @@ def get(self, timeout: Optional[float] = None) -> Any:
312334 raise queue .Empty
313335 return pickle .loads (out [1 ])
314336
337+ def batch_get (self , timeout : Optional [float ] = None , count : Optional [int ] = None ) -> Any :
338+ return [self .get (timeout = timeout )]
339+
315340 def clear (self ) -> None :
316341 """Clear all elements in the queue."""
317342 self .redis .delete (self .name )
@@ -366,7 +391,6 @@ def __init__(self, queue: BaseQueue, requests_per_second: float):
366391 self ._seconds_per_request = 1 / requests_per_second
367392
368393 self ._last_get = 0.0
369- self ._last_put = 0.0
370394
371395 @property
372396 def is_running (self ) -> bool :
@@ -383,9 +407,12 @@ def get(self, timeout: Optional[float] = None) -> Any:
383407 self ._last_get = time .time ()
384408 return self ._queue .get (timeout = timeout )
385409
410+ def batch_get (self , timeout : Optional [float ] = None , count : Optional [int ] = None ) -> Any :
411+ self ._wait_until_allowed (self ._last_get )
412+ self ._last_get = time .time ()
413+ return self ._queue .batch_get (timeout = timeout )
414+
386415 def put (self , item : Any ) -> None :
387- self ._wait_until_allowed (self ._last_put )
388- self ._last_put = time .time ()
389416 return self ._queue .put (item )
390417
391418
@@ -477,6 +504,20 @@ def _get(self) -> Any:
477504 # we consider the queue is empty to avoid failing the app.
478505 raise queue .Empty
479506
507+ def batch_get (self , timeout : Optional [float ] = None , count : Optional [int ] = None ) -> List [Any ]:
508+ try :
509+ resp = self .client .post (
510+ f"v1/{ self .app_id } /{ self ._name_suffix } " ,
511+ query_params = {"action" : "popCount" , "count" : str (count or BATCH_DELTA_COUNT )},
512+ )
513+ if resp .status_code == 204 :
514+ raise queue .Empty
515+ return [pickle .loads (base64 .b64decode (data )) for data in resp .json ()]
516+ except ConnectionError :
517+ # Note: If the Http Queue service isn't available,
518+ # we consider the queue is empty to avoid failing the app.
519+ raise queue .Empty
520+
480521 @backoff .on_exception (backoff .expo , (RuntimeError , requests .exceptions .HTTPError ))
481522 def put (self , item : Any ) -> None :
482523 if not self .app_id :
0 commit comments