4848
4949__all__ = ['AlertsScheduler' ]
5050
51+ try :
52+ from deepchecks_monitoring import ee # pylint: disable=import-outside-toplevel
53+ with_ee = True
54+ except ImportError :
55+ with_ee = False
56+
5157
52- # TODO: rename to MonitorScheduler
5358class AlertsScheduler :
5459 """Alerts scheduler."""
5560
@@ -139,7 +144,14 @@ async def run_organization(self, organization):
139144 monitors_per_model = defaultdict (list )
140145 for monitor in list_monitor_scalars :
141146 monitors_per_model [monitor .check .model ].append (monitor )
147+ session .expunge_all ()
142148
149+ async with self .async_session_factory () as session :
150+ session : AsyncSession
151+ await database .attach_schema_switcher_listener (
152+ session = session ,
153+ schema_search_path = [organization .schema_name , 'public' ]
154+ )
143155 for model , monitors in monitors_per_model .items ():
144156 # Get the minimal time needed to query windows data for. Doing it together for all monitors in order to
145157 # query the data once
@@ -161,7 +173,6 @@ async def run_organization(self, organization):
161173 # For each monitor enqueue schedules
162174 for monitor in monitors :
163175 schedules = []
164- session .add (monitor )
165176 frequency = monitor .frequency .to_pendulum_duration ()
166177 schedule_time = monitor .next_schedule
167178
@@ -176,7 +187,10 @@ async def run_organization(self, organization):
176187 if schedules :
177188 try :
178189 await enqueue_tasks (monitor , schedules , organization , session )
179- monitor .latest_schedule = schedules [- 1 ]
190+ await session .execute (
191+ sa .update (Monitor )
192+ .where (Monitor .id == monitor .id ).values ({Monitor .latest_schedule : schedules [- 1 ]})
193+ )
180194 await session .commit ()
181195 # NOTE:
182196 # We use 'Repeatable Read Isolation Level' to run query therefore transaction serialization
@@ -234,19 +248,29 @@ async def run_object_storage_ingestion(self, organization):
234248
235249 if len (models ) == 0 :
236250 return
251+ session .expunge_all ()
237252
253+ async with self .async_session_factory () as session :
254+ await database .attach_schema_switcher_listener (
255+ session = session ,
256+ schema_search_path = [organization .schema_name , 'public' ]
257+ )
238258 time = pdl .now ()
239- tasks = []
240259 for model in models :
241260 if (model .obj_store_last_scan_time is None
242261 or pdl .instance (model .obj_store_last_scan_time ).add (hours = 2 ) < time ):
243- tasks .append (dict (name = f'{ organization .id } :{ model .id } ' ,
244- bg_worker_task = ee .bgtasks .ObjectStorageIngestor .queue_name (),
245- params = dict (model_id = model .id , organization_id = organization .id )))
246- if len (tasks ) > 0 :
247- await session .execute (insert (Task ).values (tasks )
248- .on_conflict_do_nothing (constraint = UNIQUE_NAME_TASK_CONSTRAINT ))
249- await session .commit ()
262+ task = dict (name = f'{ organization .id } :{ model .id } ' ,
263+ bg_worker_task = ee .bgtasks .ObjectStorageIngestor .queue_name (),
264+ params = dict (model_id = model .id , organization_id = organization .id ))
265+ try :
266+ await session .execute (insert (Task ).values (task )
267+ .on_conflict_do_nothing (constraint = UNIQUE_NAME_TASK_CONSTRAINT ))
268+ await session .commit ()
269+ except (SerializationError , DBAPIError ) as error :
270+ await session .rollback ()
271+ if isinstance (error , DBAPIError ) and not is_serialization_error (error ):
272+ self .logger .exception ('Model(id=%s) s3 task enqueue failed' , model .id )
273+ raise
250274
251275
252276async def get_versions_hour_windows (
@@ -383,7 +407,7 @@ def is_serialization_error(error: DBAPIError):
383407 )
384408
385409
386- class BaseSchedulerSettings (config .DatabaseSettings ):
410+ class SchedulerSettings (config .DatabaseSettings ):
387411 """Scheduler settings."""
388412
389413 scheduler_sleep_seconds : int = 30
@@ -393,39 +417,13 @@ class BaseSchedulerSettings(config.DatabaseSettings):
393417 scheduler_logfile_backup_count : int = 3
394418
395419
396- try :
397- from deepchecks_monitoring import ee # pylint: disable=import-outside-toplevel
398-
399- with_ee = True
400-
401- class SchedulerSettings (BaseSchedulerSettings , ee .config .TelemetrySettings ):
402- """Set of worker settings."""
403- pass
404- except ImportError :
405- with_ee = False
406-
407- class SchedulerSettings (BaseSchedulerSettings ):
408- """Set of worker settings."""
409- pass
410-
411-
412420def execute_alerts_scheduler (scheduler_implementation : t .Type [AlertsScheduler ]):
413421 """Execute alrets scheduler."""
414422
415423 async def main ():
416424 settings = SchedulerSettings () # type: ignore
417425 service_name = 'alerts-scheduler'
418426
419- if with_ee :
420- if settings .sentry_dsn :
421- import sentry_sdk # pylint: disable=import-outside-toplevel
422- sentry_sdk .init (
423- dsn = settings .sentry_dsn ,
424- traces_sample_rate = 0.6 ,
425- environment = settings .sentry_env
426- )
427- ee .utils .telemetry .collect_telemetry (scheduler_implementation )
428-
429427 logger = configure_logger (
430428 name = service_name ,
431429 log_level = settings .scheduler_loglevel ,
0 commit comments