diff --git a/backend/deepchecks_monitoring/app.py b/backend/deepchecks_monitoring/app.py
index 37016c679..07d98b9a0 100644
--- a/backend/deepchecks_monitoring/app.py
+++ b/backend/deepchecks_monitoring/app.py
@@ -203,20 +203,6 @@ def auto_removal(task):  # pylint: disable=unused-argument
     if settings.is_cloud:
         app.include_router(ee.api.v1.cloud_router, dependencies=[Depends(LicenseCheckDependency())])
 
-    # Configure telemetry
-    if settings.sentry_dsn:
-        import sentry_sdk
-
-        sentry_sdk.init(
-            dsn=settings.sentry_dsn,
-            traces_sampler=ee.utils.sentry.traces_sampler,
-            environment=settings.sentry_env,
-            before_send_transaction=ee.utils.sentry.sentry_send_hook
-        )
-        # Ignoring this logger since it can spam sentry with errors
-        sentry_sdk.integrations.logging.ignore_logger("aiokafka.cluster")
-        ee.utils.telemetry.collect_telemetry(DataIngestionBackend)
-
     if settings.debug_mode:
         app.add_middleware(ee.middlewares.ProfilingMiddleware)
 
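Review note: the scheduler diff below hoists the optional `ee` import into a module-level `try/except ImportError` gate instead of re-deriving it per call site. A minimal sketch of the pattern (the `make_workers` helper is hypothetical, for illustration only):

```python
# Feature gate for the optional enterprise sub-package; evaluated once at import.
try:
    from deepchecks_monitoring import ee
    with_ee = True
except ImportError:
    # Open-source build: the ee sub-package simply is not installed.
    with_ee = False


def make_workers(resources_provider):
    """Hypothetical helper: extend the worker list only when EE is available."""
    workers = []
    if with_ee:
        workers.append(ee.bgtasks.ObjectStorageIngestor(resources_provider))
    return workers
```
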
diff --git a/backend/deepchecks_monitoring/bgtasks/scheduler.py b/backend/deepchecks_monitoring/bgtasks/scheduler.py
index c0726f409..7cedb3b31 100644
--- a/backend/deepchecks_monitoring/bgtasks/scheduler.py
+++ b/backend/deepchecks_monitoring/bgtasks/scheduler.py
@@ -48,8 +48,13 @@
 
 __all__ = ['AlertsScheduler']
 
+try:
+    from deepchecks_monitoring import ee  # pylint: disable=import-outside-toplevel
+    with_ee = True
+except ImportError:
+    with_ee = False
+
 
-# TODO: rename to MonitorScheduler
 class AlertsScheduler:
     """Alerts scheduler."""
 
@@ -139,7 +144,14 @@ async def run_organization(self, organization):
             monitors_per_model = defaultdict(list)
             for monitor in list_monitor_scalars:
                 monitors_per_model[monitor.check.model].append(monitor)
+            session.expunge_all()
 
+        async with self.async_session_factory() as session:
+            session: AsyncSession
+            await database.attach_schema_switcher_listener(
+                session=session,
+                schema_search_path=[organization.schema_name, 'public']
+            )
             for model, monitors in monitors_per_model.items():
                 # Get the minimal time needed to query windows data for. Doing it together for all monitors in order to
                 # query the data once
@@ -161,7 +173,6 @@ async def run_organization(self, organization):
                 # For each monitor enqueue schedules
                 for monitor in monitors:
                     schedules = []
-                    session.add(monitor)
                     frequency = monitor.frequency.to_pendulum_duration()
                     schedule_time = monitor.next_schedule
 
@@ -176,7 +187,10 @@
                     if schedules:
                         try:
                             await enqueue_tasks(monitor, schedules, organization, session)
-                            monitor.latest_schedule = schedules[-1]
+                            await session.execute(
+                                sa.update(Monitor)
+                                .where(Monitor.id == monitor.id).values({Monitor.latest_schedule: schedules[-1]})
+                            )
                             await session.commit()
                             # NOTE:
                             # We use 'Repeatable Read Isolation Level' to run query therefore transaction serialization
@@ -234,19 +248,29 @@ async def run_object_storage_ingestion(self, organization):
             if len(models) == 0:
                 return
+            session.expunge_all()
 
+        async with self.async_session_factory() as session:
+            await database.attach_schema_switcher_listener(
+                session=session,
+                schema_search_path=[organization.schema_name, 'public']
+            )
             time = pdl.now()
-            tasks = []
             for model in models:
                 if (model.obj_store_last_scan_time is None or
                         pdl.instance(model.obj_store_last_scan_time).add(hours=2) < time):
-                    tasks.append(dict(name=f'{organization.id}:{model.id}',
-                                      bg_worker_task=ee.bgtasks.ObjectStorageIngestor.queue_name(),
-                                      params=dict(model_id=model.id, organization_id=organization.id)))
-            if len(tasks) > 0:
-                await session.execute(insert(Task).values(tasks)
-                                      .on_conflict_do_nothing(constraint=UNIQUE_NAME_TASK_CONSTRAINT))
-                await session.commit()
+                    task = dict(name=f'{organization.id}:{model.id}',
+                                bg_worker_task=ee.bgtasks.ObjectStorageIngestor.queue_name(),
+                                params=dict(model_id=model.id, organization_id=organization.id))
+                    try:
+                        await session.execute(insert(Task).values(task)
+                                              .on_conflict_do_nothing(constraint=UNIQUE_NAME_TASK_CONSTRAINT))
+                        await session.commit()
+                    except (SerializationError, DBAPIError) as error:
+                        await session.rollback()
+                        if isinstance(error, DBAPIError) and not is_serialization_error(error):
+                            self.logger.exception('Model(id=%s) s3 task enqueue failed', model.id)
+                        raise
 
     async def get_versions_hour_windows(
@@ -383,7 +407,7 @@ def is_serialization_error(error: DBAPIError):
     )
 
 
-class BaseSchedulerSettings(config.DatabaseSettings):
+class SchedulerSettings(config.DatabaseSettings):
     """Scheduler settings."""
 
     scheduler_sleep_seconds: int = 30
@@ -393,22 +417,6 @@
     scheduler_logfile_backup_count: int = 3
 
 
-try:
-    from deepchecks_monitoring import ee  # pylint: disable=import-outside-toplevel
-
-    with_ee = True
-
-    class SchedulerSettings(BaseSchedulerSettings, ee.config.TelemetrySettings):
-        """Set of worker settings."""
-        pass
-except ImportError:
-    with_ee = False
-
-    class SchedulerSettings(BaseSchedulerSettings):
-        """Set of worker settings."""
-        pass
-
-
 def execute_alerts_scheduler(scheduler_implementation: t.Type[AlertsScheduler]):
     """Execute alerts scheduler."""
 
@@ -416,16 +424,6 @@ async def main():
         settings = SchedulerSettings()  # type: ignore
         service_name = 'alerts-scheduler'
 
-        if with_ee:
-            if settings.sentry_dsn:
-                import sentry_sdk  # pylint: disable=import-outside-toplevel
-                sentry_sdk.init(
-                    dsn=settings.sentry_dsn,
-                    traces_sample_rate=0.6,
-                    environment=settings.sentry_env
-                )
-            ee.utils.telemetry.collect_telemetry(scheduler_implementation)
-
         logger = configure_logger(
             name=service_name,
             log_level=settings.scheduler_loglevel,
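Review note: because `session.expunge_all()` detaches the `Monitor` objects before the new schema-switched session opens, the hunk above replaces the ORM mutation (`monitor.latest_schedule = ...`) with an explicit `UPDATE`. A sketch of the idea (function name hypothetical; the `Monitor` import path is assumed from this repo's layout):

```python
import sqlalchemy as sa

from deepchecks_monitoring.schema_models.monitor import Monitor  # assumed path


async def bump_latest_schedule(session, monitor_id, latest_schedule):
    # A targeted UPDATE touches exactly one column and never re-attaches the
    # detached object, so no stale attribute state can be flushed by accident.
    await session.execute(
        sa.update(Monitor)
        .where(Monitor.id == monitor_id)
        .values({Monitor.latest_schedule: latest_schedule})
    )
    await session.commit()
```
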
diff --git a/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py b/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
index d8e88a3ff..0f1a6f225 100644
--- a/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
+++ b/backend/deepchecks_monitoring/bgtasks/tasks_queuer.py
@@ -136,7 +136,7 @@ async def move_tasks_to_queue(self, session) -> int:
         return 0
 
 
-class BaseWorkerSettings(DatabaseSettings, RedisSettings):
+class WorkerSettings(DatabaseSettings, RedisSettings):
     """Worker settings."""
 
     logfile: t.Optional[str] = None
@@ -152,16 +152,6 @@ class Config:
         env_file_encoding = 'utf-8'
 
 
-if with_ee:
-    class WorkerSettings(BaseWorkerSettings, ee.config.TelemetrySettings):
-        """Set of worker settings."""
-        pass
-else:
-    class WorkerSettings(BaseWorkerSettings):
-        """Set of worker settings."""
-        pass
-
-
 async def init_async_redis(redis_uri):
     """Initialize redis connection."""
     try:
@@ -190,16 +180,6 @@ async def main():
         # the telemetry collection. Adding here this import to fix this
         from deepchecks_monitoring.bgtasks import tasks_queuer  # pylint: disable=import-outside-toplevel
 
-        if with_ee:
-            if settings.sentry_dsn:
-                import sentry_sdk  # pylint: disable=import-outside-toplevel
-                sentry_sdk.init(
-                    dsn=settings.sentry_dsn,
-                    traces_sample_rate=0.1,
-                    environment=settings.sentry_env,
-                )
-            ee.utils.telemetry.collect_telemetry(tasks_queuer.TasksQueuer)
-
         workers = [
             # ModelVersionTopicDeletionWorker,
             ModelVersionCacheInvalidation,
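Review note: with `TelemetrySettings` gone, the `BaseWorkerSettings` plus conditional-subclass dance collapses into a single `WorkerSettings`. A self-contained sketch of the resulting shape (pydantic v1 style, matching the `class Config` idiom above; the URI fields and defaults are illustrative):

```python
import typing as t

from pydantic import BaseSettings  # pydantic v1, as used by this codebase


class DatabaseSettings(BaseSettings):
    database_uri: str = 'postgresql+asyncpg://localhost/monitoring'  # illustrative


class RedisSettings(BaseSettings):
    redis_uri: t.Optional[str] = None  # illustrative


class WorkerSettings(DatabaseSettings, RedisSettings):
    """Worker settings - one class, no EE/OSS fork required."""

    logfile: t.Optional[str] = None

    class Config:
        env_file = '.env'
        env_file_encoding = 'utf-8'
```
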
diff --git a/backend/deepchecks_monitoring/bgtasks/tasks_runner.py b/backend/deepchecks_monitoring/bgtasks/tasks_runner.py
index 4eeef58cf..2cf7bff91 100644
--- a/backend/deepchecks_monitoring/bgtasks/tasks_runner.py
+++ b/backend/deepchecks_monitoring/bgtasks/tasks_runner.py
@@ -29,7 +29,6 @@
 from deepchecks_monitoring.logic.keys import GLOBAL_TASK_QUEUE, TASK_RUNNER_LOCK
 from deepchecks_monitoring.monitoring_utils import configure_logger
 from deepchecks_monitoring.public_models.task import BackgroundWorker, Task
-from deepchecks_monitoring.utils.other import ExtendedAIOKafkaAdminClient
 
 try:
     from deepchecks_monitoring import ee
@@ -189,18 +188,6 @@ async def main():
         # the telemetry collection. Adding here this import to fix this
         from deepchecks_monitoring.bgtasks import tasks_runner  # pylint: disable=import-outside-toplevel
 
-        if with_ee and settings.sentry_dsn:
-            import sentry_sdk  # pylint: disable=import-outside-toplevel
-
-            sentry_sdk.init(
-                dsn=settings.sentry_dsn,
-                traces_sample_rate=0.1,
-                environment=settings.sentry_env
-            )
-            ee.utils.telemetry.collect_telemetry(tasks_runner.TaskRunner)
-            # Ignoring this logger since it can spam sentry with errors
-            sentry_sdk.integrations.logging.ignore_logger('aiokafka.cluster')
-
         async with ResourcesProvider(settings) as rp:
             async_redis = await init_async_redis(rp.redis_settings.redis_uri)
@@ -212,16 +199,6 @@ async def main():
                 MixpanelSystemStateEvent()
             ]
 
-            # Adding kafka related workers
-            if settings.kafka_host is not None:
-                # AIOKafka is spamming our logs, disable it for errors and warnings
-                logging.getLogger('aiokafka.cluster').setLevel(logging.CRITICAL)
-                kafka_admin = ExtendedAIOKafkaAdminClient(**rp.kafka_settings.kafka_params)
-                await kafka_admin.start()
-
-                # flake8: noqa: SC100
-                # workers.append(ModelVersionTopicDeletionWorker(kafka_admin))
-
             # Adding ee workers
             if with_ee:
                 workers.append(ee.bgtasks.ObjectStorageIngestor(rp))
diff --git a/backend/deepchecks_monitoring/ee/__init__.py b/backend/deepchecks_monitoring/ee/__init__.py
index 5cf2d328b..9a657911f 100644
--- a/backend/deepchecks_monitoring/ee/__init__.py
+++ b/backend/deepchecks_monitoring/ee/__init__.py
@@ -1,3 +1,3 @@
-from . import api, bgtasks, config, integrations, middlewares, notifications, resources, utils
+from . import api, bgtasks, config, integrations, middlewares, notifications, resources
 
 __all__ = []
diff --git a/backend/deepchecks_monitoring/ee/config.py b/backend/deepchecks_monitoring/ee/config.py
index ddf21354e..fb772d720 100644
--- a/backend/deepchecks_monitoring/ee/config.py
+++ b/backend/deepchecks_monitoring/ee/config.py
@@ -18,7 +18,6 @@
 
 __all__ = [
     'Settings',
-    'TelemetrySettings',
     'SlackSettings'
 ]
 
@@ -26,14 +25,6 @@
 PROJECT_DIR = pathlib.Path(__file__).parent.parent.absolute()
 
 
-class TelemetrySettings(BaseDeepchecksSettings):
-    """Telemetry settings."""
-
-    instrument_telemetry: bool = False
-    sentry_dsn: t.Optional[str] = None
-    sentry_env: str = 'dev'
-
-
 class SlackSettings(BaseDeepchecksSettings):
     """Settings for Slack."""
 
@@ -52,8 +43,7 @@ def validate_scopes(cls, value: str):  # pylint: disable=no-self-argument
 
 class Settings(
     OpenSourceSettings,
-    SlackSettings,
-    TelemetrySettings,
+    SlackSettings
 ):
     """Settings for the deepchecks_monitoring package."""
diff --git a/backend/deepchecks_monitoring/ee/resources.py b/backend/deepchecks_monitoring/ee/resources.py
index b9233df10..3195f0e49 100644
--- a/backend/deepchecks_monitoring/ee/resources.py
+++ b/backend/deepchecks_monitoring/ee/resources.py
@@ -17,8 +17,7 @@
 from ldclient.client import LDClient
 from ldclient.config import Config as LDConfig
 
-from deepchecks_monitoring.ee import utils
-from deepchecks_monitoring.ee.config import Settings, SlackSettings, TelemetrySettings
+from deepchecks_monitoring.ee.config import Settings, SlackSettings
 from deepchecks_monitoring.ee.features_control_cloud import CloudFeaturesControl
 from deepchecks_monitoring.ee.features_control_on_prem import OnPremFeaturesControl
 from deepchecks_monitoring.ee.notifications import AlertNotificator as EEAlertNotificator
@@ -42,18 +41,6 @@ class ResourcesProvider(OpenSourceResourcesProvider):
     def __init__(self, settings: Settings):
         super().__init__(settings)
         self._lauchdarkly_client = None
-        self._is_telemetry_initialized = False
-
-    @property
-    def telemetry_settings(self) -> TelemetrySettings:
-        """Get the telemetry settings."""
-        if not isinstance(self._settings, TelemetrySettings):
-            raise AssertionError(
-                "Provided settings instance type is not a subclass of "
-                "the 'TelemetrySettings', you need to provide instance "
-                "of 'TelemetrySettings' to the 'ResourcesProvider' constructor"
-            )
-        return self._settings
 
     @property
     def slack_settings(self) -> SlackSettings:
@@ -110,35 +97,10 @@ def parallel_check_executors_pool(self) -> "ActorPool | None":
         if parallel_check_executor_flag:
             return super().parallel_check_executors_pool
 
-    def initialize_telemetry_collectors(
-        self,
-        *targets,
-        traces_sample_rate: float = 0.6,
-    ):
-        """Initialize telemetry."""
-        settings = self.telemetry_settings
-
-        if settings.sentry_dsn and not self._is_telemetry_initialized:
-            import sentry_sdk  # pylint: disable=import-outside-toplevel
-
-            sentry_sdk.init(
-                dsn=settings.sentry_dsn,
-                traces_sample_rate=traces_sample_rate,
-                environment=settings.sentry_env,
-                before_send_transaction=utils.sentry.sentry_send_hook
-            )
-
-            self._is_telemetry_initialized = True
-
-        if self._is_telemetry_initialized:
-            for it in targets:
-                utils.telemetry.collect_telemetry(it)
-
     def get_client_configuration(self) -> dict:
         if self.settings.is_cloud:
             settings = cast(Settings, self.settings)
             return {
-                "sentryDsn": settings.sentry_dsn,
                 "lauchdarklySdkKey": settings.lauchdarkly_sdk_key,
                 "environment": settings.enviroment,
                 "mixpanel_id": settings.mixpanel_id,
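Review note: the deleted `telemetry_settings` property used the same typed-accessor guard that the surviving `slack_settings` property keeps — validate the settings class up front rather than failing later with an opaque `AttributeError`. A generic sketch of the pattern (class name hypothetical):

```python
from deepchecks_monitoring.ee.config import SlackSettings


class ProviderSketch:
    """Hypothetical provider mirroring the slack_settings guard style."""

    def __init__(self, settings):
        self._settings = settings

    @property
    def slack_settings(self) -> SlackSettings:
        # Fail fast with a clear message if constructed with the wrong settings type.
        if not isinstance(self._settings, SlackSettings):
            raise AssertionError('Provided settings instance is not a SlackSettings subclass')
        return self._settings
```
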
- """ - if event.get('type') == 'transaction': - for span in event.get('spans', tuple()): - if ( - span.get('op') in ['db', 'db.query', 'db.sql.query'] - and '_monitor_data_' in span.get('description', '') - ): - span['op'] = 'monitoring-data-load' - return event - - -def traces_sampler(sampling_context): - """Return trace sampling rate for given context.""" - source = sampling_context['transaction_context']['source'] - # Filtering out say-hello messages completely - if source == 'route' and sampling_context['asgi_scope'].get('path') == '/api/v1/say-hello': - return 0 - # For everything else return default rate - return 0.1 diff --git a/backend/deepchecks_monitoring/ee/utils/telemetry.py b/backend/deepchecks_monitoring/ee/utils/telemetry.py deleted file mode 100644 index cbbd9e844..000000000 --- a/backend/deepchecks_monitoring/ee/utils/telemetry.py +++ /dev/null @@ -1,388 +0,0 @@ -# ---------------------------------------------------------------------------- -# Copyright (C) 2021-2022 Deepchecks (https://www.deepchecks.com) -# -# This file is part of Deepchecks. -# Deepchecks is distributed under the terms of the GNU Affero General -# Public License (version 3 or later). -# You should have received a copy of the GNU Affero General Public License -# along with Deepchecks. If not, see . -# ---------------------------------------------------------------------------- -# -# pylint: disable=unused-import -"""Open-telementy instrumentors.""" -import enum -import json -import logging -import typing as t -from functools import wraps -from time import perf_counter - -import anyio -import pendulum as pdl -import sentry_sdk - -from deepchecks_monitoring import __version__ -from deepchecks_monitoring.public_models import Organization, User -from deepchecks_monitoring.schema_models import Model, ModelVersion - -if t.TYPE_CHECKING: - from pendulum.datetime import DateTime as PendulumDateTime - from sqlalchemy.ext.asyncio import AsyncSession - - from deepchecks_monitoring.bgtasks.scheduler import AlertsScheduler - from deepchecks_monitoring.bgtasks.tasks_queuer import TasksQueuer - from deepchecks_monitoring.bgtasks.tasks_runner import TaskRunner - from deepchecks_monitoring.logic.data_ingestion import DataIngestionBackend - - -__all__ = [ - "collect_telemetry", - "SchedulerInstrumentor", - "DataIngetionInstrumentor" -] - - -class SpanStatus(str, enum.Enum): - CANCELED = "Coroutine Canceled" - FAILED = "Execution Failed" - OK = "Ok" - - -def collect_telemetry(routine: t.Any): - """Instrument open-telementry for given routine.""" - # pylint: disable=redefined-outer-name,import-outside-toplevel - from deepchecks_monitoring.bgtasks.scheduler import AlertsScheduler - from deepchecks_monitoring.bgtasks.tasks_queuer import TasksQueuer - from deepchecks_monitoring.bgtasks.tasks_runner import TaskRunner - from deepchecks_monitoring.logic.data_ingestion import DataIngestionBackend - - logger = logging.getLogger("instrumentation") - - if issubclass(routine, AlertsScheduler): - SchedulerInstrumentor(scheduler_type=routine).instrument() - logger.info("Instrumented alerts scheduler telemetry collectors") - return routine - - if issubclass(routine, DataIngestionBackend): - DataIngetionInstrumentor(data_ingestion_backend_type=routine).instrument() - logger.info("Instrumented data ingestion backend telemetry collectors") - return routine - - if issubclass(routine, TaskRunner): - TaskRunerInstrumentor(task_runner_type=routine).instrument() - logger.info("Instrumented task runner telemetry collectors") - return routine - - 
diff --git a/backend/deepchecks_monitoring/ee/utils/telemetry.py b/backend/deepchecks_monitoring/ee/utils/telemetry.py
deleted file mode 100644
index cbbd9e844..000000000
--- a/backend/deepchecks_monitoring/ee/utils/telemetry.py
+++ /dev/null
@@ -1,388 +0,0 @@
-# ----------------------------------------------------------------------------
-# Copyright (C) 2021-2022 Deepchecks (https://www.deepchecks.com)
-#
-# This file is part of Deepchecks.
-# Deepchecks is distributed under the terms of the GNU Affero General
-# Public License (version 3 or later).
-# You should have received a copy of the GNU Affero General Public License
-# along with Deepchecks. If not, see <http://www.gnu.org/licenses/>.
-# ----------------------------------------------------------------------------
-#
-# pylint: disable=unused-import
-"""Open-telementy instrumentors."""
-import enum
-import json
-import logging
-import typing as t
-from functools import wraps
-from time import perf_counter
-
-import anyio
-import pendulum as pdl
-import sentry_sdk
-
-from deepchecks_monitoring import __version__
-from deepchecks_monitoring.public_models import Organization, User
-from deepchecks_monitoring.schema_models import Model, ModelVersion
-
-if t.TYPE_CHECKING:
-    from pendulum.datetime import DateTime as PendulumDateTime
-    from sqlalchemy.ext.asyncio import AsyncSession
-
-    from deepchecks_monitoring.bgtasks.scheduler import AlertsScheduler
-    from deepchecks_monitoring.bgtasks.tasks_queuer import TasksQueuer
-    from deepchecks_monitoring.bgtasks.tasks_runner import TaskRunner
-    from deepchecks_monitoring.logic.data_ingestion import DataIngestionBackend
-
-
-__all__ = [
-    "collect_telemetry",
-    "SchedulerInstrumentor",
-    "DataIngetionInstrumentor"
-]
-
-
-class SpanStatus(str, enum.Enum):
-    CANCELED = "Coroutine Canceled"
-    FAILED = "Execution Failed"
-    OK = "Ok"
-
-
-def collect_telemetry(routine: t.Any):
-    """Instrument open-telementry for given routine."""
-    # pylint: disable=redefined-outer-name,import-outside-toplevel
-    from deepchecks_monitoring.bgtasks.scheduler import AlertsScheduler
-    from deepchecks_monitoring.bgtasks.tasks_queuer import TasksQueuer
-    from deepchecks_monitoring.bgtasks.tasks_runner import TaskRunner
-    from deepchecks_monitoring.logic.data_ingestion import DataIngestionBackend
-
-    logger = logging.getLogger("instrumentation")
-
-    if issubclass(routine, AlertsScheduler):
-        SchedulerInstrumentor(scheduler_type=routine).instrument()
-        logger.info("Instrumented alerts scheduler telemetry collectors")
-        return routine
-
-    if issubclass(routine, DataIngestionBackend):
-        DataIngetionInstrumentor(data_ingestion_backend_type=routine).instrument()
-        logger.info("Instrumented data ingestion backend telemetry collectors")
-        return routine
-
-    if issubclass(routine, TaskRunner):
-        TaskRunerInstrumentor(task_runner_type=routine).instrument()
-        logger.info("Instrumented task runner telemetry collectors")
-        return routine
-
-    if issubclass(routine, TasksQueuer):
-        TasksQueuerInstrumentor(task_queuer_type=routine).instrument()
-        logger.info("Instrumented task queuer telemetry collectors")
-        return routine
-
-    raise ValueError(
-        "Unknown routine, do not know how to do "
-        "open-telemetry instrumentation for it."
-    )
-
-
-class SchedulerInstrumentor:
-    """Alerts scheduler open-telemetry instrumentor."""
-
-    def __init__(self, scheduler_type: "t.Type[AlertsScheduler]"):
-        self.scheduler_type = scheduler_type
-        self.original_run_all_organizations = self.scheduler_type.run_all_organizations
-        self.original_run_organization = self.scheduler_type.run_organization
-
-    def instrument(self):
-        """Instrument open-telemetry for given scheduler type."""
-
-        @wraps(self.original_run_all_organizations)
-        async def run_all_organizations(scheduler: "AlertsScheduler", *args, **kwargs):
-            db_url = scheduler.engine.url
-            with sentry_sdk.start_transaction(name="Alerts Execution"):
-                sentry_sdk.set_context("deepchecks_monitoring", {
-                    "version": __version__
-                })
-                sentry_sdk.set_context("database", {
-                    "name": str(db_url.database),
-                    "uri": str(db_url),
-                    "user": str(db_url.username)
-                })
-                with sentry_sdk.start_span(op="AlertsScheduler.run_all_organizations") as span:
-                    span.set_data("sleep_seconds", scheduler.sleep_seconds)
-                    try:
-                        await self.original_run_all_organizations(
-                            scheduler,
-                            *args,
-                            **kwargs
-                        )
-                    except Exception as error:
-                        sentry_sdk.capture_exception(error)
-                        if isinstance(error, anyio.get_cancelled_exc_class()):
-                            span.set_status(SpanStatus.CANCELED)
-                        else:
-                            span.set_status(SpanStatus.FAILED)
-                        raise
-                    else:
-                        span.set_status(SpanStatus.OK)
-
-        @wraps(self.original_run_organization)
-        async def run_organization(
-            scheduler: "AlertsScheduler",
-            organization: "Organization",
-            *args,
-            **kwargs
-        ):
-            with sentry_sdk.start_span(op="AlertsScheduler.run_organization") as span:
-                span.set_data("organization.id", organization.id)
-                span.set_data("organization.schema_name", organization.schema_name)
-                kwargs = {**kwargs, "organization": organization}
-                try:
-                    enqueued_tasks = await self.original_run_organization(
-                        scheduler,
-                        *args,
-                        **kwargs
-                    )
-                except Exception as error:
-                    sentry_sdk.capture_exception(error)
-                    span.set_status(
-                        SpanStatus.CANCELED
-                        if isinstance(error, anyio.get_cancelled_exc_class())
-                        else SpanStatus.FAILED
-                    )
-                    raise
-                else:
-                    span.set_status(SpanStatus.OK)
-
-                    if enqueued_tasks is not None:
-                        stringified_tasks = "\n".join([repr(task) for task in enqueued_tasks])
-                        span.set_data("enqueued_tasks", stringified_tasks)
-                        span.set_data("description", f"Enqueued {len(enqueued_tasks)} tasks")
-                    else:
-                        span.set_data("description", "Enqueued 0 tasks")
-
-                    return enqueued_tasks
-
-        self.scheduler_type.run_all_organizations = run_all_organizations
-        self.scheduler_type.run_organization = run_organization
-
-    def uninstrument(self):
-        self.scheduler_type.run_all_organizations = self.original_run_all_organizations
-        self.scheduler_type.run_organization = self.original_run_organization
-
-
-class DataIngetionInstrumentor:
-    """Data ingestion backend open-telemetry instrumentor."""
-
-    def __init__(self, data_ingestion_backend_type: t.Type["DataIngestionBackend"]):
-        self.data_ingestion_backend_type = data_ingestion_backend_type
-        self.original_log_samples = self.data_ingestion_backend_type.log_samples
-        self.original_log_labels = self.data_ingestion_backend_type.log_labels
-
-    def instrument(self):
-        """Instrument fo the data ingestion backend."""
-
-        @wraps(self.data_ingestion_backend_type.log_samples)
-        async def log_samples(
-            data_ingestion_backend: "DataIngestionBackend",
-            model_version: ModelVersion,
-            data: t.List[t.Dict[str, t.Any]],
-            session: "AsyncSession",
-            organization_id: int,
-            log_time: "PendulumDateTime",
-        ):
-            settings = data_ingestion_backend.resources_provider.settings
-
-            with sentry_sdk.start_transaction(name="Log Samples"):
-                sentry_sdk.set_context("deepchecks_monitoring", {
-                    "version": __version__
-                })
-                sentry_sdk.set_context("kafka", {
-                    "host": settings.kafka_host,
-                    "username": settings.kafka_username,
-                    "security_protocol": settings.kafka_security_protocol,
-                    "max_metadata_age": settings.kafka_max_metadata_age,
-                    "replication_factor": settings.kafka_replication_factor,
-                    "sasl_mechanism": settings.kafka_sasl_mechanism,
-                })
-                sentry_sdk.set_context("redis", {
-                    "uri": settings.redis_uri
-                })
-                sentry_sdk.set_context("database", {
-                    "uri": settings.database_uri
-                })
-                with sentry_sdk.start_span(op="DataIngestionBackend.log_or_update") as span:
-                    span.set_data("organization_id", organization_id)
-                    span.set_data("n_of_samples", len(data))
-                    try:
-                        result = await self.original_log_samples(
-                            data_ingestion_backend,
-                            model_version,
-                            data,
-                            session,
-                            organization_id,
-                            log_time
-                        )
-                    except Exception as error:
-                        span.set_status(SpanStatus.FAILED)
-                        sentry_sdk.capture_exception(error)
-                        raise
-                    else:
-                        return result
-
-        @wraps(self.data_ingestion_backend_type.log_labels)
-        async def log_labels(
-            data_ingestion_backend: "DataIngestionBackend",
-            model: Model,
-            data: t.List[t.Dict[str, t.Any]],
-            session: "AsyncSession",
-            organization_id: int,
-        ):
-            settings = data_ingestion_backend.resources_provider.settings
-
-            with sentry_sdk.start_transaction(name="Log Labels"):
-                sentry_sdk.set_context("deepchecks_monitoring", {
-                    "version": __version__
-                })
-                sentry_sdk.set_context("kafka", {
-                    "host": settings.kafka_host,
-                    "username": settings.kafka_username,
-                    "security_protocol": settings.kafka_security_protocol,
-                    "max_metadata_age": settings.kafka_max_metadata_age,
-                    "replication_factor": settings.kafka_replication_factor,
-                    "sasl_mechanism": settings.kafka_sasl_mechanism,
-                })
-                sentry_sdk.set_context("redis", {
-                    "uri": settings.redis_uri
-                })
-                sentry_sdk.set_context("database", {
-                    "uri": settings.database_uri
-                })
-                with sentry_sdk.start_span(op="DataIngestionBackend.log_or_update") as span:
-                    span.set_data("organization_id", organization_id)
-                    span.set_data("n_of_samples", len(data))
-                    try:
-                        result = await self.original_log_labels(
-                            data_ingestion_backend,
-                            model,
-                            data,
-                            session,
-                            organization_id
-                        )
-                    except Exception as error:
-                        span.set_status(SpanStatus.FAILED)
-                        sentry_sdk.capture_exception(error)
-                        raise
-                    else:
-                        return result
-
-        self.data_ingestion_backend_type.log_samples = log_samples
-        self.data_ingestion_backend_type.log_labels = log_labels
-
-    def uninstrument(self):
-        self.data_ingestion_backend_type.log_samples = self.original_log_samples
-        self.data_ingestion_backend_type.log_labels = self.original_log_labels
-
-
-class TaskRunerInstrumentor:
-    """Task runner open-telemetry instrumentor."""
-
-    def __init__(self, task_runner_type: t.Type["TaskRunner"]):
-        self.task_runner_type = task_runner_type
-        self.original_run_task = self.task_runner_type._run_task
-
-    def instrument(self):
-        """Instrument the task runner functions we want to monitor."""
-
-        @wraps(self.original_run_task)
-        async def _run_task(runner: "TaskRunner", task, session, queued_time, lock):
-            redis_uri = runner.resource_provider.redis_settings.redis_uri
-            database_uri = runner.resource_provider.database_settings.database_uri
-            kafka_settings = runner.resource_provider.kafka_settings
-
-            with sentry_sdk.start_transaction(name="Task Runner"):
-                sentry_sdk.set_context("deepchecks_monitoring", {
-                    "version": __version__
-                })
-                sentry_sdk.set_context("kafka", {
-                    "host": kafka_settings.kafka_host,
-                    "username": kafka_settings.kafka_username,
-                    "security_protocol": kafka_settings.kafka_security_protocol,
-                    "max_metadata_age": kafka_settings.kafka_max_metadata_age,
-                    "replication_factor": kafka_settings.kafka_replication_factor,
-                    "sasl_mechanism": kafka_settings.kafka_sasl_mechanism,
-                })
-                sentry_sdk.set_context("redis", {
-                    "uri": redis_uri
-                })
-                sentry_sdk.set_context("database", {
-                    "uri": database_uri
-                })
-                with sentry_sdk.start_span(op="TaskRunner.run_single_task") as span:
-                    span.set_data("task.num-pushed", str(task.num_pushed))
-                    span.set_data("task.params", json.dumps(task.params, indent=3))
-                    span.set_data("task.type", str(task.bg_worker_task))
-                    span.set_data("task.creation-time", str(task.creation_time))
-                    span.set_data("task.name", task.name)
-                    span.set_data("task.duration-in-queue", pdl.now().int_timestamp - queued_time)
-
-                    try:
-                        start = perf_counter()
-                        result = await self.original_run_task(runner, task, session, queued_time, lock)
-                        span.set_data("task.execution-duration", perf_counter() - start)
-                        span.set_status(SpanStatus.OK)
-                    except Exception as error:
-                        span.set_status(SpanStatus.FAILED)
-                        sentry_sdk.capture_exception(error)
-                        raise
-                    else:
-                        return result
-
-        self.task_runner_type._run_task = _run_task  # pylint: disable=protected-access
-
-    def uninstrument(self):
-        self.task_runner_type._run_task = self.original_run_task  # pylint: disable=protected-access
-
-
-class TasksQueuerInstrumentor:
-    """Task runner open-telemetry instrumentor."""
-
-    def __init__(self, task_queuer_type: t.Type["TasksQueuer"]):
-        self.task_queuer_type = task_queuer_type
-        self.original_move_tasks_to_queue = self.task_queuer_type.move_tasks_to_queue
-
-    def instrument(self):
-        """Instrument the task runner functions we want to monitor."""
-
-        @wraps(self.original_move_tasks_to_queue)
-        async def move_tasks_to_queue(queuer: "TasksQueuer", session):
-            redis_uri = queuer.resource_provider.redis_settings.redis_uri
-            database_uri = queuer.resource_provider.database_settings.database_uri
-
-            with sentry_sdk.start_transaction(name="Tasks Queuer"):
-                sentry_sdk.set_context("deepchecks_monitoring", {
-                    "version": __version__
-                })
-                sentry_sdk.set_context("redis", {
-                    "uri": redis_uri
-                })
-                sentry_sdk.set_context("database", {
-                    "uri": database_uri
-                })
-                with sentry_sdk.start_span(op="TasksQueuer.move_tasks_to_queue") as span:
-                    try:
-                        start = perf_counter()
-                        result = await self.original_move_tasks_to_queue(queuer, session)
-                        span.set_data("execution-duration", perf_counter() - start)
-                        span.set_data("queued-tasks-amount", result)
-                        span.set_status(SpanStatus.OK)
-                    except Exception as error:
-                        span.set_status(SpanStatus.FAILED)
-                        sentry_sdk.capture_exception(error)
-                        raise
-                    else:
-                        return result
-
-        self.task_queuer_type.move_tasks_to_queue = move_tasks_to_queue
-
-    def uninstrument(self):
-        self.task_queuer_type.move_tasks_to_queue = self.original_move_tasks_to_queue
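Review note: all four deleted instrumentors share one mechanism — keep a reference to the original coroutine method, wrap it with `functools.wraps`, assign the wrapper onto the class, and restore the saved reference in `uninstrument()`. A distilled, generic sketch of that swap-in/swap-out pattern (class and parameter names hypothetical):

```python
import typing as t
from functools import wraps


class MethodInstrumentor:
    """Hypothetical distillation of the pattern used by the deleted classes."""

    def __init__(self, target_type: type, method_name: str, on_call: t.Callable):
        self.target_type = target_type
        self.method_name = method_name
        self.on_call = on_call
        self.original = getattr(target_type, method_name)

    def instrument(self):
        original = self.original

        @wraps(original)
        async def wrapper(instance, *args, **kwargs):
            # Emit the side-channel signal (span, log, metric) around the call.
            self.on_call(instance, args, kwargs)
            return await original(instance, *args, **kwargs)

        setattr(self.target_type, self.method_name, wrapper)

    def uninstrument(self):
        # Restoring the saved reference undoes the patch completely.
        setattr(self.target_type, self.method_name, self.original)
```
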
diff --git a/backend/deepchecks_monitoring/logic/data_ingestion.py b/backend/deepchecks_monitoring/logic/data_ingestion.py
index 8ca935fc0..661a5f901 100644
--- a/backend/deepchecks_monitoring/logic/data_ingestion.py
+++ b/backend/deepchecks_monitoring/logic/data_ingestion.py
@@ -12,6 +12,7 @@
 import asyncio
 import copy
 import json
+import logging
 import typing as t
 
 import asyncpg.exceptions
@@ -346,6 +347,8 @@ def __init__(
         self.resources_provider: ResourcesProvider = resources_provider
         self.logger = logger or configure_logger(name="data-ingestion")
         self.use_kafka = self.resources_provider.kafka_settings.kafka_host is not None
+        if self.use_kafka:
+            logging.getLogger("aiokafka.cluster").setLevel(logging.CRITICAL)
 
     async def _send_with_retry(
         self,
diff --git a/backend/deepchecks_monitoring/logic/model_logic.py b/backend/deepchecks_monitoring/logic/model_logic.py
index 47220187f..b8b417111 100644
--- a/backend/deepchecks_monitoring/logic/model_logic.py
+++ b/backend/deepchecks_monitoring/logic/model_logic.py
@@ -312,7 +312,6 @@ def run_deepchecks(
             pass
         # For rest of the errors logs them
         except Exception as e:  # pylint: disable=broad-except
-            # TODO: send error to sentry, needs to be done in the ee sub-package
             error_message = (
                 str(e)
                 if not (msg := getattr(e, 'message', None))
diff --git a/backend/deepchecks_monitoring/resources.py b/backend/deepchecks_monitoring/resources.py
index 1969a7bbe..5f7d14a29 100644
--- a/backend/deepchecks_monitoring/resources.py
+++ b/backend/deepchecks_monitoring/resources.py
@@ -475,14 +475,9 @@ def get_features_control(self, user: User) -> FeaturesControl:  # pylint: disabl
         """Return features control."""
         return FeaturesControl(self.settings)
 
-    def initialize_telemetry_collectors(self, *targets):
-        """Initialize telemetry."""
-        pass
-
     def get_client_configuration(self) -> "dict[str, t.Any]":
         """Return configuration to be used in client side."""
         return {
-            "sentryDsn": None,
             "lauchdarklySdkKey": None,
             "environment": None,
             "mixpanel_id": None,
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 9cefc786b..7305a9d85 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -32,7 +32,6 @@ slack-sdk==3.18.1
 bcrypt==4.0.1
 pyinstrument==4.6.1
 watchtower==3.0.0
-sentry-sdk[fastapi]~=1.14.0
 jupytext~=1.16.0
 uvicorn==0.30.1
 ddtrace>=1.0,<2
diff --git a/backend/tests/unittests/conftest.py b/backend/tests/unittests/conftest.py
index 4b9c74d0a..b3083c88e 100644
--- a/backend/tests/unittests/conftest.py
+++ b/backend/tests/unittests/conftest.py
@@ -8,20 +8,3 @@
 # along with Deepchecks. If not, see <http://www.gnu.org/licenses/>.
 # ----------------------------------------------------------------------------
 #
-# pylint: disable=import-outside-toplevel
-import pytest
-
-
-@pytest.fixture(scope="package", autouse=True)
-def _():
-    # adding telemetry to make sure that it does not break routines
-    from deepchecks_monitoring.bgtasks.scheduler import AlertsScheduler
-    from deepchecks_monitoring.bgtasks.tasks_queuer import TasksQueuer
-    from deepchecks_monitoring.bgtasks.tasks_runner import TaskRunner
-    from deepchecks_monitoring.ee.utils import telemetry
-    from deepchecks_monitoring.logic.data_ingestion import DataIngestionBackend
-
-    telemetry.collect_telemetry(AlertsScheduler)
-    telemetry.collect_telemetry(DataIngestionBackend)
-    telemetry.collect_telemetry(TasksQueuer)
-    telemetry.collect_telemetry(TaskRunner)
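Review note: the aiokafka log suppression removed from `tasks_runner.py` lives on in the `data_ingestion.py` hunk above, now gated on Kafka actually being configured. The mechanism in isolation:

```python
import logging

# aiokafka's cluster module logs reconnect noise at ERROR/WARNING; raising its
# threshold to CRITICAL mutes it process-wide without touching other loggers.
logging.getLogger('aiokafka.cluster').setLevel(logging.CRITICAL)
```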