Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG-loongsuite.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

# Added

- `loongsuite-instrumentation-mem0`: add hook extension
([#95](https://github.com/alibaba/loongsuite-python-agent/pull/95))

- `loongsuite-instrumentation-mem0`: add support for mem0
([#67](https://github.com/alibaba/loongsuite-python-agent/pull/67))
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
VectorStoreWrapper,
)
from opentelemetry.instrumentation.mem0.package import _instruments
from opentelemetry.instrumentation.mem0.types import set_memory_hooks
from opentelemetry.instrumentation.mem0.version import __version__
from opentelemetry.instrumentation.utils import unwrap
from opentelemetry.semconv.schemas import Schemas
Expand Down Expand Up @@ -127,6 +128,12 @@ def _instrument(self, **kwargs: Any) -> None:
# Optional: logger provider for GenAI events (util will no-op if not provided)
logger_provider = kwargs.get("logger_provider")

# Optional hooks for extensions (e.g. commercial metrics). We only pass through.
memory_before_hook = kwargs.get("memory_before_hook")
memory_after_hook = kwargs.get("memory_after_hook")
inner_before_hook = kwargs.get("inner_before_hook")
inner_after_hook = kwargs.get("inner_after_hook")

# Create util GenAI handler (strong dependency, no fallback).
# Avoid singleton here so tests (and multiple tracer providers) don't leak across runs.
telemetry_handler = ExtendedTelemetryHandler(
Expand All @@ -144,13 +151,33 @@ def _instrument(self, **kwargs: Any) -> None:
)

# Execute instrumentation (traces only, metrics removed)
self._instrument_memory_operations(telemetry_handler)
self._instrument_memory_client_operations(telemetry_handler)
self._instrument_memory_operations(
telemetry_handler,
memory_before_hook=memory_before_hook,
memory_after_hook=memory_after_hook,
)
self._instrument_memory_client_operations(
telemetry_handler,
memory_before_hook=memory_before_hook,
memory_after_hook=memory_after_hook,
)
# Sub-phases controlled by toggle, avoid binding wrapper when disabled to reduce overhead
if mem0_config.is_internal_phases_enabled():
self._instrument_vector_operations(tracer)
self._instrument_graph_operations(tracer)
self._instrument_reranker_operations(tracer)
self._instrument_vector_operations(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)
self._instrument_graph_operations(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)
self._instrument_reranker_operations(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)

def _uninstrument(self, **kwargs: Any) -> None:
"""Remove instrumentation."""
Expand Down Expand Up @@ -392,7 +419,13 @@ def _wrapper(wrapped, instance, args, kwargs):

return _wrapper

def _instrument_memory_operations(self, telemetry_handler):
def _instrument_memory_operations(
self,
telemetry_handler,
*,
memory_before_hook=None,
memory_after_hook=None,
):
"""Instrument Memory and AsyncMemory operations."""
try:
if (
Expand All @@ -407,6 +440,11 @@ def _instrument_memory_operations(self, telemetry_handler):
return

wrapper = MemoryOperationWrapper(telemetry_handler)
set_memory_hooks(
wrapper,
memory_before_hook=memory_before_hook,
memory_after_hook=memory_after_hook,
)

# Instrument Memory (sync)
for method in self._public_methods_of(
Expand Down Expand Up @@ -444,7 +482,13 @@ def _instrument_memory_operations(self, telemetry_handler):
except Exception as e:
logger.debug(f"Failed to instrument Memory operations: {e}")

def _instrument_memory_client_operations(self, telemetry_handler):
def _instrument_memory_client_operations(
self,
telemetry_handler,
*,
memory_before_hook=None,
memory_after_hook=None,
):
"""Instrument MemoryClient and AsyncMemoryClient operations."""
try:
if (
Expand All @@ -459,6 +503,11 @@ def _instrument_memory_client_operations(self, telemetry_handler):
return

wrapper = MemoryOperationWrapper(telemetry_handler)
set_memory_hooks(
wrapper,
memory_before_hook=memory_before_hook,
memory_after_hook=memory_after_hook,
)

# Instrument MemoryClient (sync)
for method in self._public_methods_of(
Expand Down Expand Up @@ -594,7 +643,13 @@ def _factory_wrapper(wrapped, instance, args, kwargs):
except Exception as e:
logger.debug(f"Failed to wrap {factory_class}.create: {e}")

def _instrument_vector_operations(self, tracer):
def _instrument_vector_operations(
self,
tracer,
*,
inner_before_hook=None,
inner_after_hook=None,
):
"""Instrument VectorStore operations."""
try:
# Require both VectorStoreBase and VectorStoreFactory to be available
Expand Down Expand Up @@ -632,7 +687,11 @@ def _instrument_vector_operations(self, tracer):
]

# Create VectorStoreWrapper instance (trace-only)
vector_wrapper = VectorStoreWrapper(tracer)
vector_wrapper = VectorStoreWrapper(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)

# Use generic factory wrapping method
self._wrap_factory_for_phase(
Expand All @@ -647,7 +706,13 @@ def _instrument_vector_operations(self, tracer):
except Exception as e:
logger.debug(f"Failed to instrument vector store operations: {e}")

def _instrument_graph_operations(self, tracer):
def _instrument_graph_operations(
self,
tracer,
*,
inner_before_hook=None,
inner_after_hook=None,
):
"""Instrument GraphStore operations."""
try:
# If factories are unavailable, graph subphase instrumentation cannot be enabled
Expand Down Expand Up @@ -687,7 +752,11 @@ def _instrument_graph_operations(self, tracer):
]

# Create GraphStoreWrapper instance (trace-only)
graph_wrapper = GraphStoreWrapper(tracer)
graph_wrapper = GraphStoreWrapper(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)

# Use generic factory wrapping method
self._wrap_factory_for_phase(
Expand All @@ -702,7 +771,13 @@ def _instrument_graph_operations(self, tracer):
except Exception as e:
logger.debug(f"Failed to instrument graph store operations: {e}")

def _instrument_reranker_operations(self, tracer):
def _instrument_reranker_operations(
self,
tracer,
*,
inner_before_hook=None,
inner_after_hook=None,
):
"""Instrument Reranker operations."""
try:
if not _FACTORIES_AVAILABLE or RerankerFactory is None:
Expand All @@ -713,7 +788,11 @@ def _instrument_reranker_operations(self, tracer):
return

# Create RerankerWrapper instance (trace-only)
reranker_wrapper = RerankerWrapper(tracer)
reranker_wrapper = RerankerWrapper(
tracer,
inner_before_hook=inner_before_hook,
inner_after_hook=inner_after_hook,
)

# Use generic factory wrapping method
self._wrap_factory_for_phase(
Expand Down
Loading