Skip to content

Commit f09dc2e

Browse files
committed
Add client methods for waiting for data to land in ConsDB
1 parent 09dbddc commit f09dc2e

File tree

1 file changed

+125
-0
lines changed

1 file changed

+125
-0
lines changed

python/lsst/summit/utils/consdbClient.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import logging
2323
import os
24+
import time
2425
from dataclasses import dataclass
2526
from typing import Any
2627
from urllib.parse import quote, urlparse
@@ -607,6 +608,130 @@ def query(self, query: str) -> Table:
607608
return Table(rows=[])
608609
return Table(rows=result["data"], names=result["columns"])
609610

611+
def wait_for_row_to_exist(self, query: str, timeout: float, poll_frequency_hz: float = 2) -> Table:
    """Return a row once it exists, or an empty table if it times out.

    The supplied ``query`` must be expected to return exactly a single row
    (once it exists), e.g. it should be something like
    'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
    or similar. If the query were like
    'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
    2024100200542)', then the query would return multiple rows and an error
    would be raised. The user is expected to check that the query meets
    this criterion, because if 2024100200541 existed but 2024100200542 was
    about to be created the error would not be raised, and downstream
    behaviour would be undefined.

    The database is always polled at least once, even when ``timeout`` is
    zero or negative, and the final sleep is shortened so that the last
    poll happens at (rather than after) the deadline.

    Parameters
    ----------
    query : `str`
        A SQL query (currently) to the database.
    timeout : `float`
        Maximum time to wait for a non-empty result, in seconds.
    poll_frequency_hz : `float`, optional
        Frequency to poll the database for results, in Hz. Must be
        positive.

    Returns
    -------
    result : `Table`
        An ``astropy.Table`` containing the query results, or an empty
        table if the row was not inserted before the timeout.

    Raises
    ------
    requests.RequestException
        Raised if any kind of connection error occurs.
    requests.HTTPError
        Raised if a non-successful status is returned.
    ValueError
        Raised if the query returns more than one row, or if
        ``poll_frequency_hz`` is not a positive number.
    """
    # ``not x > 0`` (rather than ``x <= 0``) also rejects NaN, and avoids an
    # opaque ZeroDivisionError when computing the sleep interval below.
    if not poll_frequency_hz > 0:
        raise ValueError(f"poll_frequency_hz must be positive, got {poll_frequency_hz}")

    sleep_duration = 1 / poll_frequency_hz
    # Use the monotonic clock so that wall-clock adjustments (e.g. NTP
    # steps) can neither shorten nor extend the wait.
    deadline = time.monotonic() + timeout

    while True:
        result = self.query(query)
        n_rows = len(result)
        if n_rows > 1:
            raise ValueError(f"Query {query} returned more than one row")
        if n_rows == 1:
            return result

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the next iteration is the last poll.
        time.sleep(min(sleep_duration, remaining))

    logger.warning(f"Query {query} did not return any results within {timeout}s")
    return Table(rows=[])
661+
662+
def wait_for_item_in_row(
    self, query: str, item: str, timeout: float, poll_frequency_hz: float = 2
) -> float | None:
    """Return the value of an item in a row once it exists, or ``None``
    if it times out.

    If the item is not in the schema of the table, an error will be raised.

    The supplied ``query`` must be expected to return exactly a single row
    (once it exists), e.g. it should be something like
    'select * from cdb_latiss.exposure where exposure_id = 2024100200541'
    or similar. If the query were like
    'select * from cdb_latiss.exposure where exposure_id in (2024100200541,
    2024100200542)', then the query would return multiple rows and an error
    would be raised. The user is expected to check that the query meets
    this criterion, because if 2024100200541 existed but 2024100200542 was
    about to be created the error would not be raised, and downstream
    behaviour would be undefined.

    ``timeout`` bounds the *total* time spent in this call: the time spent
    waiting for the row to appear and the time spent waiting for the item
    to be populated share a single budget.

    Parameters
    ----------
    query : `str`
        A SQL query (currently) to the database.
    item : `str`
        The item to check for in the query results.
    timeout : `float`
        Maximum time to wait for a non-empty result, in seconds.
    poll_frequency_hz : `float`, optional
        Frequency to poll the database for results, in Hz. Must be
        positive.

    Returns
    -------
    value : `float` or `None`
        The corresponding value of the item in the row in the table, or
        ``None`` if the item was not found before the timeout.

    Raises
    ------
    requests.RequestException
        Raised if any kind of connection error occurs.
    requests.HTTPError
        Raised if a non-successful status is returned.
    ValueError
        Raised if the query returns more than one row, if the requested
        item is not in the schema of the table, or if
        ``poll_frequency_hz`` is not a positive number.
    RuntimeError
        Raised if the row was found but a later poll returned no rows.
    """
    # ``not x > 0`` (rather than ``x <= 0``) also rejects NaN, and avoids an
    # opaque ZeroDivisionError when computing the sleep interval below.
    if not poll_frequency_hz > 0:
        raise ValueError(f"poll_frequency_hz must be positive, got {poll_frequency_hz}")

    sleep_duration = 1 / poll_frequency_hz
    # Start the clock *before* waiting for the row so that both waiting
    # phases draw on the same budget; the monotonic clock is immune to
    # wall-clock adjustments.
    deadline = time.monotonic() + timeout

    result = self.wait_for_row_to_exist(query, timeout, poll_frequency_hz)
    if len(result) == 0:
        # wait_for_row_to_exist already logged a warning if table is empty
        return None

    # we know the row now exists but the required item may not be there yet
    while True:
        row = result[0]
        if item not in row.columns:
            raise ValueError(f"Query {query} did not return a column named {item} - check the schema")
        value = row[item]
        if value is not None:
            return value

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline; the next iteration is the last poll.
        time.sleep(min(sleep_duration, remaining))

        result = self.query(query)
        n_rows = len(result)
        if n_rows > 1:
            raise ValueError(f"Query {query} returned more than one row")
        if n_rows != 1:
            # Explicit raise rather than ``assert`` so that the check is
            # still performed when Python runs with -O.
            raise RuntimeError("Somehow no rows came back, which should be impossible")

    logger.warning(
        f"The row returned by {query} did not end up containing a value for {item} within {timeout}s"
    )
    return None
734+
610735
def schema(
611736
self, instrument: str | None = None, table: str | None = None
612737
) -> dict[str, tuple[str, str]] | list[str]:

0 commit comments

Comments
 (0)