14 changes: 0 additions & 14 deletions backend/deepchecks_monitoring/app.py
@@ -203,20 +203,6 @@ def auto_removal(task): # pylint: disable=unused-argument
if settings.is_cloud:
app.include_router(ee.api.v1.cloud_router, dependencies=[Depends(LicenseCheckDependency())])

# Configure telemetry
if settings.sentry_dsn:
import sentry_sdk

sentry_sdk.init(
dsn=settings.sentry_dsn,
traces_sampler=ee.utils.sentry.traces_sampler,
environment=settings.sentry_env,
before_send_transaction=ee.utils.sentry.sentry_send_hook
)
# Ignoring this logger since it can spam sentry with errors
sentry_sdk.integrations.logging.ignore_logger("aiokafka.cluster")
ee.utils.telemetry.collect_telemetry(DataIngestionBackend)

if settings.debug_mode:
app.add_middleware(ee.middlewares.ProfilingMiddleware)

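The hunk above drops the DSN-gated Sentry bootstrap from app startup. For orientation, gated setups of this shape typically follow the minimal sketch below; it uses the real sentry-sdk API, but `setup_sentry` and its defaults are illustrative, not the deleted deepchecks code (which plugged in `ee.utils.sentry` hooks instead of a flat sample rate).

```python
# Minimal sketch of a DSN-gated Sentry bootstrap like the one removed above.
# `setup_sentry` is a hypothetical helper; the logger name mirrors the
# deleted block, the sample rate stands in for the removed traces_sampler.
import typing as t

import sentry_sdk
from sentry_sdk.integrations.logging import ignore_logger


def setup_sentry(dsn: t.Optional[str], environment: str = "dev") -> None:
    if not dsn:  # telemetry stays off unless a DSN is explicitly configured
        return
    sentry_sdk.init(
        dsn=dsn,
        traces_sample_rate=0.6,  # sample a fraction of transactions
        environment=environment,
    )
    # aiokafka.cluster can spam Sentry with errors, so exclude it entirely.
    ignore_logger("aiokafka.cluster")
```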
74 changes: 36 additions & 38 deletions backend/deepchecks_monitoring/bgtasks/scheduler.py
@@ -48,8 +48,13 @@

__all__ = ['AlertsScheduler']

try:
from deepchecks_monitoring import ee # pylint: disable=import-outside-toplevel
with_ee = True
except ImportError:
with_ee = False


# TODO: rename to MonitorScheduler
class AlertsScheduler:
"""Alerts scheduler."""

@@ -139,7 +144,14 @@ async def run_organization(self, organization):
monitors_per_model = defaultdict(list)
for monitor in list_monitor_scalars:
monitors_per_model[monitor.check.model].append(monitor)
session.expunge_all()

async with self.async_session_factory() as session:
session: AsyncSession
await database.attach_schema_switcher_listener(
session=session,
schema_search_path=[organization.schema_name, 'public']
)
for model, monitors in monitors_per_model.items():
# Get the minimal time needed to query windows data for. Doing it together for all monitors in order to
# query the data once
@@ -161,7 +173,6 @@
# For each monitor enqueue schedules
for monitor in monitors:
schedules = []
session.add(monitor)
frequency = monitor.frequency.to_pendulum_duration()
schedule_time = monitor.next_schedule

@@ -176,7 +187,10 @@
if schedules:
try:
await enqueue_tasks(monitor, schedules, organization, session)
monitor.latest_schedule = schedules[-1]
await session.execute(
sa.update(Monitor)
.where(Monitor.id == monitor.id).values({Monitor.latest_schedule: schedules[-1]})
)
await session.commit()
# NOTE:
# We use 'Repeatable Read Isolation Level' to run query therefore transaction serialization
@@ -234,19 +248,29 @@ async def run_object_storage_ingestion(self, organization):

if len(models) == 0:
return
session.expunge_all()

async with self.async_session_factory() as session:
await database.attach_schema_switcher_listener(
session=session,
schema_search_path=[organization.schema_name, 'public']
)
time = pdl.now()
tasks = []
for model in models:
if (model.obj_store_last_scan_time is None
or pdl.instance(model.obj_store_last_scan_time).add(hours=2) < time):
tasks.append(dict(name=f'{organization.id}:{model.id}',
bg_worker_task=ee.bgtasks.ObjectStorageIngestor.queue_name(),
params=dict(model_id=model.id, organization_id=organization.id)))
if len(tasks) > 0:
await session.execute(insert(Task).values(tasks)
.on_conflict_do_nothing(constraint=UNIQUE_NAME_TASK_CONSTRAINT))
await session.commit()
task = dict(name=f'{organization.id}:{model.id}',
bg_worker_task=ee.bgtasks.ObjectStorageIngestor.queue_name(),
params=dict(model_id=model.id, organization_id=organization.id))
try:
await session.execute(insert(Task).values(task)
.on_conflict_do_nothing(constraint=UNIQUE_NAME_TASK_CONSTRAINT))
await session.commit()
except (SerializationError, DBAPIError) as error:
await session.rollback()
if isinstance(error, DBAPIError) and not is_serialization_error(error):
self.logger.exception('Model(id=%s) s3 task enqueue failed', model.id)
raise


async def get_versions_hour_windows(
Expand Down Expand Up @@ -383,7 +407,7 @@ def is_serialization_error(error: DBAPIError):
)


class BaseSchedulerSettings(config.DatabaseSettings):
class SchedulerSettings(config.DatabaseSettings):
"""Scheduler settings."""

scheduler_sleep_seconds: int = 30
@@ -393,39 +417,13 @@ class BaseSchedulerSettings(config.DatabaseSettings):
scheduler_logfile_backup_count: int = 3


try:
from deepchecks_monitoring import ee # pylint: disable=import-outside-toplevel

with_ee = True

class SchedulerSettings(BaseSchedulerSettings, ee.config.TelemetrySettings):
"""Set of worker settings."""
pass
except ImportError:
with_ee = False

class SchedulerSettings(BaseSchedulerSettings):
"""Set of worker settings."""
pass


def execute_alerts_scheduler(scheduler_implementation: t.Type[AlertsScheduler]):
"""Execute alrets scheduler."""

async def main():
settings = SchedulerSettings() # type: ignore
service_name = 'alerts-scheduler'

if with_ee:
if settings.sentry_dsn:
import sentry_sdk # pylint: disable=import-outside-toplevel
sentry_sdk.init(
dsn=settings.sentry_dsn,
traces_sample_rate=0.6,
environment=settings.sentry_env
)
ee.utils.telemetry.collect_telemetry(scheduler_implementation)

logger = configure_logger(
name=service_name,
log_level=settings.scheduler_loglevel,
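The scheduler changes share one pattern: load ORM rows in a short-lived session, detach them with `expunge_all`, then write through a fresh schema-switched session using an explicit UPDATE so no stale instance state is flushed on commit. Below is a condensed sketch reusing names from the diff (`Monitor`, `database.attach_schema_switcher_listener`, `enqueue_tasks`); `compute_schedules` is a hypothetical stand-in for the frequency loop above.

```python
# Condensed sketch of the read-detach-update flow adopted above.
import sqlalchemy as sa


async def run_organization(self, organization):
    async with self.async_session_factory() as session:
        # Route queries to the organization schema, falling back to 'public'.
        await database.attach_schema_switcher_listener(
            session=session,
            schema_search_path=[organization.schema_name, 'public'],
        )
        monitors = (await session.scalars(sa.select(Monitor))).all()
        session.expunge_all()  # detach rows; this session is about to close

    async with self.async_session_factory() as session:
        await database.attach_schema_switcher_listener(
            session=session,
            schema_search_path=[organization.schema_name, 'public'],
        )
        for monitor in monitors:
            schedules = compute_schedules(monitor)  # hypothetical helper
            if not schedules:
                continue
            await enqueue_tasks(monitor, schedules, organization, session)
            # Explicit UPDATE instead of mutating the detached instance,
            # so only this column is written back.
            await session.execute(
                sa.update(Monitor)
                .where(Monitor.id == monitor.id)
                .values({Monitor.latest_schedule: schedules[-1]})
            )
            # Commit per monitor: under REPEATABLE READ a serialization
            # failure then affects one monitor, not the whole batch.
            await session.commit()
```

Per-monitor commits trade some throughput for isolation: a serialization conflict invalidates one monitor's update instead of rolling back schedules already enqueued for its siblings.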
22 changes: 1 addition & 21 deletions backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
@@ -136,7 +136,7 @@ async def move_tasks_to_queue(self, session) -> int:
return 0


class BaseWorkerSettings(DatabaseSettings, RedisSettings):
class WorkerSettings(DatabaseSettings, RedisSettings):
"""Worker settings."""

logfile: t.Optional[str] = None
@@ -152,16 +152,6 @@ class Config:
env_file_encoding = 'utf-8'


if with_ee:
class WorkerSettings(BaseWorkerSettings, ee.config.TelemetrySettings):
"""Set of worker settings."""
pass
else:
class WorkerSettings(BaseWorkerSettings):
"""Set of worker settings."""
pass


async def init_async_redis(redis_uri):
"""Initialize redis connection."""
try:
@@ -190,16 +180,6 @@ async def main():
# the telemetry collection. Adding here this import to fix this
from deepchecks_monitoring.bgtasks import tasks_queuer # pylint: disable=import-outside-toplevel

if with_ee:
if settings.sentry_dsn:
import sentry_sdk # pylint: disable=import-outside-toplevel
sentry_sdk.init(
dsn=settings.sentry_dsn,
traces_sample_rate=0.1,
environment=settings.sentry_env,
)
ee.utils.telemetry.collect_telemetry(tasks_queuer.TasksQueuer)

workers = [
# ModelVersionTopicDeletionWorker,
ModelVersionCacheInvalidation,
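With `TelemetrySettings` gone, the try/except pair of `WorkerSettings` definitions collapses into the single class shown above. A small usage sketch, assuming pydantic-style settings as the `Config.env_file` fields suggest:

```python
# Settings are resolved from the environment / .env file at instantiation;
# there is no longer an EE-dependent class variant to pick between.
settings = WorkerSettings()   # reads DB + Redis config from the environment
print(settings.logfile)       # plain optional field, defaults to None
```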
23 changes: 0 additions & 23 deletions backend/deepchecks_monitoring/bgtasks/tasks_runner.py
@@ -29,7 +29,6 @@
from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK
from deepchecks_monitoring.monitoring_utils import configure_logger
from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
from deepchecks_monitoring.utils.other import ExtendedAIOKafkaAdminClient

try:
from deepchecks_monitoring import ee
@@ -189,18 +188,6 @@ async def main():
# the telemetry collection. Adding here this import to fix this
from deepchecks_monitoring.bgtasks import tasks_runner # pylint: disable=import-outside-toplevel

if with_ee and settings.sentry_dsn:
import sentry_sdk # pylint: disable=import-outside-toplevel

sentry_sdk.init(
dsn=settings.sentry_dsn,
traces_sample_rate=0.1,
environment=settings.sentry_env
)
ee.utils.telemetry.collect_telemetry(tasks_runner.TaskRunner)
# Ignoring this logger since it can spam sentry with errors
sentry_sdk.integrations.logging.ignore_logger('aiokafka.cluster')

async with ResourcesProvider(settings) as rp:
async_redis = await init_async_redis(rp.redis_settings.redis_uri)

@@ -212,16 +199,6 @@
MixpanelSystemStateEvent()
]

# Adding kafka related workers
if settings.kafka_host is not None:
# AIOKafka is spamming our logs, disable it for errors and warnings
logging.getLogger('aiokafka.cluster').setLevel(logging.CRITICAL)
kafka_admin = ExtendedAIOKafkaAdminClient(**rp.kafka_settings.kafka_params)
await kafka_admin.start()

# flake8: noqa: SC100
# workers.append(ModelVersionTopicDeletionWorker(kafka_admin))

# Adding ee workers
if with_ee:
workers.append(ee.bgtasks.ObjectStorageIngestor(rp))
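With the Kafka admin wiring removed, worker registration in the runner reduces to a static list plus an optional EE append. A sketch of the resulting flow, with names taken from the diff and the no-argument constructors assumed:

```python
# Build the worker registry; EE-only workers join only when the optional
# `ee` package imported successfully at module load (the with_ee flag above).
workers = [
    ModelVersionCacheInvalidation(),  # constructor signature assumed
    MixpanelSystemStateEvent(),
]
if with_ee:
    # Object-storage ingestion ships only with the enterprise package.
    workers.append(ee.bgtasks.ObjectStorageIngestor(rp))
```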
2 changes: 1 addition & 1 deletion backend/deepchecks_monitoring/ee/__init__.py
@@ -1,3 +1,3 @@
from . import api, bgtasks, config, integrations, middlewares, notifications, resources, utils
from . import api, bgtasks, config, integrations, middlewares, notifications, resources

__all__ = []
12 changes: 1 addition & 11 deletions backend/deepchecks_monitoring/ee/config.py
@@ -18,22 +18,13 @@

__all__ = [
'Settings',
'TelemetrySettings',
'SlackSettings'
]


PROJECT_DIR = pathlib.Path(__file__).parent.parent.absolute()


class TelemetrySettings(BaseDeepchecksSettings):
"""Telemetry settings."""

instrument_telemetry: bool = False
sentry_dsn: t.Optional[str] = None
sentry_env: str = 'dev'


class SlackSettings(BaseDeepchecksSettings):
"""Settings for Slack."""

@@ -52,8 +43,7 @@ def validate_scopes(cls, value: str): # pylint: disable=no-self-argument

class Settings(
OpenSourceSettings,
SlackSettings,
TelemetrySettings,
SlackSettings
):
"""Settings for the deepchecks_monitoring package."""

40 changes: 1 addition & 39 deletions backend/deepchecks_monitoring/ee/resources.py
@@ -17,8 +17,7 @@
from ldclient.client import LDClient
from ldclient.config import Config as LDConfig

from deepchecks_monitoring.ee import utils
from deepchecks_monitoring.ee.config import Settings, SlackSettings, TelemetrySettings
from deepchecks_monitoring.ee.config import Settings, SlackSettings
from deepchecks_monitoring.ee.features_control_cloud import CloudFeaturesControl
from deepchecks_monitoring.ee.features_control_on_prem import OnPremFeaturesControl
from deepchecks_monitoring.ee.notifications import AlertNotificator as EEAlertNotificator
@@ -42,18 +41,6 @@ class ResourcesProvider(OpenSourceResourcesProvider):
def __init__(self, settings: Settings):
super().__init__(settings)
self._lauchdarkly_client = None
self._is_telemetry_initialized = False

@property
def telemetry_settings(self) -> TelemetrySettings:
"""Get the telemetry settings."""
if not isinstance(self._settings, TelemetrySettings):
raise AssertionError(
"Provided settings instance type is not a subclass of "
"the 'TelemetrySettings', you need to provide instance "
"of 'TelemetrySettings' to the 'ResourcesProvider' constructor"
)
return self._settings

@property
def slack_settings(self) -> SlackSettings:
@@ -110,35 +97,10 @@ def parallel_check_executors_pool(self) -> "ActorPool | None":
if parallel_check_executor_flag:
return super().parallel_check_executors_pool

def initialize_telemetry_collectors(
self,
*targets,
traces_sample_rate: float = 0.6,
):
"""Initialize telemetry."""
settings = self.telemetry_settings

if settings.sentry_dsn and not self._is_telemetry_initialized:
import sentry_sdk # pylint: disable=import-outside-toplevel

sentry_sdk.init(
dsn=settings.sentry_dsn,
traces_sample_rate=traces_sample_rate,
environment=settings.sentry_env,
before_send_transaction=utils.sentry.sentry_send_hook
)

self._is_telemetry_initialized = True

if self._is_telemetry_initialized:
for it in targets:
utils.telemetry.collect_telemetry(it)

def get_client_configuration(self) -> dict:
if self.settings.is_cloud:
settings = cast(Settings, self.settings)
return {
"sentryDsn": settings.sentry_dsn,
"lauchdarklySdkKey": settings.lauchdarkly_sdk_key,
"environment": settings.enviroment,
"mixpanel_id": settings.mixpanel_id,
3 changes: 0 additions & 3 deletions backend/deepchecks_monitoring/ee/utils/__init__.py

This file was deleted.

28 changes: 0 additions & 28 deletions backend/deepchecks_monitoring/ee/utils/sentry.py

This file was deleted.
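The deleted `ee/utils/sentry.py` supplied the `traces_sampler` and `before_send_transaction` hooks referenced by the removed `app.py` block. For orientation, hooks with those contracts typically look like the sketch below; the signatures follow sentry-sdk's documented API, but the filtering rules are hypothetical, not the deleted code.

```python
# Hypothetical reconstruction of the hook shapes the deleted module provided.

def traces_sampler(sampling_context: dict) -> float:
    # Return a per-transaction sample rate; 0.0 drops the trace entirely.
    asgi_scope = sampling_context.get("asgi_scope") or {}
    if asgi_scope.get("path", "").startswith("/health"):
        return 0.0
    return 0.6


def sentry_send_hook(event: dict, hint: dict):
    # before_send_transaction hook: returning None drops the event.
    if event.get("transaction", "").startswith("/health"):
        return None
    return event
```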
