55import httpx
66import json
77from pydantic import BaseModel , AwareDatetime , ValidationError , model_validator
8- from typing import AsyncGenerator , Dict , Generator , Iterator , List , Literal , Optional , Any , get_args
8+ from typing import AsyncIterator , Dict , Iterator , List , Literal , Optional , Any , get_args
99from typing_extensions import Self
1010from numbers import Number
1111
@@ -90,12 +90,12 @@ def to_dict(self) -> Dict:
9090
9191class ChunkedParams (BaseModel ):
9292 dataset : str
93- metric : str
9493 groupBy : Optional [List [FieldName ]] = None
9594 aggregator : Optional [Aggregator ] = None
9695 fields : Optional [List [FieldName ]] = None
9796 orderBy : Optional [List [str ]] = None # More complex than just FieldName, can be prefixed with - to invert sort
9897 dataType : Optional [DataType ] = None
98+ filter : Optional [str ] = None
9999 start : datetime
100100 end : datetime
101101 jump : timedelta = timedelta (hours = 1 )
@@ -112,6 +112,9 @@ def chunks(self) -> Iterator[QueryParams]:
112112 current_start = self .start
113113 while current_start < self .end :
114114 current_end = current_start + self .jump
115+ filter = f'timestamp>={ current_start .isoformat ()} ;timestamp<{ current_end .isoformat ()} '
116+ if self .filter :
117+ filter += f';{ self .filter } '
115118
116119 yield QueryParams (
117120 dataset = self .dataset ,
@@ -120,7 +123,7 @@ def chunks(self) -> Iterator[QueryParams]:
120123 fields = self .fields ,
121124 orderBy = self .orderBy ,
122125 dataType = self .dataType ,
123- filter = f'timestamp>= { current_start . isoformat () } and timestamp< { current_end . isoformat () } and metric== { self . metric } '
126+ filter = filter
124127 )
125128
126129 current_start += self .jump
@@ -219,12 +222,9 @@ async def query(
219222
220223 async def query_time_chunked (
221224 self ,
222- dataset : str ,
223- params : QueryParams ,
224- from_time : datetime ,
225- to_time : datetime ,
226- jump : timedelta ,
227- filter_ : Optional [str ] = None ,
228- direction : Literal ["asc" , "desc" ] = "asc" ,
229- ) -> AsyncGenerator [List [Datapoint ], None ]:
230- raise NotImplementedError ()
225+ params : ChunkedParams
226+ ) -> AsyncIterator [List [Datapoint ]]:
227+ for chunk in params .chunks ():
228+ yield await self .query (
229+ chunk
230+ )
0 commit comments