Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions agave/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from .tracing import (
TRACE_HEADERS_KEY,
accept_trace_from_queue,
accept_trace_headers,
add_custom_attribute,
background_task,
get_trace_headers,
inject_trace_headers,
trace_attributes,
)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sugiero quitarlos de aqui.
Para el modulo agave.core no dependa de newrelic

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lsito


__all__ = [
'TRACE_HEADERS_KEY',
'accept_trace_from_queue',
'accept_trace_headers',
'add_custom_attribute',
'background_task',
'get_trace_headers',
'inject_trace_headers',
'trace_attributes',
]
223 changes: 223 additions & 0 deletions agave/core/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
from __future__ import annotations

import inspect
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Optional

import newrelic.agent
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creo que esto puede fallar, dado que newrelic es opcional, puedes hacer unas pruebas? Quizá podemos hacer algo como aquí:

try:
from aiobotocore.session import get_session
from types_aiobotocore_sqs import SQSClient
except ImportError:
raise ImportError(
"You must install agave with [asyncio_aws_tools] option.\n"
"You can install it with: pip install agave[asyncio_aws_tools]"
)

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listo


# trace headers key
TRACE_HEADERS_KEY = "_nr_trace_headers"


@contextmanager
def background_task(
name: str,
group: str = "Task",
trace_headers: Optional[dict] = None,
):
with newrelic.agent.BackgroundTask(
application=newrelic.agent.application(),
name=name,
group=group,
):
if trace_headers:
accept_trace_headers(trace_headers, transport_type="Queue")
yield


def get_trace_headers() -> dict:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aquí podemos ser más específicos dict[str, str]

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lsito

headers_list: list = []
newrelic.agent.insert_distributed_trace_headers(headers_list)
return dict(headers_list)
Comment on lines +34 to +37
Copy link
Copy Markdown
Member

@rogelioLpz rogelioLpz Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Esta funcion es un poco extraña, inserta una lista vacía y el resultado siempre es el mismo.
Creo que es mejor quitar la funcion y hacer directo el insert_distributed_trace_headers

Copy link
Copy Markdown
Author

@dpastranak dpastranak Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

es que más bien el API de newrelic está extraña. Pide pasar una lista vacía que muta internamente donde regresa los headers y esta función encapsula esta complejidad.

sin esta función se tendría que hacer

headers_list = []
newrelic.agent.insert_distributed_trace_headers(headers_list)
# Después de llamar, headers_list ya tiene datos:
# [('newrelic', 'eyJ2IjpbMCwxXS...'), ('traceparent', '00-abc123...')]
headers = dict(headers_list) # lo paso a un diccionario para pasarlo entre servicio. 

con la función solo se hace
headers = get_trace_headers()

o por qué dices que siempre regresa lo mismo?

Creo que justo esta función hace la interfaz más clara y simple.



def accept_trace_headers(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Esta función no es necesaria.
Realmente solo hace una linea y el transport_type lo usas como Queue

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En este caso la función también incluye la validación if not headers: return que evita errores cuando los headers son None. Además, el transport_type por default es "HTTP" y se puede usar en contexto HHTP no solo como Queue.
Sin esta función cada que se use tendría que repetirse la validación también. Creo que la función aporta valor.

headers: dict | None, transport_type: str = "HTTP"
) -> None:
"""
Accept incoming trace headers to continue a distributed trace.

Args:
headers: Trace headers from incoming request/message.
transport_type: "HTTP" for HTTP requests, "Queue" for queue messages.
"""
if not headers:
return
newrelic.agent.accept_distributed_trace_headers(
headers, transport_type=transport_type
)


def add_custom_attribute(key: str, value: Any) -> None:
if value is not None:
newrelic.agent.add_custom_attribute(key, value)


def accept_trace_from_queue(func: Callable) -> Callable:
"""
Decorator to accept distributed trace headers from queue messages.

Extracts '_nr_trace_headers' from kwargs, accepts them, and removes
them before calling the function.

Example:
@celery_sqs.task
@accept_trace_from_queue
def process_incoming_spei_transaction_task(
transaction: dict, session=None
):
...
"""

def _accept(kwargs):
trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None)
if trace_headers:
accept_trace_headers(trace_headers, transport_type="Queue")

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
_accept(kwargs)
return await func(*args, **kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
_accept(kwargs)
return func(*args, **kwargs)

return sync_wrapper


def inject_trace_headers(param_name: str = "trace_headers"):
"""
Decorator to inject trace headers into HTTP calls.

Args:
param_name: name of the parameter where headers will be injected.

Example:
@inject_trace_headers()
async def request(self, method, endpoint, trace_headers=None):
async with session.request(..., headers=trace_headers):
...
"""

def decorator(func: Callable) -> Callable:
sig = inspect.signature(func)

def _inject(args, kwargs):
bound = sig.bind_partial(*args, **kwargs)
headers = dict(bound.arguments.get(param_name) or {})
headers.update(get_trace_headers())
bound.arguments[param_name] = headers
return bound.args, bound.kwargs

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
new_args, new_kwargs = _inject(args, kwargs)
return await func(*new_args, **new_kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
new_args, new_kwargs = _inject(args, kwargs)
return func(*new_args, **new_kwargs)

return sync_wrapper

return decorator


def trace_attributes(**extractors: Callable | str):
"""
Decorator to add custom attributes to New Relic traces.

Each kwarg is an attribute to add. The value can be:
- str: name of the function parameter (e.g., 'folio_abono')
- str with dot: path to an attribute (e.g., 'orden.clave_emisor')
- callable: function that receives the kwargs and returns the value

Example:
@trace_attributes(
clave_rastreo=lambda kw: ','.join(kw['orden'].claves_rastreo),
clave_emisor='orden.clave_emisor',
folio='folio_abono',
)
async def handle_orden(orden, folio_abono):
...
"""

def decorator(func: Callable) -> Callable:
sig = inspect.signature(func)

def _extract(args, kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
_add_attributes(bound.arguments, extractors)

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
_extract(args, kwargs)
return await func(*args, **kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
_extract(args, kwargs)
return func(*args, **kwargs)

return sync_wrapper

return decorator


def _get_nested_value(obj: Any, path: str) -> Any:
parts = path.split(".")
value = (
obj.get(parts[0])
if isinstance(obj, dict)
else getattr(obj, parts[0], None)
)

for part in parts[1:]:
if value is None:
return None
if isinstance(value, dict):
value = value.get(part)
else:
value = getattr(value, part, None)
return value


def _add_attributes(kwargs: dict, extractors: dict) -> None:
"""
Internal function to extract and add attributes to the current trace.

Args:
kwargs: Function arguments.
extractors: Dict of attribute_name -> extractor (callable or string).
"""
for attr_name, extractor in extractors.items():
try:
if callable(extractor):
value = extractor(kwargs)
elif isinstance(extractor, str):
value = _get_nested_value(kwargs, extractor)
else:
value = None

add_custom_attribute(attr_name, value)
except Exception:
pass # Silent exception
# we don't want to fail if unable to extract an attribute
2 changes: 1 addition & 1 deletion agave/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.5.2'
__version__ = "1.5.3.dev01"
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions bumping the version to 1.5.3, but the code sets __version__ to "1.5.3.dev01". Please either update the version string here or adjust the PR description so they are consistent.

Copilot uses AI. Check for mistakes.
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ moto[server]==5.0.26
pytest-vcr==1.0.2
pytest-asyncio==0.18.*
typing_extensions==4.12.2
newrelic==11.2.0
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agregar una linea al final del archivo

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listo

3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
'chalice': [
'chalice>=1.30.0,<2.0.0',
],
'tracing': [
'newrelic>=7.0.0,<12.0.0',
],
'fastapi': [
'fastapi>=0.115.0,<1.0.0',
# TODO: Remove this once we upgrade to starlette:
Expand Down
Loading
Loading