Skip to content

Commit a51b226

Browse files
wangzlei and jj22ee authored
add LoggerProvider force flush in Lambda function (#441)
*Issue #, if available:* The current Lambda instrumentation is missing a call to the `LoggerProvider.force_flush` method, which can result in delayed or missing OTel logs in the Lambda environment due to Lambda freeze. *Description of changes:* Add a LoggerProvider force flush to the Lambda instrumentation. A debug message is logged if the LoggerProvider does not support force flush, because the default global LoggerProvider in OpenTelemetry Python is ProxyLoggerProvider; it is replaced by the SDK LoggerProvider only if the user sets the environment variable `OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED=true`, which is not set in the ADOT Lambda layer. By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice. Co-authored-by: Jonathan Lee <[email protected]>
1 parent 2a94c9a commit a51b226

File tree

1 file changed

+33
-36
lines changed
  • lambda-layer/src/opentelemetry/instrumentation/aws_lambda

1 file changed

+33
-36
lines changed

lambda-layer/src/opentelemetry/instrumentation/aws_lambda/__init__.py

Lines changed: 33 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def lambda_handler(event, context):
4646
4747
tracer_provider (TracerProvider) - an optional tracer provider
4848
meter_provider (MeterProvider) - an optional meter provider
49+
logger_provider (LoggerProvider) - an optional logger provider
4950
event_context_extractor (Callable) - a function that returns an OTel Trace
5051
Context given the Lambda Event the AWS Lambda was invoked with
5152
this function signature is: def event_context_extractor(lambda_event: Any) -> Context
@@ -77,6 +78,7 @@ def custom_event_context_extractor(lambda_event):
7778
from wrapt import wrap_function_wrapper
7879

7980
from opentelemetry import context as context_api
81+
from opentelemetry._logs import LoggerProvider, get_logger_provider
8082
from opentelemetry.context.context import Context
8183
from opentelemetry.instrumentation.aws_lambda.package import _instruments
8284
from opentelemetry.instrumentation.aws_lambda.version import __version__
@@ -94,9 +96,7 @@ def custom_event_context_extractor(lambda_event):
9496
_HANDLER = "_HANDLER"
9597
_X_AMZN_TRACE_ID = "_X_AMZN_TRACE_ID"
9698
ORIG_HANDLER = "ORIG_HANDLER"
97-
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = (
98-
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT"
99-
)
99+
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT = "OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT"
100100

101101

102102
def _default_event_context_extractor(lambda_event: Any) -> Context:
@@ -157,17 +157,13 @@ def _determine_parent_context(
157157
return event_context_extractor(lambda_event)
158158

159159

160-
def _set_api_gateway_v1_proxy_attributes(
161-
lambda_event: Any, span: Span
162-
) -> Span:
160+
def _set_api_gateway_v1_proxy_attributes(lambda_event: Any, span: Span) -> Span:
163161
"""Sets HTTP attributes for REST APIs and v1 HTTP APIs
164162
165163
More info:
166164
https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
167165
"""
168-
span.set_attribute(
169-
SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod")
170-
)
166+
span.set_attribute(SpanAttributes.HTTP_METHOD, lambda_event.get("httpMethod"))
171167

172168
if lambda_event.get("headers"):
173169
if "User-Agent" in lambda_event["headers"]:
@@ -194,16 +190,12 @@ def _set_api_gateway_v1_proxy_attributes(
194190
f"{lambda_event['resource']}?{urlencode(lambda_event['queryStringParameters'])}",
195191
)
196192
else:
197-
span.set_attribute(
198-
SpanAttributes.HTTP_TARGET, lambda_event["resource"]
199-
)
193+
span.set_attribute(SpanAttributes.HTTP_TARGET, lambda_event["resource"])
200194

201195
return span
202196

203197

204-
def _set_api_gateway_v2_proxy_attributes(
205-
lambda_event: Any, span: Span
206-
) -> Span:
198+
def _set_api_gateway_v2_proxy_attributes(lambda_event: Any, span: Span) -> Span:
207199
"""Sets HTTP attributes for v2 HTTP APIs
208200
209201
More info:
@@ -253,6 +245,7 @@ def _instrument(
253245
event_context_extractor: Callable[[Any], Context],
254246
tracer_provider: TracerProvider = None,
255247
meter_provider: MeterProvider = None,
248+
logger_provider: LoggerProvider = None,
256249
):
257250

258251
# pylint: disable=too-many-locals
@@ -261,9 +254,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
261254
call_wrapped, instance, args, kwargs
262255
):
263256

264-
orig_handler_name = ".".join(
265-
[wrapped_module_name, wrapped_function_name]
266-
)
257+
orig_handler_name = ".".join([wrapped_module_name, wrapped_function_name])
267258

268259
lambda_event = args[0]
269260

@@ -306,9 +297,7 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
306297
#
307298
# See more:
308299
# https://github.com/open-telemetry/semantic-conventions/blob/main/docs/faas/aws-lambda.md#all-triggers
309-
account_id = lambda_context.invoked_function_arn.split(
310-
":"
311-
)[4]
300+
account_id = lambda_context.invoked_function_arn.split(":")[4]
312301
span.set_attribute(
313302
ResourceAttributes.CLOUD_ACCOUNT_ID,
314303
account_id,
@@ -326,19 +315,13 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
326315
# If the request came from an API Gateway, extract http attributes from the event
327316
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/instrumentation/aws-lambda.md#api-gateway
328317
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md#http-server-semantic-conventions
329-
if isinstance(lambda_event, dict) and lambda_event.get(
330-
"requestContext"
331-
):
318+
if isinstance(lambda_event, dict) and lambda_event.get("requestContext"):
332319
span.set_attribute(SpanAttributes.FAAS_TRIGGER, "http")
333320

334321
if lambda_event.get("version") == "2.0":
335-
_set_api_gateway_v2_proxy_attributes(
336-
lambda_event, span
337-
)
322+
_set_api_gateway_v2_proxy_attributes(lambda_event, span)
338323
else:
339-
_set_api_gateway_v1_proxy_attributes(
340-
lambda_event, span
341-
)
324+
_set_api_gateway_v1_proxy_attributes(lambda_event, span)
342325

343326
if isinstance(result, dict) and result.get("statusCode"):
344327
span.set_attribute(
@@ -377,6 +360,22 @@ def _instrumented_lambda_handler_call( # noqa pylint: disable=too-many-branches
377360
" case of a Lambda freeze and would exist in the OTel SDK implementation."
378361
)
379362

363+
_logger_provider = logger_provider or get_logger_provider()
364+
if hasattr(_logger_provider, "force_flush"):
365+
rem = flush_timeout - (time.time() - now) * 1000
366+
if rem > 0:
367+
try:
368+
# NOTE: `force_flush` before function quit in case of Lambda freeze.
369+
_logger_provider.force_flush(rem)
370+
except Exception: # pylint: disable=broad-except
371+
logger.exception("LoggerProvider failed to flush logs")
372+
else:
373+
logger.debug(
374+
"LoggerProvider (%s) was missing `force_flush` method. This is necessary in"
375+
" case of a Lambda freeze and would exist in the OTel SDK implementation.",
376+
_logger_provider.__class__.__name__,
377+
)
378+
380379
if exception is not None:
381380
raise exception.with_traceback(exception.__traceback__)
382381

@@ -403,6 +402,7 @@ def _instrument(self, **kwargs):
403402
**kwargs: Optional arguments
404403
``tracer_provider``: a TracerProvider, defaults to global
405404
``meter_provider``: a MeterProvider, defaults to global
405+
``logger_provider``: a LoggerProvider, defaults to global
406406
``event_context_extractor``: a method which takes the Lambda
407407
Event as input and extracts an OTel Context from it. By default,
408408
the context is extracted from the HTTP headers of an API Gateway
@@ -423,9 +423,7 @@ def _instrument(self, **kwargs):
423423
self._wrapped_function_name,
424424
) = lambda_handler.rsplit(".", 1)
425425

426-
flush_timeout_env = os.environ.get(
427-
OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None
428-
)
426+
flush_timeout_env = os.environ.get(OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT, None)
429427
flush_timeout = 30000
430428
try:
431429
if flush_timeout_env is not None:
@@ -440,11 +438,10 @@ def _instrument(self, **kwargs):
440438
self._wrapped_module_name,
441439
self._wrapped_function_name,
442440
flush_timeout,
443-
event_context_extractor=kwargs.get(
444-
"event_context_extractor", _default_event_context_extractor
445-
),
441+
event_context_extractor=kwargs.get("event_context_extractor", _default_event_context_extractor),
446442
tracer_provider=kwargs.get("tracer_provider"),
447443
meter_provider=kwargs.get("meter_provider"),
444+
logger_provider=kwargs.get("logger_provider"),
448445
)
449446

450447
def _uninstrument(self, **kwargs):

0 commit comments

Comments
 (0)