diff --git a/src/meta_memcache/executors/default.py b/src/meta_memcache/executors/default.py index b7b36f2..f782f8b 100644 --- a/src/meta_memcache/executors/default.py +++ b/src/meta_memcache/executors/default.py @@ -1,5 +1,6 @@ import logging from typing import Callable, Dict, List, Optional, Tuple +from meta_memcache.metrics.base import BaseMetricsCollector, MetricDefinition from meta_memcache_socket import ( RequestFlags, @@ -34,18 +35,34 @@ class DefaultExecutor: + def __init__( self, serializer: BaseSerializer, key_encoder_fn: Callable[[Key], bytes] = default_key_encoder, raise_on_server_error: bool = True, touch_ttl_to_consider_write_failure: Optional[int] = 50, + metrics_collector: Optional[BaseMetricsCollector] = None, ) -> None: self._serializer = serializer self._key_encoder_fn = key_encoder_fn self._raise_on_server_error = raise_on_server_error self._touch_ttl_to_consider_write_failure = touch_ttl_to_consider_write_failure self.on_write_failure = WriteFailureEvent() + if metrics_collector: + labels = ("domain",) + metrics_collector.init_metrics( + namespace="cache", + metrics=[ + MetricDefinition("read_hits", "Number of hits", labels), + MetricDefinition("read_misses", "Number of misses", labels), + MetricDefinition("read_bytes", "Size of writes", labels), + MetricDefinition("write_count", "Number of writes", labels), + MetricDefinition("write_bytes", "Size of writes", labels), + ], + gauges=[], + ) + self._metrics = metrics_collector def _build_cmd( self, @@ -94,6 +111,36 @@ def _is_a_write_failure( return True return False + def _collect_metrics( + self, + command: MetaCommand, + key: Key, + value_size: Optional[int], + result: MemcacheResponse, + ) -> None: + if not self._metrics: + return None + + try: + labels = {"domain": key.domain or "default"} + if command == MetaCommand.META_GET: + if isinstance(result, Value): + self._metrics.metric_inc("hits", labels=labels) + if result.value: + self._metrics.metric_inc( + "read_bytes", value=len(result.value.value), labels=labels + ) + elif isinstance(result, Miss): + self._metrics.metric_inc("misses", labels=labels) + if command == MetaCommand.META_SET: + self._metrics.metric_inc("write_count", labels=labels) + if value_size: + self._metrics.metric_inc( + "write_bytes", value=value_size, labels=labels + ) + except Exception as e: + _log.exception(f"Error collecting metrics") + def exec_on_pool( self, pool: ConnectionPool, @@ -120,7 +167,11 @@ def exec_on_pool( value=cmd_value, flags=flags, ) - return self._conn_recv_response(conn, flags=flags) + result = self._conn_recv_response(conn, flags=flags) + self._collect_metrics( + command, key, len(cmd_value) if cmd_value else None, result + ) + return result except Exception as e: error = True raise MemcacheServerError(pool.server, "Memcache error") from e @@ -155,13 +206,15 @@ def exec_multi_on_pool( # noqa: C901 conn = pool.pop_connection() error = False try: - # with pool.get_connection() as conn: + value_sizes = {} for key, value in key_values: cmd_value, flags = ( (None, flags) if value is None else self._prepare_serialized_value_and_flags(key, value, flags) ) + if cmd_value: + value_sizes[key] = len(cmd_value) self._conn_send_cmd( conn, @@ -170,8 +223,14 @@ def exec_multi_on_pool( # noqa: C901 value=cmd_value, flags=flags, ) - for key, _ in key_values: + for key, value in key_values: results[key] = self._conn_recv_response(conn, flags=flags) + self._collect_metrics( + command, + key, + value_sizes.get(key) if value else None, + results[key], + ) except Exception as e: error = True raise MemcacheServerError(pool.server, "Memcache error") from e