Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 134 additions & 54 deletions agentops/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ class InstrumentorConfig(TypedDict):
},
}

# Utility instrumentors required by each package.
# Keys are package names; each value lists the names of the utility
# instrumentors that package depends on. Names are looked up in
# UTILITY_INSTRUMENTORS, so an entry here only has an effect if the utility
# is also registered there.
UTILITY_DEPENDENCIES: dict[str, list[str]] = {
"mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing
# Add more dependencies as needed in the future, for example:
# "langchain": ["concurrent.futures", "asyncio"],
}

# Configuration for supported agentic libraries
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
"crewai": {
Expand Down Expand Up @@ -130,6 +138,7 @@ class InstrumentorConfig(TypedDict):
_original_builtins_import = builtins.__import__ # Store original import
_instrumenting_packages: Set[str] = set()
_has_agentic_library: bool = False
_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation


# New helper function to check module origin
Expand Down Expand Up @@ -250,10 +259,20 @@ def _should_instrument_package(package_name: str) -> bool:
logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.")
return False

# Utility instrumentors should always be instrumented regardless of agentic library state
# Utility instrumentors should only be instrumented when their dependent packages are active
if package_name in UTILITY_INSTRUMENTORS:
logger.debug(f"_should_instrument_package: '{package_name}' is a utility instrumentor. Always allowing.")
return True
# Check if any package that depends on this utility is instrumented
for dependent_package, utilities in UTILITY_DEPENDENCIES.items():
if package_name in utilities and _is_package_instrumented(dependent_package):
logger.debug(
f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing."
)
return True

logger.debug(
f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping."
)
return False

# Only apply agentic/provider logic if it's NOT a utility instrumentor
is_target_agentic = package_name in AGENTIC_LIBRARIES
Expand Down Expand Up @@ -297,9 +316,43 @@ def _should_instrument_package(package_name: str) -> bool:
return False


def _instrument_utility_dependencies(package_name: str):
    """
    Instrument the utility instrumentors that ``package_name`` depends on.

    The required utilities are read from ``UTILITY_DEPENDENCIES``. A utility
    is handed to ``_perform_instrumentation`` only when it is registered in
    ``UTILITY_INSTRUMENTORS``, is not already instrumented, and its module is
    already present in ``sys.modules``. Otherwise it is skipped, with a debug
    message when the module has not been imported yet.

    Args:
        package_name: The package that was just instrumented
    """
    # Packages without declared utility dependencies need no work.
    if package_name not in UTILITY_DEPENDENCIES:
        return

    required_utilities = UTILITY_DEPENDENCIES[package_name]
    logger.debug(
        f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {required_utilities}"
    )

    for utility in required_utilities:
        # Skip unknown utilities and those that are already active.
        if utility not in UTILITY_INSTRUMENTORS or _is_package_instrumented(utility):
            continue

        logger.info(f"AgentOps: Instrumenting utility '{utility}' required by '{package_name}'")

        # Only instrument a utility whose module has already been imported.
        if utility not in sys.modules:
            logger.debug(
                f"_instrument_utility_dependencies: Utility '{utility}' not yet imported, will instrument when imported"
            )
            continue

        _perform_instrumentation(utility)


def _perform_instrumentation(package_name: str):
"""Helper function to perform instrumentation for a given package."""
global _instrumenting_packages, _active_instrumentors, _has_agentic_library

# Check if we're already instrumenting this package (prevent circular instrumentation)
if package_name in _instrumenting_packages:
logger.debug(
f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation"
)
return

if not _should_instrument_package(package_name):
return

Expand All @@ -318,47 +371,81 @@ def _perform_instrumentation(package_name: str):
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name]
loader = InstrumentorLoader(**config)

# instrument_one already checks loader.should_activate
instrumentor_instance = instrument_one(loader)
if instrumentor_instance is not None:
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
# For now, assuming instrument_one returns instance only on full success.
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.

# Let's assume instrument_one might return an instance whose .instrument() failed.
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.

# Store the package key this instrumentor is for, to aid _is_package_instrumented
instrumentor_instance._agentops_instrumented_package_key = package_name

# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
# This is a safeguard, _is_package_instrumented should catch this earlier.
is_newly_added = True
for existing_inst in _active_instrumentors:
# Add to _instrumenting_packages to prevent circular instrumentation
_instrumenting_packages.add(package_name)

try:
# instrument_one already checks loader.should_activate
instrumentor_instance = instrument_one(loader)
if instrumentor_instance is not None:
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
# For now, assuming instrument_one returns instance only on full success.
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.

# Let's assume instrument_one might return an instance whose .instrument() failed.
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.

# Store the package key this instrumentor is for, to aid _is_package_instrumented
instrumentor_instance._agentops_instrumented_package_key = package_name

# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
# This is a safeguard, _is_package_instrumented should catch this earlier.
is_newly_added = True
for existing_inst in _active_instrumentors:
if (
hasattr(existing_inst, "_agentops_instrumented_package_key")
and existing_inst._agentops_instrumented_package_key == package_name
):
is_newly_added = False
logger.debug(
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
)
break
if is_newly_added:
_active_instrumentors.append(instrumentor_instance)

# If this was an agentic library AND it's newly effectively instrumented.
if (
hasattr(existing_inst, "_agentops_instrumented_package_key")
and existing_inst._agentops_instrumented_package_key == package_name
):
is_newly_added = False
logger.debug(
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
)
break
if is_newly_added:
_active_instrumentors.append(instrumentor_instance)
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
): # Check _has_agentic_library to ensure this is the *first* one.
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
_has_agentic_library = True

# Mark package for utility dependency instrumentation
# We defer this to avoid circular imports during package initialization
if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities
if package_name in UTILITY_DEPENDENCIES:
_pending_utility_instrumentation.add(package_name)
logger.debug(
f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation"
)
else:
logger.debug(
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
)
finally:
# Always remove from _instrumenting_packages when done
_instrumenting_packages.discard(package_name)

# If this was an agentic library AND it's newly effectively instrumented.
if (
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
): # Check _has_agentic_library to ensure this is the *first* one.
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
_has_agentic_library = True
else:
logger.debug(
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
)

def _process_pending_utility_instrumentation():
    """
    Drain the queue of packages awaiting utility instrumentation.

    Every package name currently in ``_pending_utility_instrumentation`` is
    passed to ``_instrument_utility_dependencies``. An exception raised for
    one package is logged at debug level and does not stop the others.
    """
    global _pending_utility_instrumentation

    if _pending_utility_instrumentation:
        # Snapshot, then empty the shared set in place: the same set object is
        # mutated elsewhere, and processing a package may queue new entries.
        queued = _pending_utility_instrumentation.copy()
        _pending_utility_instrumentation.clear()

        for queued_package in queued:
            try:
                _instrument_utility_dependencies(queued_package)
            except Exception as exc:
                logger.debug(f"Error instrumenting utility dependencies for {queued_package}: {exc}")


def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
Expand All @@ -368,6 +455,9 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
"""
global _instrumenting_packages, _has_agentic_library

# Process any pending utility instrumentations before handling new imports
_process_pending_utility_instrumentation()

# If an agentic library is already instrumented, skip all further instrumentation
if _has_agentic_library:
return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)
Expand Down Expand Up @@ -408,7 +498,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),

# Instrument all matching packages
for package_to_check in packages_to_check:
if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check):
if not _is_package_instrumented(package_to_check):
target_module_obj = sys.modules.get(package_to_check)

if target_module_obj:
Expand All @@ -423,16 +513,13 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt."
)

_instrumenting_packages.add(package_to_check)
try:
_perform_instrumentation(package_to_check)
# If we just instrumented an agentic library, stop
if _has_agentic_library:
break
except Exception as e:
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
finally:
_instrumenting_packages.discard(package_to_check)

return module

Expand Down Expand Up @@ -540,11 +627,7 @@ def instrument_all():
package_to_check = target
break

if (
package_to_check
and package_to_check not in _instrumenting_packages
and not _is_package_instrumented(package_to_check)
):
if package_to_check and not _is_package_instrumented(package_to_check):
target_module_obj = sys.modules.get(package_to_check)

if target_module_obj:
Expand All @@ -556,13 +639,10 @@ def instrument_all():
f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously."
)

_instrumenting_packages.add(package_to_check)
try:
_perform_instrumentation(package_to_check)
except Exception as e:
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
finally:
_instrumenting_packages.discard(package_to_check)


def uninstrument_all():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def wrapped_init(

def context_aware_initializer() -> None:
"""Initializer that sets up the captured context in each worker thread."""
logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread")

# Set the main context variables in this thread
for var, value in main_context.items():
Expand All @@ -60,8 +59,6 @@ def context_aware_initializer() -> None:
logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}")
raise

logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete")

# Create executor with context-aware initializer
prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread"

Expand All @@ -74,8 +71,6 @@ def context_aware_initializer() -> None:
initargs=(), # We handle initargs in our wrapper
)

logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation")

return wrapped_init


Expand All @@ -85,8 +80,7 @@ def _context_propagating_submit(original_submit: Callable) -> Callable:
@functools.wraps(original_submit)
def wrapped_submit(self: ThreadPoolExecutor, func: Callable[..., R], *args: Any, **kwargs: Any) -> Future[R]:
# Log the submission
func_name = getattr(func, "__name__", str(func))
logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}")
func_name = getattr(func, "__name__", str(func)) # noqa: F841

# The context propagation is handled by the initializer, so we can submit normally
# But we can add additional logging or monitoring here if needed
Expand Down
Loading