Skip to content

Commit dc56fb2

Browse files
fenilfalduDwij1704
andauthored
Concurrent fix (#1113)
* feat: Introduce utility dependencies management for improved instrumentation handling * refactor concurrent.futures instrumentation * code cleanup --------- Co-authored-by: Dwij <[email protected]>
1 parent 63a9545 commit dc56fb2

File tree

2 files changed

+135
-61
lines changed

2 files changed

+135
-61
lines changed

agentops/instrumentation/__init__.py

Lines changed: 134 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,14 @@ class InstrumentorConfig(TypedDict):
8585
},
8686
}
8787

88+
# Define which packages require which utility instrumentors
89+
# This maps package names to the list of utility instrumentors they depend on
90+
UTILITY_DEPENDENCIES: dict[str, list[str]] = {
91+
"mem0": ["concurrent.futures"], # mem0 uses concurrent.futures for parallel processing
92+
# Add more dependencies as needed in the future
93+
# "langchain": ["concurrent.futures", "asyncio"],
94+
}
95+
8896
# Configuration for supported agentic libraries
8997
AGENTIC_LIBRARIES: dict[str, InstrumentorConfig] = {
9098
"crewai": {
@@ -130,6 +138,7 @@ class InstrumentorConfig(TypedDict):
130138
_original_builtins_import = builtins.__import__ # Store original import
131139
_instrumenting_packages: Set[str] = set()
132140
_has_agentic_library: bool = False
141+
_pending_utility_instrumentation: Set[str] = set() # Track packages that need utility instrumentation
133142

134143

135144
# New helper function to check module origin
@@ -250,10 +259,20 @@ def _should_instrument_package(package_name: str) -> bool:
250259
logger.debug(f"_should_instrument_package: '{package_name}' already instrumented by AgentOps. Skipping.")
251260
return False
252261

253-
# Utility instrumentors should always be instrumented regardless of agentic library state
262+
# Utility instrumentors should only be instrumented when their dependent packages are active
254263
if package_name in UTILITY_INSTRUMENTORS:
255-
logger.debug(f"_should_instrument_package: '{package_name}' is a utility instrumentor. Always allowing.")
256-
return True
264+
# Check if any package that depends on this utility is instrumented
265+
for dependent_package, utilities in UTILITY_DEPENDENCIES.items():
266+
if package_name in utilities and _is_package_instrumented(dependent_package):
267+
logger.debug(
268+
f"_should_instrument_package: '{package_name}' is a utility instrumentor needed by '{dependent_package}'. Allowing."
269+
)
270+
return True
271+
272+
logger.debug(
273+
f"_should_instrument_package: '{package_name}' is a utility instrumentor but no dependent packages are active. Skipping."
274+
)
275+
return False
257276

258277
# Only apply agentic/provider logic if it's NOT a utility instrumentor
259278
is_target_agentic = package_name in AGENTIC_LIBRARIES
@@ -297,9 +316,43 @@ def _should_instrument_package(package_name: str) -> bool:
297316
return False
298317

299318

319+
def _instrument_utility_dependencies(package_name: str):
320+
"""
321+
Instrument any utility dependencies required by the given package.
322+
323+
Args:
324+
package_name: The package that was just instrumented
325+
"""
326+
if package_name in UTILITY_DEPENDENCIES:
327+
utilities_needed = UTILITY_DEPENDENCIES[package_name]
328+
logger.debug(
329+
f"_instrument_utility_dependencies: Package '{package_name}' requires utilities: {utilities_needed}"
330+
)
331+
332+
for utility_name in utilities_needed:
333+
if utility_name in UTILITY_INSTRUMENTORS and not _is_package_instrumented(utility_name):
334+
logger.info(f"AgentOps: Instrumenting utility '{utility_name}' required by '{package_name}'")
335+
336+
# Check if the utility module is available
337+
if utility_name in sys.modules:
338+
_perform_instrumentation(utility_name)
339+
else:
340+
logger.debug(
341+
f"_instrument_utility_dependencies: Utility '{utility_name}' not yet imported, will instrument when imported"
342+
)
343+
344+
300345
def _perform_instrumentation(package_name: str):
301346
"""Helper function to perform instrumentation for a given package."""
302347
global _instrumenting_packages, _active_instrumentors, _has_agentic_library
348+
349+
# Check if we're already instrumenting this package (prevent circular instrumentation)
350+
if package_name in _instrumenting_packages:
351+
logger.debug(
352+
f"_perform_instrumentation: Already instrumenting '{package_name}', skipping to prevent circular instrumentation"
353+
)
354+
return
355+
303356
if not _should_instrument_package(package_name):
304357
return
305358

@@ -318,47 +371,81 @@ def _perform_instrumentation(package_name: str):
318371
config = PROVIDERS.get(package_name) or AGENTIC_LIBRARIES.get(package_name) or UTILITY_INSTRUMENTORS[package_name]
319372
loader = InstrumentorLoader(**config)
320373

321-
# instrument_one already checks loader.should_activate
322-
instrumentor_instance = instrument_one(loader)
323-
if instrumentor_instance is not None:
324-
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
325-
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
326-
# For now, assuming instrument_one returns instance only on full success.
327-
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.
328-
329-
# Let's assume instrument_one might return an instance whose .instrument() failed.
330-
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
331-
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.
332-
333-
# Store the package key this instrumentor is for, to aid _is_package_instrumented
334-
instrumentor_instance._agentops_instrumented_package_key = package_name
335-
336-
# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
337-
# This is a safeguard, _is_package_instrumented should catch this earlier.
338-
is_newly_added = True
339-
for existing_inst in _active_instrumentors:
374+
# Add to _instrumenting_packages to prevent circular instrumentation
375+
_instrumenting_packages.add(package_name)
376+
377+
try:
378+
# instrument_one already checks loader.should_activate
379+
instrumentor_instance = instrument_one(loader)
380+
if instrumentor_instance is not None:
381+
# Check if it was *actually* instrumented by instrument_one by seeing if the instrument method was called successfully.
382+
# This relies on instrument_one returning None if its internal .instrument() call failed (if we revert that, this needs adjustment)
383+
# For now, assuming instrument_one returns instance only on full success.
384+
# User request was to return instrumentor even if .instrument() fails. So, we check if _agentops_instrumented_package_key was set by us.
385+
386+
# Let's assume instrument_one might return an instance whose .instrument() failed.
387+
# The key is set before _active_instrumentors.append, so if it's already there and matches, it means it's a re-attempt on the same package.
388+
# The _is_package_instrumented check at the start of _should_instrument_package should prevent most re-entry for the same package_name.
389+
390+
# Store the package key this instrumentor is for, to aid _is_package_instrumented
391+
instrumentor_instance._agentops_instrumented_package_key = package_name
392+
393+
# Add to active_instrumentors only if it's not a duplicate in terms of package_key being instrumented
394+
# This is a safeguard, _is_package_instrumented should catch this earlier.
395+
is_newly_added = True
396+
for existing_inst in _active_instrumentors:
397+
if (
398+
hasattr(existing_inst, "_agentops_instrumented_package_key")
399+
and existing_inst._agentops_instrumented_package_key == package_name
400+
):
401+
is_newly_added = False
402+
logger.debug(
403+
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
404+
)
405+
break
406+
if is_newly_added:
407+
_active_instrumentors.append(instrumentor_instance)
408+
409+
# If this was an agentic library AND it's newly effectively instrumented.
340410
if (
341-
hasattr(existing_inst, "_agentops_instrumented_package_key")
342-
and existing_inst._agentops_instrumented_package_key == package_name
343-
):
344-
is_newly_added = False
345-
logger.debug(
346-
f"_perform_instrumentation: Instrumentor for '{package_name}' already in _active_instrumentors. Not adding again."
347-
)
348-
break
349-
if is_newly_added:
350-
_active_instrumentors.append(instrumentor_instance)
411+
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
412+
): # Check _has_agentic_library to ensure this is the *first* one.
413+
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
414+
_has_agentic_library = True
415+
416+
# Mark package for utility dependency instrumentation
417+
# We defer this to avoid circular imports during package initialization
418+
if package_name not in UTILITY_INSTRUMENTORS and is_newly_added: # Don't recursively instrument utilities
419+
if package_name in UTILITY_DEPENDENCIES:
420+
_pending_utility_instrumentation.add(package_name)
421+
logger.debug(
422+
f"_perform_instrumentation: Marked '{package_name}' for deferred utility instrumentation"
423+
)
424+
else:
425+
logger.debug(
426+
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
427+
)
428+
finally:
429+
# Always remove from _instrumenting_packages when done
430+
_instrumenting_packages.discard(package_name)
351431

352-
# If this was an agentic library AND it's newly effectively instrumented.
353-
if (
354-
package_name in AGENTIC_LIBRARIES and not _has_agentic_library
355-
): # Check _has_agentic_library to ensure this is the *first* one.
356-
# _uninstrument_providers() was already called in _should_instrument_package for the first agentic library.
357-
_has_agentic_library = True
358-
else:
359-
logger.debug(
360-
f"_perform_instrumentation: instrument_one for '{package_name}' returned None. Not added to active instrumentors."
361-
)
432+
433+
def _process_pending_utility_instrumentation():
434+
"""Process any pending utility instrumentations."""
435+
global _pending_utility_instrumentation
436+
437+
if not _pending_utility_instrumentation:
438+
return
439+
440+
# Copy and clear to avoid modifying during iteration
441+
pending = _pending_utility_instrumentation.copy()
442+
_pending_utility_instrumentation.clear()
443+
444+
for package_name in pending:
445+
try:
446+
_instrument_utility_dependencies(package_name)
447+
except Exception as e:
448+
logger.debug(f"Error instrumenting utility dependencies for {package_name}: {e}")
362449

363450

364451
def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(), level=0):
@@ -368,6 +455,9 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
368455
"""
369456
global _instrumenting_packages, _has_agentic_library
370457

458+
# Process any pending utility instrumentations before handling new imports
459+
_process_pending_utility_instrumentation()
460+
371461
# If an agentic library is already instrumented, skip all further instrumentation
372462
if _has_agentic_library:
373463
return _original_builtins_import(name, globals_dict, locals_dict, fromlist, level)
@@ -408,7 +498,7 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
408498

409499
# Instrument all matching packages
410500
for package_to_check in packages_to_check:
411-
if package_to_check not in _instrumenting_packages and not _is_package_instrumented(package_to_check):
501+
if not _is_package_instrumented(package_to_check):
412502
target_module_obj = sys.modules.get(package_to_check)
413503

414504
if target_module_obj:
@@ -423,16 +513,13 @@ def _import_monitor(name: str, globals_dict=None, locals_dict=None, fromlist=(),
423513
f"_import_monitor: No module object found in sys.modules for '{package_to_check}', proceeding with SDK instrumentation attempt."
424514
)
425515

426-
_instrumenting_packages.add(package_to_check)
427516
try:
428517
_perform_instrumentation(package_to_check)
429518
# If we just instrumented an agentic library, stop
430519
if _has_agentic_library:
431520
break
432521
except Exception as e:
433522
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
434-
finally:
435-
_instrumenting_packages.discard(package_to_check)
436523

437524
return module
438525

@@ -540,11 +627,7 @@ def instrument_all():
540627
package_to_check = target
541628
break
542629

543-
if (
544-
package_to_check
545-
and package_to_check not in _instrumenting_packages
546-
and not _is_package_instrumented(package_to_check)
547-
):
630+
if package_to_check and not _is_package_instrumented(package_to_check):
548631
target_module_obj = sys.modules.get(package_to_check)
549632

550633
if target_module_obj:
@@ -556,13 +639,10 @@ def instrument_all():
556639
f"instrument_all: No module object found for '{package_to_check}' in sys.modules during startup scan. Proceeding cautiously."
557640
)
558641

559-
_instrumenting_packages.add(package_to_check)
560642
try:
561643
_perform_instrumentation(package_to_check)
562644
except Exception as e:
563645
logger.error(f"Error instrumenting {package_to_check}: {str(e)}")
564-
finally:
565-
_instrumenting_packages.discard(package_to_check)
566646

567647

568648
def uninstrument_all():

agentops/instrumentation/utilities/concurrent_futures/instrumentation.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ def wrapped_init(
4040

4141
def context_aware_initializer() -> None:
4242
"""Initializer that sets up the captured context in each worker thread."""
43-
logger.debug("[ConcurrentFuturesInstrumentor] Setting up context in worker thread")
4443

4544
# Set the main context variables in this thread
4645
for var, value in main_context.items():
@@ -60,8 +59,6 @@ def context_aware_initializer() -> None:
6059
logger.error(f"[ConcurrentFuturesInstrumentor] Error in user initializer: {e}")
6160
raise
6261

63-
logger.debug("[ConcurrentFuturesInstrumentor] Worker thread context setup complete")
64-
6562
# Create executor with context-aware initializer
6663
prefix = f"AgentOps-{thread_name_prefix}" if thread_name_prefix else "AgentOps-Thread"
6764

@@ -74,8 +71,6 @@ def context_aware_initializer() -> None:
7471
initargs=(), # We handle initargs in our wrapper
7572
)
7673

77-
logger.debug("[ConcurrentFuturesInstrumentor] ThreadPoolExecutor initialized with context propagation")
78-
7974
return wrapped_init
8075

8176

@@ -85,8 +80,7 @@ def _context_propagating_submit(original_submit: Callable) -> Callable:
8580
@functools.wraps(original_submit)
8681
def wrapped_submit(self: ThreadPoolExecutor, func: Callable[..., R], *args: Any, **kwargs: Any) -> Future[R]:
8782
# Log the submission
88-
func_name = getattr(func, "__name__", str(func))
89-
logger.debug(f"[ConcurrentFuturesInstrumentor] Submitting function: {func_name}")
83+
func_name = getattr(func, "__name__", str(func)) # noqa: F841
9084

9185
# The context propagation is handled by the initializer, so we can submit normally
9286
# But we can add additional logging or monitoring here if needed

0 commit comments

Comments
 (0)