Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 78 additions & 164 deletions agentops/instrumentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,6 @@
},
}

# Configuration for utility instrumentors
UTILITY_INSTRUMENTORS: dict[str, InstrumentorConfig] = {
"concurrent.futures": {
"module_name": "agentops.instrumentation.utilities.concurrent_futures",
"class_name": "ConcurrentFuturesInstrumentor",
"min_version": "3.7.0", # Python 3.7+ (concurrent.futures is stdlib)
"package_name": "python", # Special case for stdlib modules
},
}

# Define which packages require which utility instrumentors
# This maps package names to the list of utility instrumentors they depend on
UTILITY_DEPENDENCIES: dict[str, list[str]] = {
"mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing
# Add more dependencies as needed in the future
# "langchain": ["concurrent.futures", "asyncio"],
}

# Configuration for supported agentic libraries
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
"crewai": {
Expand Down Expand Up @@ -133,7 +115,7 @@
}

# Combine all target packages for monitoring
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys()) | set(UTILITY_INSTRUMENTORS.keys())
TARGET_PACKAGES = set(PROVIDERS.keys()) | set(AGENTIC_LIBRARIES.keys())

# Create a single instance of the manager
# _manager = InstrumentationManager() # Removed
Expand All @@ -143,7 +125,6 @@
_original_builtins_import = builtins.__import__ # Store original import
_instrumenting_packages: Set[str] = set()
_has_agentic_library: bool = False
_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation


# New helper function to check module origin
Expand All @@ -153,16 +134,6 @@
rather than a local module, especially when names might collide.
`package_name_key` is the key from TARGET_PACKAGES (e.g., 'agents', 'google.adk').
"""
# Special case for stdlib modules (marked with package_name="python" in UTILITY_INSTRUMENTORS)
if (
package_name_key in UTILITY_INSTRUMENTORS
and UTILITY_INSTRUMENTORS[package_name_key].get("package_name") == "python"
):
logger.debug(
f"_is_installed_package: Module '{package_name_key}' is a Python standard library module. Considering it an installed package."
)
return True

if not hasattr(module_obj, "__file__") or not module_obj.__file__:
logger.debug(
f"_is_installed_package: Module '{package_name_key}' has no __file__, assuming it might be an SDK namespace package. Returning True."
Expand Down Expand Up @@ -255,7 +226,7 @@
def _should_instrument_package(package_name: str) -> bool:
"""
Determine if a package should be instrumented based on current state.
Handles special cases for agentic libraries, providers, and utility instrumentors.
Handles special cases for agentic libraries and providers.
"""
global _has_agentic_library

Expand All @@ -264,22 +235,6 @@
logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.")
return False

# Utility instrumentors should only be instrumented when their dependent packages are active
if package_name in UTILITY_INSTRUMENTORS:
# Check if any package that depends on this utility is instrumented
for dependent_package, utilities in UTILITY_DEPENDENCIES.items():
if package_name in utilities and _is_package_instrumented(dependent_package):
logger.debug(
f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing."
)
return True

logger.debug(
f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping."
)
return False

# Only apply agentic/provider logic if it's NOT a utility instrumentor
is_target_agentic = package_name in AGENTIC_LIBRARIES
is_target_provider = package_name in PROVIDERS

Expand Down Expand Up @@ -321,136 +276,88 @@
return False


def _instrument_utility_dependencies(package_name: str):
"""
Instrument any utility dependencies required by the given package.

Args:
package_name: The package that was just instrumented
"""
if package_name in UTILITY_DEPENDENCIES:
utilities_needed = UTILITY_DEPENDENCIES[package_name]
logger.debug(
f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}"
)

for utility_name in utilities_needed:
if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name):
logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'")

# Check if the utility module is available
if utility_name in sys.modules:
_perform_instrumentation(utility_name)
else:
logger.debug(
f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported"
)


def _perform_instrumentation(package_name: str):
"""Helper function to perform instrumentation for a given package."""
global _instrumenting_packages, _active_instrumentors, _has_agentic_library

# Check if we're already instrumenting this package (prevent circular instrumentation)
if package_name in _instrumenting_packages:
logger.debug(
f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation"
)
return

if not _should_instrument_package(package_name):
return

# Get the appropriate configuration for the package
# Ensure package_name is a key in either PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS
if (
package_name not in PROVIDERS
and package_name not in AGENTIC_LIBRARIES
and package_name not in UTILITY_INSTRUMENTORS
):
# Ensure package_name is a key in either PROVIDERS or AGENTIC_LIBRARIES
if package_name not in PROVIDERS and package_name not in AGENTIC_LIBRARIES:
logger.debug(
f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS, AGENTIC_LIBRARIES, or UTILITY_INSTRUMENTORS. Skipping."
f"_perform_instrumentation: Package '{package_name}' not found in PROVIDERS or AGENTIC_LIBRARIES. Skipping."
)
return

config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name]
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name)
loader = InstrumentorLoader(**config)

# Add to _instrumenting_packages to prevent circular instrumentation
_instrumenting_packages.add(package_name)

try:
# instrument_one already checks loader.should_activate
instrumentor_instance = instrument_one(loader)
if instrumentor_instance is not None:
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
# For now, assuming instrument_one returns instance only on full success.
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.

# Let's assume instrument_one might return an instance whose .instrument() failed.
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.

# Store the package key this instrumentor is for, to aid _is_package_instrumented
instrumentor_instance._agentops_instrumented_package_key = package_name

# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
# This is a safeguard, _is_package_instrumented should catch this earlier.
is_newly_added = True
for existing_inst in _active_instrumentors:
if (
hasattr(existing_inst, "_agentops_instrumented_package_key")
and existing_inst._agentops_instrumented_package_key == package_name
):
is_newly_added = False
logger.debug(
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
)
break
if is_newly_added:
_active_instrumentors.append(instrumentor_instance)

# If this was an agentic library AND it's newly effectively instrumented.
# instrument_one already checks loader.should_activate
instrumentor_instance = instrument_one(loader)
if instrumentor_instance is not None:
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
# For now, assuming instrument_one returns instance only on full success.
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.

# Let's assume instrument_one might return an instance whose .instrument() failed.
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.

# Store the package key this instrumentor is for, to aid _is_package_instrumented
instrumentor_instance._agentops_instrumented_package_key = package_name

# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
# This is a safeguard, _is_package_instrumented should catch this earlier.
is_newly_added = True
for existing_inst in _active_instrumentors:
if (
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
): # Check _has_agentic_library to ensure this is the *first* one.
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
_has_agentic_library = True

# Mark package for utility dependency instrumentation
# We defer this to avoid circular imports during package initialization
if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities
if package_name in UTILITY_DEPENDENCIES:
_pending_utility_instrumentation.add(package_name)
logger.debug(
f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation"
)
else:
logger.debug(
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
)
finally:
# Always remove from _instrumenting_packages when done
_instrumenting_packages.discard(package_name)

hasattr(existing_inst, "_agentops_instrumented_package_key")
and existing_inst._agentops_instrumented_package_key == package_name
):
is_newly_added = False
logger.debug(

Check warning on line 320 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L319-L320

Added lines #L319 - L320 were not covered by tests
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
)
break

Check warning on line 323 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L323

Added line #L323 was not covered by tests
if is_newly_added:
_active_instrumentors.append(instrumentor_instance)

def _process_pending_utility_instrumentation():
"""Process any pending utility instrumentations."""
global _pending_utility_instrumentation
# If this was an agentic library AND it's newly effectively instrumented.
if (
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
): # Check _has_agentic_library to ensure this is the *first* one.
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
_has_agentic_library = True

if not _pending_utility_instrumentation:
return
# Special case: If mem0 is instrumented, also instrument concurrent.futures
if package_name == "mem0" and is_newly_added:
try:

Check warning on line 336 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L336

Added line #L336 was not covered by tests
# Check if concurrent.futures module is available

# Create config for concurrent.futures instrumentor
concurrent_config = InstrumentorConfig(

Check warning on line 340 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L340

Added line #L340 was not covered by tests
module_name="agentops.instrumentation.utilities.concurrent_futures",
class_name="ConcurrentFuturesInstrumentor",
min_version="3.7.0", # Python 3.7+ (concurrent.futures is stdlib)
package_name="python", # Special case for stdlib modules
)

# Copy and clear to avoid modifying during iteration
pending = _pending_utility_instrumentation.copy()
_pending_utility_instrumentation.clear()
# Create and instrument concurrent.futures
concurrent_loader = InstrumentorLoader(**concurrent_config)
concurrent_instrumentor = instrument_one(concurrent_loader)

Check warning on line 349 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L348-L349

Added lines #L348 - L349 were not covered by tests

for package_name in pending:
try:
_instrument_utility_dependencies(package_name)
except Exception as e:
logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}")
if concurrent_instrumentor is not None:
concurrent_instrumentor._agentops_instrumented_package_key = "concurrent.futures"
_active_instrumentors.append(concurrent_instrumentor)
logger.info("AgentOps: Instrumented concurrent.futures as a dependency of mem0.")
except Exception as e:
logger.debug(f"Could not instrument concurrent.futures for mem0: {e}")

Check warning on line 356 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L351-L356

Added lines #L351 - L356 were not covered by tests
else:
logger.debug(

Check warning on line 358 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L358

Added line #L358 was not covered by tests
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
)


def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
Expand All @@ -460,9 +367,6 @@
"""
global _instrumenting_packages, _has_agentic_library

# Process any pending utility instrumentations before handling new imports
_process_pending_utility_instrumentation()

# If an agentic library is already instrumented, skip all further instrumentation
if _has_agentic_library:
return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)
Expand Down Expand Up @@ -503,7 +407,7 @@

# Instrument all matching packages
for package_to_check in packages_to_check:
if not _is_package_instrumented(package_to_check):
if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check):
target_module_obj = sys.modules.get(package_to_check)

if target_module_obj:
Expand All @@ -518,13 +422,16 @@
f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt."
)

_instrumenting_packages.add(package_to_check)

Check warning on line 425 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L425

Added line #L425 was not covered by tests
try:
_perform_instrumentation(package_to_check)
# If we just instrumented an agentic library, stop
if _has_agentic_library:
break
except Exception as e:
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
finally:
_instrumenting_packages.discard(package_to_check)

Check warning on line 434 in agentops/instrumentation/__init__.py

View check run for this annotation

Codecov / codecov/patch

agentops/instrumentation/__init__.py#L434

Added line #L434 was not covered by tests

return module

Expand Down Expand Up @@ -632,7 +539,11 @@
package_to_check = target
break

if package_to_check and not _is_package_instrumented(package_to_check):
if (
package_to_check
and package_to_check not in _instrumenting_packages
and not _is_package_instrumented(package_to_check)
):
target_module_obj = sys.modules.get(package_to_check)

if target_module_obj:
Expand All @@ -644,10 +555,13 @@
f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously."
)

_instrumenting_packages.add(package_to_check)
try:
_perform_instrumentation(package_to_check)
except Exception as e:
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
finally:
_instrumenting_packages.discard(package_to_check)


def uninstrument_all():
Expand Down
Loading