diff --git a/agentops/instrumentation/__init__.py b/agentops/instrumentation/__init__.py index f46bc192a..cebd5d132 100644 --- a/agentops/instrumentation/__init__.py +++ b/agentops/instrumentation/__init__.py @@ -85,6 +85,14 @@ class InstrumentorConfig(TypedDict): }, } +# Define which packages require which utility instrumentors +# This maps package names to the list of utility instrumentors they depend on +UTILITY_DEPENDENCIES: dict[str, list[str]] = { + "mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing + # Add more dependencies as needed in the future + # "langchain": ["concurrent.futures", "asyncio"], +} + # Configuration for supported agentic libraries AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = { "crewai": { @@ -130,6 +138,7 @@ class InstrumentorConfig(TypedDict): _original_builtins_import = builtins.__import__ # Store original import _instrumenting_packages: Set[str] = set() _has_agentic_library: bool = False +_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation # New helper function to check module origin @@ -250,10 +259,20 @@ def _should_instrument_package(package_name: str) -> bool: logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.") return False - # Utility instrumentors should always be instrumented regardless of agentic library state + # Utility instrumentors should only be instrumented when their dependent packages are active if package_name in UTILITY_INSTRUMENTORS: - logger.debug(f"_should_instrument_package: '{package_name}' is a utility instrumentor. Always allowing.") - return True + # Check if any package that depends on this utility is instrumented + for dependent_package, utilities in UTILITY_DEPENDENCIES.items(): + if package_name in utilities and _is_package_instrumented(dependent_package): + logger.debug( + f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing." + ) + return True + + logger.debug( + f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping." + ) + return False # Only apply agentic/provider logic if it's NOT a utility instrumentor is_target_agentic = package_name in AGENTIC_LIBRARIES @@ -297,9 +316,43 @@ def _should_instrument_package(package_name: str) -> bool: return False +def _instrument_utility_dependencies(package_name: str): + """ + Instrument any utility dependencies required by the given package. + + Args: + package_name: The package that was just instrumented + """ + if package_name in UTILITY_DEPENDENCIES: + utilities_needed = UTILITY_DEPENDENCIES[package_name] + logger.debug( + f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}" + ) + + for utility_name in utilities_needed: + if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name): + logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'") + + # Check if the utility module is available + if utility_name in sys.modules: + _perform_instrumentation(utility_name) + else: + logger.debug( + f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported" + ) + + def _perform_instrumentation(package_name: str): """Helper function to perform instrumentation for a given package.""" global _instrumenting_packages, _active_instrumentors, _has_agentic_library + + # Check if we're already instrumenting this package (prevent circular instrumentation) + if package_name in _instrumenting_packages: + logger.debug( + f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation" + ) + return + if not _should_instrument_package(package_name): return @@ -318,47 +371,81 @@ def _perform_instrumentation(package_name: str): config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name] loader = InstrumentorLoader(**config) - # instrument_one already checks loader.should_activate - instrumentor_instance = instrument_one(loader) - if instrumentor_instance is not None: - # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. - # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) - # For now, assuming instrument_one returns instance only on full success. - # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. - - # Let's assume instrument_one might return an instance whose .instrument() failed. - # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. - # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. - - # Store the package key this instrumentor is for, to aid _is_package_instrumented - instrumentor_instance._agentops_instrumented_package_key = package_name - - # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented - # This is a safeguard, _is_package_instrumented should catch this earlier. - is_newly_added = True - for existing_inst in _active_instrumentors: + # Add to _instrumenting_packages to prevent circular instrumentation + _instrumenting_packages.add(package_name) + + try: + # instrument_one already checks loader.should_activate + instrumentor_instance = instrument_one(loader) + if instrumentor_instance is not None: + # Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully. + # This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment) + # For now, assuming instrument_one returns instance only on full success. + # User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us. + + # Let's assume instrument_one might return an instance whose .instrument() failed. + # The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package. + # The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name. + + # Store the package key this instrumentor is for, to aid _is_package_instrumented + instrumentor_instance._agentops_instrumented_package_key = package_name + + # Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented + # This is a safeguard, _is_package_instrumented should catch this earlier. + is_newly_added = True + for existing_inst in _active_instrumentors: + if ( + hasattr(existing_inst, "_agentops_instrumented_package_key") + and existing_inst._agentops_instrumented_package_key == package_name + ): + is_newly_added = False + logger.debug( + f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." + ) + break + if is_newly_added: + _active_instrumentors.append(instrumentor_instance) + + # If this was an agentic library AND it's newly effectively instrumented. if ( - hasattr(existing_inst, "_agentops_instrumented_package_key") - and existing_inst._agentops_instrumented_package_key == package_name - ): - is_newly_added = False - logger.debug( - f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again." - ) - break - if is_newly_added: - _active_instrumentors.append(instrumentor_instance) + package_name in AGENTIC_LIBRARIES and not _has_agentic_library + ): # Check _has_agentic_library to ensure this is the *first* one. + # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. + _has_agentic_library = True + + # Mark package for utility dependency instrumentation + # We defer this to avoid circular imports during package initialization + if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities + if package_name in UTILITY_DEPENDENCIES: + _pending_utility_instrumentation.add(package_name) + logger.debug( + f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation" + ) + else: + logger.debug( + f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." + ) + finally: + # Always remove from _instrumenting_packages when done + _instrumenting_packages.discard(package_name) - # If this was an agentic library AND it's newly effectively instrumented. - if ( - package_name in AGENTIC_LIBRARIES and not _has_agentic_library - ): # Check _has_agentic_library to ensure this is the *first* one. - # _uninstrument_providers() was already called in _should_instrument_package for the first agentic library. - _has_agentic_library = True - else: - logger.debug( - f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors." - ) + +def _process_pending_utility_instrumentation(): + """Process any pending utility instrumentations.""" + global _pending_utility_instrumentation + + if not _pending_utility_instrumentation: + return + + # Copy and clear to avoid modifying during iteration + pending = _pending_utility_instrumentation.copy() + _pending_utility_instrumentation.clear() + + for package_name in pending: + try: + _instrument_utility_dependencies(package_name) + except Exception as e: + logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}") def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0): @@ -368,6 +455,9 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), """ global _instrumenting_packages, _has_agentic_library + # Process any pending utility instrumentations before handling new imports + _process_pending_utility_instrumentation() + # If an agentic library is already instrumented, skip all further instrumentation if _has_agentic_library: return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level) @@ -408,7 +498,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), # Instrument all matching packages for package_to_check in packages_to_check: - if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check): + if not _is_package_instrumented(package_to_check): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: @@ -423,7 +513,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt." ) - _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) # If we just instrumented an agentic library, stop @@ -431,8 +520,6 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), break except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") - finally: - _instrumenting_packages.discard(package_to_check) return module @@ -540,11 +627,7 @@ def instrument_all(): package_to_check = target break - if ( - package_to_check - and package_to_check not in _instrumenting_packages - and not _is_package_instrumented(package_to_check) - ): + if package_to_check and not _is_package_instrumented(package_to_check): target_module_obj = sys.modules.get(package_to_check) if target_module_obj: @@ -556,13 +639,10 @@ def instrument_all(): f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously." ) - _instrumenting_packages.add(package_to_check) try: _perform_instrumentation(package_to_check) except Exception as e: logger.error(f"Error instrumenting {package_to_check}: {str(e)}") - finally: - _instrumenting_packages.discard(package_to_check) def uninstrument_all(): diff --git a/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py b/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py index 594cd0420..0cd99ce6b 100644 --- a/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py +++ b/agentops/instrumentation/utilities/concurrent_futures/instrumentation.py @@ -40,7 +40,6 @@ def wrapped_init( def context_aware_initializer() -> None: """Initializer that sets up the captured context in each worker thread.""" - logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread") # Set the main context variables in this thread for var, value in main_context.items(): @@ -60,8 +59,6 @@ def context_aware_initializer() -> None: logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}") raise - logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete") - # Create executor with context-aware initializer prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread" @@ -74,8 +71,6 @@ def context_aware_initializer() -> None: initargs=(), # We handle initargs in our wrapper ) - logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation") - return wrapped_init @@ -85,8 +80,7 @@ def _context_propagating_submit(original_submit: Callable) -> Callable: @functools.wraps(original_submit) def wrapped_submit(self: ThreadPoolExecutor, func: Callable[..., R], *args: Any, **kwargs: Any) -> Future[R]: # Log the submission - func_name = getattr(func, "__name__", str(func)) - logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}") + func_name = getattr(func, "__name__", str(func)) # noqa: F841 # The context propagation is handled by the initializer, so we can submit normally # But we can add additional logging or monitoring here if needed