Skip to content

Commit c92af09

Browse files
feat(azure_functions): add tracing support for service bus triggers (#13324)
This PR adds tracing support for Service Bus triggers (queues and topics) in Azure Functions. ### Additional Notes: - Span Kind set to `consumer` for Service Bus triggers - Refactors Azure Functions integration with `wrap_function_with_tracing` method so additional trigger [decorators](https://learn.microsoft.com/en-us/python/api/azure-functions/azure.functions.decorators?view=azure-python) can be added with the same approach - Make [manual request](https://learn.microsoft.com/en-us/azure/azure-functions/functions-manually-run-non-http?tabs=azure-portal) to service bus functions in test ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
1 parent 8195942 commit c92af09

File tree

9 files changed

+200
-63
lines changed

9 files changed

+200
-63
lines changed

ddtrace/_trace/trace_handlers.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -823,7 +823,7 @@ def _set_span_pointer(span: "Span", span_pointer_description: _SpanPointerDescri
823823
)
824824

825825

826-
def _set_azure_function_tags(span, azure_functions_config, function_name, trigger, span_kind=SpanKind.INTERNAL):
826+
def _set_azure_function_tags(span, azure_functions_config, function_name, trigger, span_kind):
827827
span.set_tag_str(COMPONENT, azure_functions_config.integration_name)
828828
span.set_tag_str(SPAN_KIND, span_kind)
829829
span.set_tag_str("aas.function.name", function_name) # codespell:ignore
@@ -857,9 +857,9 @@ def _on_azure_functions_start_response(ctx, azure_functions_config, res, functio
857857
)
858858

859859

860-
def _on_azure_functions_trigger_span_modifier(ctx, azure_functions_config, function_name, trigger):
860+
def _on_azure_functions_trigger_span_modifier(ctx, azure_functions_config, function_name, trigger, span_kind):
861861
span = ctx.get_item("trigger_span")
862-
_set_azure_function_tags(span, azure_functions_config, function_name, trigger)
862+
_set_azure_function_tags(span, azure_functions_config, function_name, trigger, span_kind)
863863

864864

865865
def listen():
@@ -968,6 +968,7 @@ def listen():
968968
"rq.job.perform",
969969
"rq.job.fetch_many",
970970
"azure.functions.patched_route_request",
971+
"azure.functions.patched_service_bus",
971972
"azure.functions.patched_timer",
972973
):
973974
core.on(f"context.started.start_span.{context_name}", _start_span)

ddtrace/contrib/internal/azure_functions/patch.py

Lines changed: 59 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
1-
import functools
2-
import inspect
31
import os
42

53
import azure.functions as azure_functions
64
from wrapt import wrap_function_wrapper as _w
75

86
from ddtrace import config
97
from ddtrace.contrib.internal.trace_utils import unwrap as _u
10-
from ddtrace.internal import core
8+
from ddtrace.ext import SpanKind
119
from ddtrace.internal.schema import schematize_service_name
1210
from ddtrace.internal.utils.formats import asbool
1311
from ddtrace.trace import Pin
1412

1513
from .utils import create_context
1614
from .utils import get_function_name
15+
from .utils import wrap_function_with_tracing
1716

1817

1918
config._add(
@@ -42,6 +41,8 @@ def patch():
4241
Pin().onto(azure_functions.FunctionApp)
4342
_w("azure.functions", "FunctionApp.function_name", _patched_function_name)
4443
_w("azure.functions", "FunctionApp.route", _patched_route)
44+
_w("azure.functions", "FunctionApp.service_bus_queue_trigger", _patched_service_bus_trigger)
45+
_w("azure.functions", "FunctionApp.service_bus_topic_trigger", _patched_service_bus_trigger)
4546
_w("azure.functions", "FunctionApp.timer_trigger", _patched_timer_trigger)
4647

4748

@@ -61,39 +62,49 @@ def _patched_route(wrapped, instance, args, kwargs):
6162
def _wrapper(func):
6263
function_name = get_function_name(pin, instance, func)
6364

64-
if inspect.iscoroutinefunction(func):
65-
66-
@functools.wraps(func)
67-
async def async_wrap_function(*args, **kwargs):
68-
req = kwargs.get(trigger_arg_name)
69-
with create_context("azure.functions.patched_route_request", pin, headers=req.headers) as ctx, ctx.span:
70-
ctx.set_item("req_span", ctx.span)
71-
core.dispatch("azure.functions.request_call_modifier", (ctx, config.azure_functions, req))
72-
res = None
73-
try:
74-
res = await func(*args, **kwargs)
75-
return res
76-
finally:
77-
core.dispatch(
78-
"azure.functions.start_response", (ctx, config.azure_functions, res, function_name, trigger)
79-
)
80-
81-
return wrapped(*args, **kwargs)(async_wrap_function)
82-
83-
@functools.wraps(func)
84-
def wrap_function(*args, **kwargs):
65+
def context_factory(kwargs):
8566
req = kwargs.get(trigger_arg_name)
86-
with create_context("azure.functions.patched_route_request", pin, headers=req.headers) as ctx, ctx.span:
87-
ctx.set_item("req_span", ctx.span)
88-
core.dispatch("azure.functions.request_call_modifier", (ctx, config.azure_functions, req))
89-
res = None
90-
try:
91-
res = func(*args, **kwargs)
92-
return res
93-
finally:
94-
core.dispatch(
95-
"azure.functions.start_response", (ctx, config.azure_functions, res, function_name, trigger)
96-
)
67+
return create_context("azure.functions.patched_route_request", pin, headers=req.headers)
68+
69+
def pre_dispatch(ctx, kwargs):
70+
req = kwargs.get(trigger_arg_name)
71+
ctx.set_item("req_span", ctx.span)
72+
return ("azure.functions.request_call_modifier", (ctx, config.azure_functions, req))
73+
74+
def post_dispatch(ctx, res):
75+
return ("azure.functions.start_response", (ctx, config.azure_functions, res, function_name, trigger))
76+
77+
wrap_function = wrap_function_with_tracing(
78+
func, context_factory, pre_dispatch=pre_dispatch, post_dispatch=post_dispatch
79+
)
80+
81+
return wrapped(*args, **kwargs)(wrap_function)
82+
83+
return _wrapper
84+
85+
86+
def _patched_service_bus_trigger(wrapped, instance, args, kwargs):
87+
trigger = "ServiceBus"
88+
89+
pin = Pin.get_from(instance)
90+
if not pin or not pin.enabled():
91+
return wrapped(*args, **kwargs)
92+
93+
def _wrapper(func):
94+
function_name = get_function_name(pin, instance, func)
95+
96+
def context_factory(kwargs):
97+
resource_name = f"{trigger} {function_name}"
98+
return create_context("azure.functions.patched_service_bus", pin, resource_name)
99+
100+
def pre_dispatch(ctx, kwargs):
101+
ctx.set_item("trigger_span", ctx.span)
102+
return (
103+
"azure.functions.trigger_call_modifier",
104+
(ctx, config.azure_functions, function_name, trigger, SpanKind.CONSUMER),
105+
)
106+
107+
wrap_function = wrap_function_with_tracing(func, context_factory, pre_dispatch=pre_dispatch)
97108

98109
return wrapped(*args, **kwargs)(wrap_function)
99110

@@ -109,31 +120,19 @@ def _patched_timer_trigger(wrapped, instance, args, kwargs):
109120

110121
def _wrapper(func):
111122
function_name = get_function_name(pin, instance, func)
112-
resource_name = f"{trigger} {function_name}"
113-
114-
if inspect.iscoroutinefunction(func):
115-
116-
@functools.wraps(func)
117-
async def async_wrap_function(*args, **kwargs):
118-
with create_context("azure.functions.patched_timer", pin, resource_name) as ctx, ctx.span:
119-
ctx.set_item("trigger_span", ctx.span)
120-
core.dispatch(
121-
"azure.functions.trigger_call_modifier",
122-
(ctx, config.azure_functions, function_name, trigger),
123-
)
124-
await func(*args, **kwargs)
125-
126-
return wrapped(*args, **kwargs)(async_wrap_function)
127-
128-
@functools.wraps(func)
129-
def wrap_function(*args, **kwargs):
130-
with create_context("azure.functions.patched_timer", pin, resource_name) as ctx, ctx.span:
131-
ctx.set_item("trigger_span", ctx.span)
132-
core.dispatch(
133-
"azure.functions.trigger_call_modifier",
134-
(ctx, config.azure_functions, function_name, trigger),
135-
)
136-
func(*args, **kwargs)
123+
124+
def context_factory(kwargs):
125+
resource_name = f"{trigger} {function_name}"
126+
return create_context("azure.functions.patched_timer", pin, resource_name)
127+
128+
def pre_dispatch(ctx, kwargs):
129+
ctx.set_item("trigger_span", ctx.span)
130+
return (
131+
"azure.functions.trigger_call_modifier",
132+
(ctx, config.azure_functions, function_name, trigger, SpanKind.INTERNAL),
133+
)
134+
135+
wrap_function = wrap_function_with_tracing(func, context_factory, pre_dispatch=pre_dispatch)
137136

138137
return wrapped(*args, **kwargs)(wrap_function)
139138

ddtrace/contrib/internal/azure_functions/utils.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import functools
2+
import inspect
3+
14
from ddtrace import config
25
from ddtrace.contrib.internal.trace_utils import int_service
36
from ddtrace.ext import SpanTypes
@@ -30,3 +33,39 @@ def get_function_name(pin, instance, func):
3033
else:
3134
function_name = func.__name__
3235
return function_name
36+
37+
38+
def wrap_function_with_tracing(func, context_factory, pre_dispatch=None, post_dispatch=None):
39+
if inspect.iscoroutinefunction(func):
40+
41+
@functools.wraps(func)
42+
async def async_wrapper(*args, **kwargs):
43+
with context_factory(kwargs) as ctx, ctx.span:
44+
if pre_dispatch:
45+
core.dispatch(*pre_dispatch(ctx, kwargs))
46+
47+
res = None
48+
try:
49+
res = await func(*args, **kwargs)
50+
return res
51+
finally:
52+
if post_dispatch:
53+
core.dispatch(*post_dispatch(ctx, res))
54+
55+
return async_wrapper
56+
57+
@functools.wraps(func)
58+
def wrapper(*args, **kwargs):
59+
with context_factory(kwargs) as ctx, ctx.span:
60+
if pre_dispatch:
61+
core.dispatch(*pre_dispatch(ctx, kwargs))
62+
63+
res = None
64+
try:
65+
res = func(*args, **kwargs)
66+
return res
67+
finally:
68+
if post_dispatch:
69+
core.dispatch(*post_dispatch(ctx, res))
70+
71+
return wrapper
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
features:
2+
- |
3+
azure_functions: This introduces tracing support for service bus triggers.

tests/contrib/azure_functions/azure_function_app/function_app.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ def http_get_child(req: func.HttpRequest) -> func.HttpResponse:
6464
return func.HttpResponse("Hello Datadog!")
6565

6666

67+
@app.function_name(name="servicebusqueue")
68+
@app.service_bus_queue_trigger(arg_name="msg", queue_name="queue.1", connection="CONNECTION_SETTING")
69+
def service_bus_queue(msg: func.ServiceBusMessage):
70+
pass
71+
72+
73+
@app.function_name(name="servicebustopic")
74+
@app.service_bus_topic_trigger(
75+
arg_name="msg", topic_name="topic.1", connection="CONNECTION_SETTING", subscription_name="subscription.1"
76+
)
77+
def service_bus_topic(msg: func.ServiceBusMessage):
78+
pass
79+
80+
6781
@app.timer_trigger(schedule="0 0 0 1 1 *", arg_name="timer")
6882
def timer(timer: func.TimerRequest) -> None:
6983
pass

tests/contrib/azure_functions/azure_function_app/local.settings.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"FUNCTIONS_EXTENSION_VERSION": "~4",
66
"AzureWebJobsFeatureFlags": "EnableWorkerIndexing",
77
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
8+
"CONNECTION_SETTING": "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;",
89
"WEBSITE_SITE_NAME": "test-func"
910
}
1011
}

tests/contrib/azure_functions/test_azure_functions_snapshot.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,30 @@ def test_http_get_distributed_tracing(azure_functions_client: Client) -> None:
118118
assert azure_functions_client.get("/api/httpgetroot", headers=DEFAULT_HEADERS).status_code == 200
119119

120120

121+
@pytest.mark.snapshot
122+
def test_service_bus_queue(azure_functions_client: Client) -> None:
123+
assert (
124+
azure_functions_client.post(
125+
"/admin/functions/servicebusqueue",
126+
headers={"User-Agent": "python-httpx/x.xx.x", "Content-Type": "application/json"},
127+
data=json.dumps({"input": '{"msg": "test message"}'}),
128+
).status_code
129+
== 202
130+
)
131+
132+
133+
@pytest.mark.snapshot
134+
def test_service_bus_topic(azure_functions_client: Client) -> None:
135+
assert (
136+
azure_functions_client.post(
137+
"/admin/functions/servicebustopic",
138+
headers={"User-Agent": "python-httpx/x.xx.x", "Content-Type": "application/json"},
139+
data=json.dumps({"input": '{"msg": "test message"}'}),
140+
).status_code
141+
== 202
142+
)
143+
144+
121145
@pytest.mark.snapshot
122146
def test_timer(azure_functions_client: Client) -> None:
123147
assert (
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[[
2+
{
3+
"name": "azure.functions.invoke",
4+
"service": "test-func",
5+
"resource": "ServiceBus servicebusqueue",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "serverless",
10+
"meta": {
11+
"_dd.p.dm": "-0",
12+
"_dd.p.tid": "6815246300000000",
13+
"aas.function.name": "servicebusqueue",
14+
"aas.function.trigger": "ServiceBus",
15+
"component": "azure_functions",
16+
"language": "python",
17+
"runtime-id": "0a56574a66cd46eb8288f2856b00d420",
18+
"span.kind": "consumer"
19+
},
20+
"metrics": {
21+
"_dd.top_level": 1,
22+
"_dd.tracer_kr": 1.0,
23+
"_sampling_priority_v1": 1,
24+
"process_id": 46647
25+
},
26+
"duration": 195625,
27+
"start": 1746216035448955969
28+
}]]
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[[
2+
{
3+
"name": "azure.functions.invoke",
4+
"service": "test-func",
5+
"resource": "ServiceBus servicebustopic",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "serverless",
10+
"meta": {
11+
"_dd.p.dm": "-0",
12+
"_dd.p.tid": "6815246c00000000",
13+
"aas.function.name": "servicebustopic",
14+
"aas.function.trigger": "ServiceBus",
15+
"component": "azure_functions",
16+
"language": "python",
17+
"runtime-id": "39a73bcd6b804b0c9fc28bd839d213f1",
18+
"span.kind": "consumer"
19+
},
20+
"metrics": {
21+
"_dd.top_level": 1,
22+
"_dd.tracer_kr": 1.0,
23+
"_sampling_priority_v1": 1,
24+
"process_id": 47763
25+
},
26+
"duration": 220333,
27+
"start": 1746216044422172751
28+
}]]

0 commit comments

Comments
 (0)