2222 R ,
2323 T ,
2424 TaskID ,
25- TracebackStr ,
2625)
27- from data_pipeline .polygon_utils import HistoricalQuotes
2826
2927from option_bot .proj_constants import log
3028
@@ -123,11 +121,12 @@ def __init__(
123121 session_base_url = session_base_url ,
124122 )
125123 self .o_ticker_count_mapping = o_ticker_count_mapping
126- self .o_ticker_queue_progress : Dict [str , set [int ]] = {} # list of tids per o_ticker pulled from queue
124+ self .o_ticker_queue_progress : Dict [str , set [int ]] = {} # tids pulled to execute
125+ self .o_ticker_skip_tids : Dict [str , set [int ]] = {} # tids pulled to skip
127126 self .tid_result_progress : set = set ()
128- self .o_ticker : str = ""
129127 self .empty_tids : set = set ()
130128 self .complete_otkrs : set [str ] = set ()
129+ self .completely_processed_otkrs : list [str ] = []
131130
132131 async def run (self ):
133132 if self .init_client_session :
@@ -138,6 +137,7 @@ async def run(self):
138137 ) as client_session :
139138 pending : Dict [asyncio .Future , TaskID ] = {}
140139 completed : int = 0
140+ skipped : int = 0
141141 running = True
142142 while running or pending :
143143 # TTL, Tasks To Live, determines how many tasks to execute before dying
@@ -157,22 +157,26 @@ async def run(self):
157157
158158 tid , func , args , kwargs = task
159159
160- self .o_ticker = args [0 ]
161- if self .o_ticker not in self .o_ticker_queue_progress :
162- self .o_ticker_queue_progress [self .o_ticker ] = set ([tid ])
160+ # tracking progress
161+ o_ticker = args [0 ]
162+ if o_ticker not in self .o_ticker_queue_progress :
163+ self .o_ticker_queue_progress [o_ticker ] = set ([tid ])
164+
165+ # start work on task, add to pending if not a skipped/complete otkr.
166+ # Otherwise add tid to "results" to mark as done
167+ if o_ticker not in self .complete_otkrs :
168+ self .o_ticker_queue_progress [o_ticker ].add (tid )
169+ args = [
170+ * args ,
171+ client_session ,
172+ ] # NOTE: adds client session to the args list
173+ future = asyncio .ensure_future (func (* args , ** kwargs ))
174+ pending [future ] = tid
163175 else :
164- self .o_ticker_queue_progress [self .o_ticker ].add (tid )
176+ self .rx .put_nowait ((tid , None , None ))
177+ self .o_ticker_skip_tids [o_ticker ].add (tid )
178+ skipped += 1
165179
166- # start work on task, add to pending
167-
168- args = [
169- * args ,
170- client_session ,
171- ] # NOTE: adds client session to the args list
172- future = asyncio .ensure_future (func (* args , ** kwargs ))
173- pending [future ] = tid
174-
175- # NOTE: this won't initially start processing "pending" until pending is as big as concurrency limit
176180 if not pending :
177181 await asyncio .sleep (0.005 )
178182 continue
@@ -190,10 +194,11 @@ async def run(self):
190194 tb = None
191195 try :
192196 result = future .result ()
193- if result [0 ] is False :
194- if result [1 ] not in self .complete_otkrs :
195- self .empty_tids .add (tid )
196- result = None
197+ if result :
198+ if result [0 ] is False :
199+ if result [1 ] not in self .complete_otkrs :
200+ self .empty_tids .add (tid )
201+ result = None
197202 except BaseException as e :
198203 if self .exception_handler is not None :
199204 self .exception_handler (e )
@@ -203,25 +208,25 @@ async def run(self):
203208 self .tid_result_progress .add (tid )
204209 completed += 1
205210
206- k = 15 # indicator that we've passed the listing date for the option
207- if len (self .empty_tids ) > k :
208- seq_start = self .has_consecutive_sequence (k = k )
209- if seq_start :
210- empty_otkr = self .check_completed_otkr (seq_start )
211- await self .clean_up_queue (empty_otkr )
211+ self .eval_list_date ()
212+
212213 self .clean_o_ticker_progress ()
213- log .debug (
214- f"loop complete, restarting while again, running: { running } , total processed: { completed } "
215- )
216214
217- log .info (f"worker finished: processed { completed } tasks" )
215+ log .info (f"worker finished: processed { completed } tasks, and skipped { skipped } " )
216+
217+ def eval_list_date (self ):
218+ k = 15 # indicator that we've passed the listing date for the option
219+ if len (self .empty_tids ) > k :
220+ seq_start = self .has_consecutive_sequence (k = k )
221+ if seq_start :
222+ self .check_completed_otkr (seq_start )
218223
219224 def has_consecutive_sequence (self , k = 15 ) -> int | bool :
220- """check if there is a sequence of length 16 or longer in which the tids are consecutive"""
225+ """check if there is a sequence of length k or longer in which the tids are consecutive"""
221226 for tid in self .empty_tids :
222227 if all ((tid + i ) in self .empty_tids for i in range (k )):
223- # log.debug(f"consecutive sequence found with {len(self.empty_tids)} empty tids")
224- # log.debug(f"empty tids: {self.empty_tids}")
228+ log .debug (f"consecutive sequence of { k } found within { len (self .empty_tids )} total empty tids" )
229+ log .debug (f"empty tids: { self .empty_tids } " )
225230 return tid
226231 return False
227232
@@ -231,45 +236,41 @@ def check_completed_otkr(self, tid):
231236 for otkr in self .o_ticker_queue_progress :
232237 if tid in self .o_ticker_queue_progress [otkr ]:
233238 now_empty_otkr = otkr
239+ break
234240 if now_empty_otkr :
235241 self .complete_otkrs .add (now_empty_otkr )
236- return now_empty_otkr
237-
238- async def clean_up_queue (self , otkr : str ):
239- """identifies the o_ticker that has the tid with the consecutive sequence.
240- It calculates the remaining tasks that have to be pulled from the queue for the oticker and removes them.
241- Everything that has already been pulled and is in `pending` will still be processed.
242- Removes completed
243- """
244- done_tids = self .o_ticker_queue_progress [otkr ]
245- remaining_tasks = self .o_ticker_count_mapping [otkr ] - len (done_tids )
246- i = 0
247- while i < remaining_tasks :
248- try :
249- self .tx .get_nowait ()
250- i += 1
251- except queue .Empty :
252- await asyncio .sleep (0.001 )
253-
254- self .empty_tids = self .empty_tids - done_tids
255- self .o_ticker_count_mapping [otkr ] -= remaining_tasks
242+ self .o_ticker_skip_tids [now_empty_otkr ] = set ()
243+ self .empty_tids -= self .o_ticker_queue_progress [now_empty_otkr ]
256244
257245 def clean_o_ticker_progress (self ):
258- """removes anything"""
259- completely_done_otkrs = []
260- for otkr in self .o_ticker_queue_progress :
261- if otkr in self .complete_otkrs :
262- self .empty_tids = self .empty_tids - self .o_ticker_queue_progress [otkr ]
263- if len (self .o_ticker_queue_progress [otkr ]) >= self .o_ticker_count_mapping [otkr ]:
264- if len (self .o_ticker_queue_progress [otkr ] - self .tid_result_progress ) == 0 :
265- completely_done_otkrs .append (otkr )
266-
267- if completely_done_otkrs :
268- for otkr in completely_done_otkrs :
269- self .tid_result_progress -= self .o_ticker_queue_progress [otkr ]
270- self .o_ticker_queue_progress [otkr ].clear ()
271- self .o_ticker_queue_progress .pop (otkr )
272- log .info (f"\n completely done with otkrs: { completely_done_otkrs } !!" )
246+ """reports how many o_tickers have had all tids pulled from the queue and cleans internal tracking sets"""
247+
248+ for otkr in self .complete_otkrs :
249+ total_tids = self .o_ticker_count_mapping [otkr ]
250+ self .empty_tids -= self .o_ticker_queue_progress [otkr ]
251+ if otkr not in self .completely_processed_otkrs :
252+ if (
253+ len (self .o_ticker_skip_tids .get (otkr , []) | self .o_ticker_queue_progress .get (otkr , []))
254+ >= total_tids
255+ ):
256+ self .completely_processed_otkrs .append (otkr )
257+ log .info (f"all processed for { otkr } ! ({ total_tids } tasks)" )
258+
259+ elif (
260+ len (
261+ (
262+ self .o_ticker_queue_progress .get (otkr , set ())
263+ | self .o_ticker_skip_tids .get (otkr , set ())
264+ )
265+ - self .tid_result_progress
266+ )
267+ == 0
268+ ):
269+ self .completely_processed_otkrs .append (otkr )
270+ log .info (f"all processed for { otkr } !! \
271+ ({ len (self .o_ticker_queue_progress .get (otkr , []))} processed, \
272+ { total_tids - len (self .o_ticker_queue_progress .get (otkr , []))} will be skipped, \
273+ { total_tids } expected)" )
273274
274275
275276class QuotePool (Pool ):
@@ -287,7 +288,6 @@ def __init__(
287288 init_client_session : bool = False ,
288289 session_base_url : Optional [str ] = None ,
289290 o_ticker_count_mapping : Dict [str , int ] = None ,
290- paginator : HistoricalQuotes = None ,
291291 ) -> None :
292292 self .o_ticker_count_mapping : dict [str , int ] = o_ticker_count_mapping
293293 scheduler = QuoteScheduler (self .o_ticker_count_mapping )
@@ -338,9 +338,9 @@ def queue_work(
338338
339339 return task_id
340340
341- def finish_work (self , task_id : TaskID , value : Any , tb : Optional [TracebackStr ]):
342- """overwriting the inherited function. Not using ._results in the pool"""
343- self .scheduler .complete_task (task_id )
341+ # def finish_work(self, task_id: TaskID, value: Any, tb: Optional[TracebackStr]):
342+ # """overwriting the inherited function. Not using ._results in the pool"""
343+ # self.scheduler.complete_task(task_id)
344344
345345 def starmap (
346346 self ,
@@ -373,7 +373,6 @@ def create_worker(
373373
374374 :meta private:
375375 """
376- # paginator = copy.deepcopy(self.paginator)
377376 tx , rx = self .queues [qid ]
378377 process = QuoteWorker (
379378 tx ,
@@ -387,7 +386,6 @@ def create_worker(
387386 init_client_session = self .init_client_session ,
388387 session_base_url = self .session_base_url ,
389388 o_ticker_count_mapping = self .o_ticker_count_mapping ,
390- # paginator=paginator,
391389 )
392390 process .start ()
393391 return process
0 commit comments