Skip to content

Commit b57b668

Browse files
committed
fix: improve async execution ID context propagation using contextvars
- Use contextvars.copy_context() to properly propagate execution context in async functions - Implement AsyncExecutionIdHandler to handle JSON logging with execution_id - Redirect logging output from stderr to stdout for consistency - Add build dependency to dev dependencies - Update tests to reflect new logging output location
1 parent cffd0fa commit b57b668

File tree

4 files changed

+38
-26
lines changed

4 files changed

+38
-26
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ functions_framework = ["py.typed"]
6565
[dependency-groups]
6666
dev = [
6767
"black>=23.3.0",
68+
"build>=1.1.1",
6869
"isort>=5.11.5",
6970
"pretend>=1.0.9",
7071
"pytest>=7.4.4",

src/functions_framework/aio/__init__.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import asyncio
16+
import contextvars
1617
import functools
1718
import inspect
1819
import json
@@ -69,11 +70,7 @@ async def _crash_handler(request, exc):
6970
f"Exception on {request.url.path} [{request.method}]\n{tb_text}".rstrip()
7071
)
7172

72-
if _enable_execution_id_logging():
73-
log_entry = {"message": error_msg, "levelname": "ERROR"}
74-
logger.error(json.dumps(log_entry))
75-
else:
76-
logger.error(error_msg)
73+
logger.error(error_msg)
7774

7875
headers = {_FUNCTION_STATUS_HEADER_FIELD: _CRASH}
7976
return Response("Internal Server Error", status_code=500, headers=headers)
@@ -132,15 +129,14 @@ async def handler(request):
132129
result = await function(request)
133130
else:
134131
# TODO: Use asyncio.to_thread when we drop Python 3.8 support
135-
# Python 3.8 compatible version of asyncio.to_thread
136132
loop = asyncio.get_event_loop()
137-
result = await loop.run_in_executor(None, function, request)
133+
ctx = contextvars.copy_context()
134+
result = await loop.run_in_executor(None, ctx.run, function, request)
138135
if isinstance(result, str):
139136
return Response(result)
140137
elif isinstance(result, dict):
141138
return JSONResponse(result)
142139
elif isinstance(result, tuple) and len(result) == 2:
143-
# Support Flask-style tuple response
144140
content, status_code = result
145141
if isinstance(content, dict):
146142
return JSONResponse(content, status_code=status_code)
@@ -187,16 +183,37 @@ def _configure_app_execution_id_logging():
187183
import logging
188184
import logging.config
189185

186+
class AsyncExecutionIdHandler(logging.StreamHandler):
187+
def emit(self, record):
188+
context = execution_id.execution_context_var.get(None)
189+
190+
log_entry = {
191+
"message": self.format(record),
192+
"severity": record.levelname,
193+
}
194+
195+
if context and context.execution_id:
196+
log_entry["logging.googleapis.com/labels"] = {
197+
"execution_id": context.execution_id
198+
}
199+
200+
if context and context.span_id:
201+
log_entry["logging.googleapis.com/spanId"] = context.span_id
202+
203+
try:
204+
self.stream.write(json.dumps(log_entry) + "\n")
205+
self.stream.flush()
206+
except Exception:
207+
super().emit(record)
208+
190209
root_logger = logging.getLogger()
191210
root_logger.setLevel(logging.INFO)
192211

193-
# Remove existing handlers
194212
for handler in root_logger.handlers[:]:
195213
root_logger.removeHandler(handler)
196214

197-
handler = logging.StreamHandler(
198-
execution_id.LoggingHandlerAddExecutionId(sys.stderr)
199-
)
215+
handler = AsyncExecutionIdHandler(sys.stdout)
216+
handler.setFormatter(logging.Formatter("%(message)s"))
200217
handler.setLevel(logging.NOTSET)
201218
root_logger.addHandler(handler)
202219

src/functions_framework/execution_id.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ def _extract_context_from_headers(headers):
8888
return ExecutionContext(execution_id, span_id)
8989

9090

91-
# Middleware to add execution id to request header if one does not already exist
9291
class WsgiMiddleware:
9392
def __init__(self, wsgi_app):
9493
self.wsgi_app = wsgi_app
@@ -101,7 +100,6 @@ def __call__(self, environ, start_response):
101100
return self.wsgi_app(environ, start_response)
102101

103102

104-
# ASGI Middleware to add execution id to request header if one does not already exist
105103
class AsgiMiddleware:
106104
def __init__(self, app):
107105
self.app = app
@@ -187,8 +185,6 @@ async def async_wrapper(request, *args, **kwargs):
187185
with stderr_redirect, stdout_redirect:
188186
result = await func(request, *args, **kwargs)
189187

190-
# Only reset context on successful completion
191-
# On exception, leave context available for exception handlers
192188
execution_context_var.reset(token)
193189
return result
194190

@@ -200,8 +196,6 @@ def sync_wrapper(request, *args, **kwargs):
200196
with stderr_redirect, stdout_redirect:
201197
result = func(request, *args, **kwargs)
202198

203-
# Only reset context on successful completion
204-
# On exception, leave context available for exception handlers
205199
execution_context_var.reset(token)
206200
return result
207201

tests/test_execution_id_async.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ def test_async_uncaught_exception_in_user_function_sets_execution_id(
6262
)
6363
assert resp.status_code == 500
6464
record = capsys.readouterr()
65-
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err
65+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.out
66+
assert '"logging.googleapis.com/labels"' in record.out
67+
assert "ZeroDivisionError" in record.out
6668

6769

6870
def test_async_print_from_user_function_sets_execution_id(capsys, monkeypatch):
@@ -99,8 +101,9 @@ def test_async_log_from_user_function_sets_execution_id(capsys, monkeypatch):
99101
json={"message": json.dumps({"custom-field": "some-message"})},
100102
)
101103
record = capsys.readouterr()
102-
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err
103-
assert '"custom-field": "some-message"' in record.err
104+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.out
105+
assert '\\"custom-field\\": \\"some-message\\"' in record.out
106+
assert '"logging.googleapis.com/labels"' in record.out
104107

105108

106109
def test_async_user_function_can_retrieve_generated_execution_id(monkeypatch):
@@ -175,11 +178,9 @@ def make_request(client, message, exec_id):
175178
thread2.join()
176179

177180
record = capsys.readouterr()
178-
logs = record.err.strip().split("\n")
179-
logs_as_json = [json.loads(log) for log in logs if log]
181+
logs = record.out.strip().split("\n")
182+
logs_as_json = [json.loads(log) for log in logs if log and log.startswith("{")]
180183

181-
# Check that each message appears twice (once at start, once at end of async_sleep)
182-
# and that each has the correct execution ID
183184
message1_logs = [log for log in logs_as_json if log.get("message") == "message1"]
184185
message2_logs = [log for log in logs_as_json if log.get("message") == "message2"]
185186

@@ -190,7 +191,6 @@ def make_request(client, message, exec_id):
190191
len(message2_logs) == 2
191192
), f"Expected 2 logs for message2, got {len(message2_logs)}"
192193

193-
# Check that all message1 logs have exec-id-1
194194
for log in message1_logs:
195195
assert log["logging.googleapis.com/labels"]["execution_id"] == "exec-id-1"
196196

0 commit comments

Comments
 (0)