Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def lambda_handler(event, context):

tracer_provider (TracerProvider) - an optional tracer provider
meter_provider (MeterProvider) - an optional meter provider
logger_provider (LoggerProvider) - an optional logger provider
event_context_extractor (Callable) - a function that returns an OTel Trace
Context given the Lambda Event the AWS Lambda was invoked with
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
Expand Down Expand Up @@ -77,6 +78,7 @@ def custom_event_context_extractor(lambda_event):
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry._logs import LoggerProvider, get_logger_provider
from opentelemetry.context.context import Context
from opentelemetry.instrumentation.aws_lambda.package import _instruments
from opentelemetry.instrumentation.aws_lambda.version import __version__
Expand All @@ -94,9 +96,7 @@ def custom_event_context_extractor(lambda_event):
_HANDLER = "_HANDLER"
_X_AMZN_TRACE_ID = "_X_AMZN_TRACE_ID"
ORIG_HANDLER = "ORIG_HANDLER"
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = (
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT"
)
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT"


def _default_event_context_extractor(lambda_event: Any) -> Context:
Expand Down Expand Up @@ -157,17 +157,13 @@ def _determine_parent_context(
return event_context_extractor(lambda_event)


def _set_api_gateway_v1_proxy_attributes(
lambda_event: Any, span: Span
) -> Span:
def _set_api_gateway_v1_proxy_attributes(lambda_event: Any, span: Span) -> Span:
"""Sets HTTP attributes for REST APIs and v1 HTTP APIs

More info:
https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
"""
span.set_attribute(
SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod")
)
span.set_attribute(SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod"))

if lambda_event.get("headers"):
if "User-Agent" in lambda_event["headers"]:
Expand All @@ -194,16 +190,12 @@ def _set_api_gateway_v1_proxy_attributes(
f"{lambda_event['resource']}?{urlencode(lambda_event['queryStringParameters'])}",
)
else:
span.set_attribute(
SpanAttributes.HTTP_TARGET, lambda_event["resource"]
)
span.set_attribute(SpanAttributes.HTTP_TARGET, lambda_event["resource"])

return span


def _set_api_gateway_v2_proxy_attributes(
lambda_event: Any, span: Span
) -> Span:
def _set_api_gateway_v2_proxy_attributes(lambda_event: Any, span: Span) -> Span:
"""Sets HTTP attributes for v2 HTTP APIs

More info:
Expand Down Expand Up @@ -253,6 +245,7 @@ def _instrument(
event_context_extractor: Callable[[Any], Context],
tracer_provider: TracerProvider = None,
meter_provider: MeterProvider = None,
logger_provider: LoggerProvider = None,
):

# pylint: disable=too-many-locals
Expand All @@ -261,9 +254,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
call_wrapped, instance, args, kwargs
):

orig_handler_name = ".".join(
[wrapped_module_name, wrapped_function_name]
)
orig_handler_name = ".".join([wrapped_module_name, wrapped_function_name])

lambda_event = args[0]

Expand Down Expand Up @@ -306,9 +297,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
#
# See more:
# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/faas/aws-lambda.md#all-triggers
account_id = lambda_context.invoked_function_arn.split(
":"
)[4]
account_id = lambda_context.invoked_function_arn.split(":")[4]
span.set_attribute(
ResourceAttributes.CLOUD_ACCOUNT_ID,
account_id,
Expand All @@ -326,19 +315,13 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
# If the request came from an API Gateway, extract http attributes from the event
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#api-gateway
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-server-semantic-conventions
if isinstance(lambda_event, dict) and lambda_event.get(
"requestContext"
):
if isinstance(lambda_event, dict) and lambda_event.get("requestContext"):
span.set_attribute(SpanAttributes.FAAS_TRIGGER, "http")

if lambda_event.get("version") == "2.0":
_set_api_gateway_v2_proxy_attributes(
lambda_event, span
)
_set_api_gateway_v2_proxy_attributes(lambda_event, span)
else:
_set_api_gateway_v1_proxy_attributes(
lambda_event, span
)
_set_api_gateway_v1_proxy_attributes(lambda_event, span)

if isinstance(result, dict) and result.get("statusCode"):
span.set_attribute(
Expand Down Expand Up @@ -377,6 +360,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
" case of a Lambda freeze and would exist in the OTel SDK implementation."
)

_logger_provider = logger_provider or get_logger_provider()
if hasattr(_logger_provider, "force_flush"):
rem = flush_timeout - (time.time() - now) * 1000
if rem > 0:
try:
# NOTE: `force_flush` before function quit in case of Lambda freeze.
_logger_provider.force_flush(rem)
except Exception: # pylint: disable=broad-except
logger.exception("LoggerProvider failed to flush logs")
else:
logger.debug(
"LoggerProvider (%s) was missing `force_flush` method. This is necessary in"
" case of a Lambda freeze and would exist in the OTel SDK implementation.",
_logger_provider.__class__.__name__,
)

if exception is not None:
raise exception.with_traceback(exception.__traceback__)

Expand All @@ -403,6 +402,7 @@ def _instrument(self, **kwargs):
**kwargs: Optional arguments
``tracer_provider``: a TracerProvider, defaults to global
``meter_provider``: a MeterProvider, defaults to global
``logger_provider``: a LoggerProvider, defaults to global
``event_context_extractor``: a method which takes the Lambda
Event as input and extracts an OTel Context from it. By default,
the context is extracted from the HTTP headers of an API Gateway
Expand All @@ -423,9 +423,7 @@ def _instrument(self, **kwargs):
self._wrapped_function_name,
) = lambda_handler.rsplit(".", 1)

flush_timeout_env = os.environ.get(
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None
)
flush_timeout_env = os.environ.get(OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None)
flush_timeout = 30000
try:
if flush_timeout_env is not None:
Expand All @@ -440,11 +438,10 @@ def _instrument(self, **kwargs):
self._wrapped_module_name,
self._wrapped_function_name,
flush_timeout,
event_context_extractor=kwargs.get(
"event_context_extractor", _default_event_context_extractor
),
event_context_extractor=kwargs.get("event_context_extractor", _default_event_context_extractor),
tracer_provider=kwargs.get("tracer_provider"),
meter_provider=kwargs.get("meter_provider"),
logger_provider=kwargs.get("logger_provider"),
)

def _uninstrument(self, **kwargs):
Expand Down
Loading