Skip to content

Add per-domain metrics #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 62 additions & 3 deletions src/meta_memcache/executors/default.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from typing import Callable, Dict, List, Optional, Tuple
from meta_memcache.metrics.base import BaseMetricsCollector, MetricDefinition

from meta_memcache_socket import (
RequestFlags,
Expand Down Expand Up @@ -34,18 +35,34 @@


class DefaultExecutor:

def __init__(
self,
serializer: BaseSerializer,
key_encoder_fn: Callable[[Key], bytes] = default_key_encoder,
raise_on_server_error: bool = True,
touch_ttl_to_consider_write_failure: Optional[int] = 50,
metrics_collector: Optional[BaseMetricsCollector] = None,
) -> None:
self._serializer = serializer
self._key_encoder_fn = key_encoder_fn
self._raise_on_server_error = raise_on_server_error
self._touch_ttl_to_consider_write_failure = touch_ttl_to_consider_write_failure
self.on_write_failure = WriteFailureEvent()
if metrics_collector:
labels = ("domain",)
metrics_collector.init_metrics(
namespace="cache",
metrics=[
MetricDefinition("read_hits", "Number of hits", labels),
MetricDefinition("read_misses", "Number of misses", labels),
MetricDefinition("read_bytes", "Size of writes", labels),
MetricDefinition("write_count", "Number of writes", labels),
MetricDefinition("write_bytes", "Size of writes", labels),
],
gauges=[],
)
self._metrics = metrics_collector

def _build_cmd(
self,
Expand Down Expand Up @@ -94,6 +111,36 @@ def _is_a_write_failure(
return True
return False

def _collect_metrics(
self,
command: MetaCommand,
key: Key,
value_size: Optional[int],
result: MemcacheResponse,
) -> None:
if not self._metrics:
return None

try:
labels = {"domain": key.domain or "default"}
if command == MetaCommand.META_GET:
if isinstance(result, Value):
self._metrics.metric_inc("hits", labels=labels)
if result.value:
self._metrics.metric_inc(
"read_bytes", value=len(result.value.value), labels=labels
)
elif isinstance(result, Miss):
self._metrics.metric_inc("misses", labels=labels)
if command == MetaCommand.META_SET:
self._metrics.metric_inc("write_count", labels=labels)
if value_size:
self._metrics.metric_inc(
"write_bytes", value=value_size, labels=labels
)
except Exception as e:
_log.exception(f"Error collecting metrics")

def exec_on_pool(
self,
pool: ConnectionPool,
Expand All @@ -120,7 +167,11 @@ def exec_on_pool(
value=cmd_value,
flags=flags,
)
return self._conn_recv_response(conn, flags=flags)
result = self._conn_recv_response(conn, flags=flags)
self._collect_metrics(
command, key, len(cmd_value) if cmd_value else None, result
)
return result
except Exception as e:
error = True
raise MemcacheServerError(pool.server, "Memcache error") from e
Expand Down Expand Up @@ -155,13 +206,15 @@ def exec_multi_on_pool( # noqa: C901
conn = pool.pop_connection()
error = False
try:
# with pool.get_connection() as conn:
value_sizes = {}
for key, value in key_values:
cmd_value, flags = (
(None, flags)
if value is None
else self._prepare_serialized_value_and_flags(key, value, flags)
)
if cmd_value:
value_sizes[key] = len(cmd_value)

self._conn_send_cmd(
conn,
Expand All @@ -170,8 +223,14 @@ def exec_multi_on_pool( # noqa: C901
value=cmd_value,
flags=flags,
)
for key, _ in key_values:
for key, value in key_values:
results[key] = self._conn_recv_response(conn, flags=flags)
self._collect_metrics(
command,
key,
value_sizes.get(key) if value else None,
results[key],
)
except Exception as e:
error = True
raise MemcacheServerError(pool.server, "Memcache error") from e
Expand Down