Skip to content

Commit 7491870

Browse files
committed
feat: Add execution ID logging for async functions
Refactors the async logging implementation to align with the sync version, ensuring consistent execution ID logging across both stacks.
1 parent b57b668 commit 7491870

File tree

2 files changed

+122
-133
lines changed

2 files changed

+122
-133
lines changed

src/functions_framework/aio/__init__.py

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import inspect
1919
import json
2020
import logging
21+
import logging.config
2122
import os
2223
import re
2324
import sys
@@ -180,42 +181,19 @@ async def _handle_not_found(request: Request):
180181

181182

182183
def _configure_app_execution_id_logging():
183-
import logging
184-
import logging.config
185-
186-
class AsyncExecutionIdHandler(logging.StreamHandler):
187-
def emit(self, record):
188-
context = execution_id.execution_context_var.get(None)
189-
190-
log_entry = {
191-
"message": self.format(record),
192-
"severity": record.levelname,
193-
}
194-
195-
if context and context.execution_id:
196-
log_entry["logging.googleapis.com/labels"] = {
197-
"execution_id": context.execution_id
198-
}
199-
200-
if context and context.span_id:
201-
log_entry["logging.googleapis.com/spanId"] = context.span_id
202-
203-
try:
204-
self.stream.write(json.dumps(log_entry) + "\n")
205-
self.stream.flush()
206-
except Exception:
207-
super().emit(record)
208-
209-
root_logger = logging.getLogger()
210-
root_logger.setLevel(logging.INFO)
211-
212-
for handler in root_logger.handlers[:]:
213-
root_logger.removeHandler(handler)
214-
215-
handler = AsyncExecutionIdHandler(sys.stdout)
216-
handler.setFormatter(logging.Formatter("%(message)s"))
217-
handler.setLevel(logging.NOTSET)
218-
root_logger.addHandler(handler)
184+
# Logging needs to be configured before app logger is accessed
185+
logging.config.dictConfig(
186+
{
187+
"version": 1,
188+
"handlers": {
189+
"asgi": {
190+
"class": "logging.StreamHandler",
191+
"stream": "ext://functions_framework.execution_id.logging_stream",
192+
},
193+
},
194+
"root": {"level": "INFO", "handlers": ["asgi"]},
195+
}
196+
)
219197

220198

221199
def create_asgi_app(target=None, source=None, signature_type=None):

tests/test_execution_id_async.py

Lines changed: 108 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,14 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import asyncio
1415
import json
1516
import pathlib
17+
import re
18+
import sys
1619

17-
from unittest.mock import Mock
20+
from functools import partial
21+
from unittest.mock import Mock, patch
1822

1923
import pytest
2024

@@ -28,7 +32,7 @@
2832
TEST_SPAN_ID = "123456"
2933

3034

31-
def test_async_user_function_can_retrieve_execution_id_from_header():
35+
def test_user_function_can_retrieve_execution_id_from_header():
3236
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
3337
target = "async_function"
3438
app = create_asgi_app(target, source)
@@ -44,9 +48,7 @@ def test_async_user_function_can_retrieve_execution_id_from_header():
4448
assert resp.json()["execution_id"] == TEST_EXECUTION_ID
4549

4650

47-
def test_async_uncaught_exception_in_user_function_sets_execution_id(
48-
capsys, monkeypatch
49-
):
51+
def test_uncaught_exception_in_user_function_sets_execution_id(capsys, monkeypatch):
5052
monkeypatch.setenv("LOG_EXECUTION_ID", "true")
5153
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
5254
target = "async_error"
@@ -62,12 +64,12 @@ def test_async_uncaught_exception_in_user_function_sets_execution_id(
6264
)
6365
assert resp.status_code == 500
6466
record = capsys.readouterr()
65-
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.out
66-
assert '"logging.googleapis.com/labels"' in record.out
67-
assert "ZeroDivisionError" in record.out
67+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err
68+
assert '"logging.googleapis.com/labels"' in record.err
69+
assert "ZeroDivisionError" in record.err
6870

6971

70-
def test_async_print_from_user_function_sets_execution_id(capsys, monkeypatch):
72+
def test_print_from_user_function_sets_execution_id(capsys, monkeypatch):
7173
monkeypatch.setenv("LOG_EXECUTION_ID", "true")
7274
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
7375
target = "async_print_message"
@@ -86,7 +88,7 @@ def test_async_print_from_user_function_sets_execution_id(capsys, monkeypatch):
8688
assert '"message": "some-message"' in record.out
8789

8890

89-
def test_async_log_from_user_function_sets_execution_id(capsys, monkeypatch):
91+
def test_log_from_user_function_sets_execution_id(capsys, monkeypatch):
9092
monkeypatch.setenv("LOG_EXECUTION_ID", "true")
9193
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
9294
target = "async_log_message"
@@ -101,12 +103,12 @@ def test_async_log_from_user_function_sets_execution_id(capsys, monkeypatch):
101103
json={"message": json.dumps({"custom-field": "some-message"})},
102104
)
103105
record = capsys.readouterr()
104-
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.out
105-
assert '\\"custom-field\\": \\"some-message\\"' in record.out
106-
assert '"logging.googleapis.com/labels"' in record.out
106+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' in record.err
107+
assert '"custom-field": "some-message"' in record.err
108+
assert '"logging.googleapis.com/labels"' in record.err
107109

108110

109-
def test_async_user_function_can_retrieve_generated_execution_id(monkeypatch):
111+
def test_user_function_can_retrieve_generated_execution_id(monkeypatch):
110112
monkeypatch.setattr(
111113
execution_id, "_generate_execution_id", lambda: TEST_EXECUTION_ID
112114
)
@@ -124,7 +126,7 @@ def test_async_user_function_can_retrieve_generated_execution_id(monkeypatch):
124126
assert resp.json()["execution_id"] == TEST_EXECUTION_ID
125127

126128

127-
def test_async_does_not_set_execution_id_when_not_enabled(capsys):
129+
def test_does_not_set_execution_id_when_not_enabled(capsys):
128130
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
129131
target = "async_print_message"
130132
app = create_asgi_app(target, source)
@@ -142,61 +144,50 @@ def test_async_does_not_set_execution_id_when_not_enabled(capsys):
142144
assert "some-message" in record.out
143145

144146

145-
def test_async_concurrent_requests_maintain_separate_execution_ids(capsys, monkeypatch):
146-
monkeypatch.setenv("LOG_EXECUTION_ID", "true")
147-
147+
def test_does_not_set_execution_id_when_env_var_is_false(capsys, monkeypatch):
148+
monkeypatch.setenv("LOG_EXECUTION_ID", "false")
148149
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
149-
target = "async_sleep"
150+
target = "async_print_message"
150151
app = create_asgi_app(target, source)
151-
# Use separate clients to avoid connection pooling issues
152-
client1 = TestClient(app, raise_server_exceptions=False)
153-
client2 = TestClient(app, raise_server_exceptions=False)
154-
155-
# Make concurrent requests with explicit execution IDs
156-
import threading
157-
158-
def make_request(client, message, exec_id):
159-
client.post(
160-
"/",
161-
headers={
162-
"Content-Type": "application/json",
163-
"Function-Execution-Id": exec_id,
164-
},
165-
json={"message": message},
166-
)
167-
168-
thread1 = threading.Thread(
169-
target=lambda: make_request(client1, "message1", "exec-id-1")
170-
)
171-
thread2 = threading.Thread(
172-
target=lambda: make_request(client2, "message2", "exec-id-2")
152+
client = TestClient(app)
153+
client.post(
154+
"/",
155+
headers={
156+
"Function-Execution-Id": TEST_EXECUTION_ID,
157+
"Content-Type": "application/json",
158+
},
159+
json={"message": "some-message"},
173160
)
161+
record = capsys.readouterr()
162+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' not in record.out
163+
assert "some-message" in record.out
174164

175-
thread1.start()
176-
thread2.start()
177-
thread1.join()
178-
thread2.join()
179165

166+
def test_does_not_set_execution_id_when_env_var_is_not_bool_like(capsys, monkeypatch):
167+
monkeypatch.setenv("LOG_EXECUTION_ID", "maybe")
168+
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
169+
target = "async_print_message"
170+
app = create_asgi_app(target, source)
171+
client = TestClient(app)
172+
client.post(
173+
"/",
174+
headers={
175+
"Function-Execution-Id": TEST_EXECUTION_ID,
176+
"Content-Type": "application/json",
177+
},
178+
json={"message": "some-message"},
179+
)
180180
record = capsys.readouterr()
181-
logs = record.out.strip().split("\n")
182-
logs_as_json = [json.loads(log) for log in logs if log and log.startswith("{")]
183-
184-
message1_logs = [log for log in logs_as_json if log.get("message") == "message1"]
185-
message2_logs = [log for log in logs_as_json if log.get("message") == "message2"]
181+
assert f'"execution_id": "{TEST_EXECUTION_ID}"' not in record.out
182+
assert "some-message" in record.out
186183

187-
assert (
188-
len(message1_logs) == 2
189-
), f"Expected 2 logs for message1, got {len(message1_logs)}"
190-
assert (
191-
len(message2_logs) == 2
192-
), f"Expected 2 logs for message2, got {len(message2_logs)}"
193184

194-
for log in message1_logs:
195-
assert log["logging.googleapis.com/labels"]["execution_id"] == "exec-id-1"
185+
def test_generate_execution_id():
186+
expected_matching_regex = "^[0-9a-zA-Z]{12}$"
187+
actual_execution_id = execution_id._generate_execution_id()
196188

197-
# Check that all message2 logs have exec-id-2
198-
for log in message2_logs:
199-
assert log["logging.googleapis.com/labels"]["execution_id"] == "exec-id-2"
189+
match = re.match(expected_matching_regex, actual_execution_id).group(0)
190+
assert match == actual_execution_id
200191

201192

202193
@pytest.mark.parametrize(
@@ -223,7 +214,7 @@ def make_request(client, message, exec_id):
223214
),
224215
],
225216
)
226-
def test_async_set_execution_context_headers(
217+
def test_set_execution_context_headers(
227218
headers, expected_execution_id, expected_span_id, should_generate
228219
):
229220
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
@@ -244,25 +235,64 @@ def test_async_set_execution_context_headers(
244235

245236

246237
@pytest.mark.asyncio
247-
async def test_crash_handler_without_context_sets_execution_id():
248-
"""Test that crash handler returns proper error response with crash header."""
249-
from functions_framework.aio import _crash_handler
250-
251-
# Create a mock request
252-
request = Mock()
253-
request.url.path = "/test"
254-
request.method = "POST"
255-
request.headers = {"Function-Execution-Id": "test-exec-id"}
238+
async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsys):
239+
monkeypatch.setenv("LOG_EXECUTION_ID", "true")
256240

257-
# Create an exception
258-
exc = ValueError("Test error")
241+
expected_logs = (
242+
{
243+
"message": "message1",
244+
"logging.googleapis.com/labels": {"execution_id": "test-execution-id-1"},
245+
},
246+
{
247+
"message": "message2",
248+
"logging.googleapis.com/labels": {"execution_id": "test-execution-id-2"},
249+
},
250+
{
251+
"message": "message1",
252+
"logging.googleapis.com/labels": {"execution_id": "test-execution-id-1"},
253+
},
254+
{
255+
"message": "message2",
256+
"logging.googleapis.com/labels": {"execution_id": "test-execution-id-2"},
257+
},
258+
)
259259

260-
# Call crash handler
261-
response = await _crash_handler(request, exc)
260+
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
261+
target = "async_sleep"
262+
app = create_asgi_app(target, source)
263+
client = TestClient(app)
264+
loop = asyncio.get_event_loop()
265+
response1 = loop.run_in_executor(
266+
None,
267+
partial(
268+
client.post,
269+
"/",
270+
headers={
271+
"Content-Type": "application/json",
272+
"Function-Execution-Id": "test-execution-id-1",
273+
},
274+
json={"message": "message1"},
275+
),
276+
)
277+
response2 = loop.run_in_executor(
278+
None,
279+
partial(
280+
client.post,
281+
"/",
282+
headers={
283+
"Content-Type": "application/json",
284+
"Function-Execution-Id": "test-execution-id-2",
285+
},
286+
json={"message": "message2"},
287+
),
288+
)
289+
await asyncio.wait((response1, response2))
290+
record = capsys.readouterr()
291+
logs = record.err.strip().split("\n")
292+
logs_as_json = tuple(json.loads(log) for log in logs)
262293

263-
# Check response
264-
assert response.status_code == 500
265-
assert response.headers["X-Google-Status"] == "crash"
294+
sort_key = lambda d: d["message"]
295+
assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key)
266296

267297

268298
def test_async_decorator_with_sync_function():
@@ -285,22 +315,3 @@ def sync_func(request):
285315
result = wrapped(request)
286316

287317
assert result == {"status": "ok"}
288-
289-
290-
def test_sync_function_called_from_async_context():
291-
"""Test that a sync function works correctly when called from async ASGI app."""
292-
source = TEST_FUNCTIONS_DIR / "execution_id" / "async_main.py"
293-
target = "sync_function_in_async_context"
294-
app = create_asgi_app(target, source)
295-
client = TestClient(app)
296-
resp = client.post(
297-
"/",
298-
headers={
299-
"Function-Execution-Id": TEST_EXECUTION_ID,
300-
"Content-Type": "application/json",
301-
},
302-
)
303-
304-
result = resp.json()
305-
assert result["execution_id"] == TEST_EXECUTION_ID
306-
assert result["type"] == "sync"

0 commit comments

Comments
 (0)