Skip to content

Commit 54343c4

Browse files
authored
[Tracing] Add EventHubs live tracing tests (#34578)
Signed-off-by: Paul Van Eck <[email protected]>
1 parent bda301f commit 54343c4

File tree

9 files changed

+266
-91
lines changed

9 files changed

+266
-91
lines changed

sdk/core/azure-core-tracing-opentelemetry/dev_requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ opentelemetry-sdk<2.0.0,>=1.12.0
55
opentelemetry-instrumentation-requests>=0.32b0
66
requests
77
azure-storage-blob
8-
-e ../../servicebus/azure-servicebus
8+
../../servicebus/azure-servicebus
9+
../../eventhub/azure-eventhub

sdk/core/azure-core-tracing-opentelemetry/test-resources.bicep

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,14 +73,31 @@ resource serviceBusSubscription 'Microsoft.ServiceBus/namespaces/topics/subscrip
7373
properties: {}
7474
}
7575

76+
resource eventHubNamespace 'Microsoft.EventHub/namespaces@2021-11-01' = {
77+
name: '${baseName}eh'
78+
location: location
79+
sku: {
80+
capacity: 5
81+
name: 'Standard'
82+
tier: 'Standard'
83+
}
84+
85+
resource eventHub 'eventhubs' = {
86+
name: 'eh-${baseName}-hub'
87+
properties: {
88+
partitionCount: 3
89+
}
90+
}
91+
}
92+
93+
var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespace.name, 'RootManageSharedAccessKey')
94+
var eventHubsAuthRuleResourceId = resourceId('Microsoft.EventHub/namespaces/authorizationRules', eventHubNamespace.name, 'RootManageSharedAccessKey')
7695

7796
var name = storageAccount.name
7897
var key = storageAccount.listKeys().keys[0].value
7998
var storageConnectionString = 'DefaultEndpointsProtocol=https;AccountName=${name};AccountKey=${key}'
8099
var serviceBusConnectionString = listkeys(authRuleResourceId, sbVersion).primaryConnectionString
81-
82-
83-
var authRuleResourceId = resourceId('Microsoft.ServiceBus/namespaces/authorizationRules', serviceBusNamespace.name, 'RootManageSharedAccessKey')
100+
var eventHubsConnectionString = listkeys(eventHubsAuthRuleResourceId, '2021-11-01').primaryConnectionString
84101

85102
output AZURE_STORAGE_ACCOUNT_NAME string = name
86103
output AZURE_STORAGE_ACCOUNT_KEY string = key
@@ -89,3 +106,5 @@ output AZURE_SERVICEBUS_CONNECTION_STRING string = serviceBusConnectionString
89106
output AZURE_SERVICEBUS_QUEUE_NAME string = serviceBusQueue.name
90107
output AZURE_SERVICEBUS_TOPIC_NAME string = serviceBusTopic.name
91108
output AZURE_SERVICEBUS_SUBSCRIPTION_NAME string = serviceBusSubscription.name
109+
output AZURE_EVENTHUB_CONNECTION_STRING string = eventHubsConnectionString
110+
output AZURE_EVENTHUB_NAME string = eventHubNamespace::eventHub.name

sdk/core/azure-core-tracing-opentelemetry/tests/conftest.py

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,24 @@
1212
import pytest
1313

1414

15-
span_exporter = InMemorySpanExporter()
15+
class TracingTestHelper:
16+
def __init__(self, tracer, exporter):
17+
self.tracer = tracer
18+
self.exporter = exporter
1619

1720

18-
@pytest.fixture(scope="session")
19-
def tracer():
20-
processor = SimpleSpanProcessor(span_exporter)
21+
@pytest.fixture(scope="session", autouse=True)
22+
def enable_tracing():
2123
provider = TracerProvider()
22-
provider.add_span_processor(processor)
2324
trace.set_tracer_provider(provider)
2425

25-
return provider.get_tracer(__name__)
26-
2726

2827
@pytest.fixture(scope="function")
29-
def exporter():
30-
span_exporter.clear()
31-
return span_exporter
28+
def tracing_helper() -> TracingTestHelper:
29+
span_exporter = InMemorySpanExporter()
30+
processor = SimpleSpanProcessor(span_exporter)
31+
trace.get_tracer_provider().add_span_processor(processor)
32+
return TracingTestHelper(trace.get_tracer(__name__), span_exporter)
3233

3334

3435
@pytest.fixture(scope="session")
@@ -41,4 +42,6 @@ def config():
4142
"servicebus_queue_name": os.environ.get("AZURE_SERVICEBUS_QUEUE_NAME"),
4243
"servicebus_topic_name": os.environ.get("AZURE_SERVICEBUS_TOPIC_NAME"),
4344
"servicebus_subscription_name": os.environ.get("AZURE_SERVICEBUS_SUBSCRIPTION_NAME"),
45+
"eventhub_connection_string": os.environ.get("AZURE_EVENTHUB_CONNECTION_STRING"),
46+
"eventhub_name": os.environ.get("AZURE_EVENTHUB_NAME"),
4447
}
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
# ------------------------------------
2+
# Copyright (c) Microsoft Corporation.
3+
# Licensed under the MIT License.
4+
# ------------------------------------
5+
from datetime import datetime
6+
import sys
7+
import threading
8+
9+
import pytest
10+
from azure.core.tracing.common import with_current_context
11+
from azure.eventhub import EventHubConsumerClient, EventHubProducerClient, EventData
12+
from opentelemetry.trace import SpanKind
13+
14+
15+
class TestEventHubsTracing:
16+
def _verify_span_attributes(self, *, span):
17+
# Ensure all attributes are set and have a value.
18+
for attr in span.attributes:
19+
assert span.attributes[attr] is not None and span.attributes[attr] != ""
20+
21+
def _verify_message(self, *, span, dest, server_address):
22+
assert span.name == "EventHubs.message"
23+
assert span.kind == SpanKind.PRODUCER
24+
self._verify_span_attributes(span=span)
25+
assert span.attributes["az.namespace"] == "Microsoft.EventHub"
26+
assert span.attributes["messaging.system"] == "eventhubs"
27+
assert span.attributes["messaging.destination.name"] == dest
28+
assert span.attributes["server.address"] == server_address
29+
30+
def _verify_send(self, *, span, dest, server_address, message_count):
31+
assert span.name == "EventHubs.send"
32+
assert span.kind == SpanKind.CLIENT
33+
self._verify_span_attributes(span=span)
34+
assert span.attributes["az.namespace"] == "Microsoft.EventHub"
35+
assert span.attributes["messaging.system"] == "eventhubs"
36+
assert span.attributes["messaging.destination.name"] == dest
37+
assert span.attributes["messaging.operation"] == "publish"
38+
assert span.attributes["server.address"] == server_address
39+
if message_count > 1:
40+
assert span.attributes["messaging.batch.message_count"] == message_count
41+
42+
def _verify_receive(self, *, span, dest, server_address, message_count):
43+
assert span.name == "EventHubs.receive"
44+
assert span.kind == SpanKind.CLIENT
45+
self._verify_span_attributes(span=span)
46+
assert span.attributes["az.namespace"] == "Microsoft.EventHub"
47+
assert span.attributes["messaging.system"] == "eventhubs"
48+
assert span.attributes["messaging.destination.name"] == dest
49+
assert span.attributes["messaging.operation"] == "receive"
50+
assert span.attributes["server.address"] == server_address
51+
for link in span.links:
52+
assert "enqueuedTime" in link.attributes
53+
if message_count > 1:
54+
assert span.attributes["messaging.batch.message_count"] == message_count
55+
56+
def _verify_process(self, *, span, dest, server_address, message_count):
57+
assert span.name == "EventHubs.process"
58+
assert span.kind == SpanKind.CONSUMER
59+
self._verify_span_attributes(span=span)
60+
assert span.attributes["az.namespace"] == "Microsoft.EventHub"
61+
assert span.attributes["messaging.system"] == "eventhubs"
62+
assert span.attributes["messaging.destination.name"] == dest
63+
assert span.attributes["messaging.operation"] == "process"
64+
assert span.attributes["server.address"] == server_address
65+
if message_count > 1:
66+
assert span.attributes["messaging.batch.message_count"] == message_count
67+
68+
@pytest.mark.live_test_only
69+
@pytest.mark.skipif(sys.platform.startswith("darwin"), reason="threading issues on mac CI")
70+
def test_eventhubs_client_tracing(self, config, tracing_helper):
71+
72+
connection_string = config["eventhub_connection_string"]
73+
eventhub_name = config["eventhub_name"]
74+
75+
producer_client = EventHubProducerClient.from_connection_string(
76+
conn_str=connection_string,
77+
eventhub_name=eventhub_name,
78+
)
79+
80+
consumer_client = EventHubConsumerClient.from_connection_string(
81+
conn_str=connection_string,
82+
consumer_group="$Default",
83+
eventhub_name=eventhub_name,
84+
)
85+
86+
with tracing_helper.tracer.start_as_current_span(name="root"):
87+
88+
current_date = datetime.now()
89+
90+
with producer_client:
91+
92+
# Send batch of events
93+
event_data_batch = producer_client.create_batch()
94+
event_data_batch.add(EventData("First message inside an EventDataBatch"))
95+
event_data_batch.add(EventData("Second message inside an EventDataBatch"))
96+
producer_client.send_batch(event_data_batch)
97+
98+
send_spans = tracing_helper.exporter.get_finished_spans()
99+
100+
# We expect 3 spans to have finished: 1 send spans, and 2 message spans.
101+
assert len(send_spans) == 3
102+
103+
server_address = producer_client._address.hostname
104+
dest_name = producer_client._address.path
105+
106+
# Verify the spans from the batch send.
107+
self._verify_message(span=send_spans[0], dest=dest_name, server_address=server_address)
108+
self._verify_message(span=send_spans[1], dest=dest_name, server_address=server_address)
109+
self._verify_send(span=send_spans[2], dest=dest_name, server_address=server_address, message_count=2)
110+
111+
# Verify span links from batch send.
112+
assert len(send_spans[2].links) == 2
113+
link = send_spans[2].links[0]
114+
assert link.context.span_id == send_spans[0].context.span_id
115+
assert link.context.trace_id == send_spans[0].context.trace_id
116+
117+
link = send_spans[2].links[1]
118+
assert link.context.span_id == send_spans[1].context.span_id
119+
assert link.context.trace_id == send_spans[1].context.trace_id
120+
121+
tracing_helper.exporter.clear()
122+
123+
def on_event_batch(partition_context, event_batch):
124+
pass
125+
126+
# Receive batch of events.
127+
worker = threading.Thread(
128+
target=with_current_context(consumer_client.receive_batch),
129+
args=(on_event_batch,),
130+
kwargs={"starting_position": current_date},
131+
)
132+
worker.daemon = True
133+
worker.start()
134+
worker.join(timeout=3)
135+
136+
receive_spans = tracing_helper.exporter.get_finished_spans()
137+
138+
# We expect 2 spans to have finished: 1 receive span and 1 process span.
139+
assert len(receive_spans) == 2
140+
141+
self._verify_receive(span=receive_spans[0], dest=dest_name, server_address=server_address, message_count=2)
142+
self._verify_process(span=receive_spans[1], dest=dest_name, server_address=server_address, message_count=2)
143+
144+
# Verify receive span links.
145+
assert len(receive_spans[0].links) == 2
146+
assert receive_spans[0].links[0].context.span_id == send_spans[0].context.span_id
147+
assert receive_spans[0].links[1].context.span_id == send_spans[1].context.span_id
148+
149+
# Verify process span links.
150+
assert len(receive_spans[1].links) == 2
151+
assert receive_spans[1].links[0].context.span_id == send_spans[0].context.span_id
152+
assert receive_spans[1].links[1].context.span_id == send_spans[1].context.span_id

sdk/core/azure-core-tracing-opentelemetry/tests/test_schema.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111

1212

1313
class TestOpenTelemetrySchema:
14-
def test_latest_schema_attributes_renamed(self, tracer):
15-
with tracer.start_as_current_span("Root", kind=OpenTelemetrySpanKind.CLIENT) as parent:
14+
def test_latest_schema_attributes_renamed(self, tracing_helper):
15+
with tracing_helper.tracer.start_as_current_span("Root", kind=OpenTelemetrySpanKind.CLIENT) as parent:
1616
wrapped_class = OpenTelemetrySpan(span=parent)
1717
schema_version = OpenTelemetrySchema.get_latest_version()
1818
attribute_mappings = OpenTelemetrySchema.get_attribute_mappings(schema_version)
@@ -30,8 +30,8 @@ def test_latest_schema_attributes_renamed(self, tracer):
3030
# Check that original attribute is not present.
3131
assert wrapped_class.span_instance.attributes.get(key) is None
3232

33-
def test_latest_schema_attributes_not_renamed(self, tracer):
34-
with tracer.start_as_current_span("Root", kind=OpenTelemetrySpanKind.CLIENT) as parent:
33+
def test_latest_schema_attributes_not_renamed(self, tracing_helper):
34+
with tracing_helper.tracer.start_as_current_span("Root", kind=OpenTelemetrySpanKind.CLIENT) as parent:
3535
wrapped_class = OpenTelemetrySpan(span=parent)
3636

3737
wrapped_class.add_attribute("foo", "bar")

sdk/core/azure-core-tracing-opentelemetry/tests/test_servicebus_live.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ def _verify_complete(self, *, span, dest, server_address):
6060
assert span.attributes["messaging.destination.name"] == dest
6161

6262
@pytest.mark.live_test_only
63-
def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
63+
def test_servicebus_client_tracing_queue(self, config, tracing_helper):
6464
connection_string = config["servicebus_connection_string"]
6565
queue_name = config["servicebus_queue_name"]
6666
client = ServiceBusClient.from_connection_string(connection_string)
6767

68-
with tracer.start_as_current_span(name="root"):
68+
with tracing_helper.tracer.start_as_current_span(name="root"):
6969
with client.get_queue_sender(queue_name) as sender:
7070

7171
# Sending a single message
@@ -77,7 +77,7 @@ def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
7777
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
7878
sender.send_messages(message_batch)
7979

80-
send_spans = exporter.get_finished_spans()
80+
send_spans = tracing_helper.exporter.get_finished_spans()
8181
server_address = sender.fully_qualified_namespace
8282

8383
# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
@@ -107,7 +107,7 @@ def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
107107
assert link.context.span_id == send_spans[3].context.span_id
108108
assert link.context.trace_id == send_spans[3].context.trace_id
109109

110-
exporter.clear()
110+
tracing_helper.exporter.clear()
111111

112112
# Receive all the sent spans.
113113
receiver = client.get_queue_receiver(queue_name=queue_name)
@@ -117,7 +117,7 @@ def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
117117
assert "foo" in str(msg)
118118
receiver.complete_message(msg)
119119

120-
receive_spans = exporter.get_finished_spans()
120+
receive_spans = tracing_helper.exporter.get_finished_spans()
121121

122122
# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
123123
assert len(receive_spans) == 4
@@ -135,13 +135,13 @@ def test_servicebus_client_tracing_queue(self, config, exporter, tracer):
135135
self._verify_complete(span=receive_spans[3], dest=queue_name, server_address=server_address)
136136

137137
@pytest.mark.live_test_only
138-
def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
138+
def test_servicebus_client_tracing_topic(self, config, tracing_helper):
139139
connection_string = config["servicebus_connection_string"]
140140
topic_name = config["servicebus_topic_name"]
141141
subscription_name = config["servicebus_subscription_name"]
142142
client = ServiceBusClient.from_connection_string(connection_string)
143143

144-
with tracer.start_as_current_span(name="root"):
144+
with tracing_helper.tracer.start_as_current_span(name="root"):
145145
with client.get_topic_sender(topic_name) as sender:
146146

147147
# Sending a single message
@@ -153,7 +153,7 @@ def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
153153
message_batch.add_message(ServiceBusMessage("Second batch foo message"))
154154
sender.send_messages(message_batch)
155155

156-
send_spans = exporter.get_finished_spans()
156+
send_spans = tracing_helper.exporter.get_finished_spans()
157157
server_address = sender.fully_qualified_namespace
158158

159159
# We expect 5 spans to have finished: 2 send spans, and 3 message spans.
@@ -183,7 +183,7 @@ def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
183183
assert link.context.span_id == send_spans[3].context.span_id
184184
assert link.context.trace_id == send_spans[3].context.trace_id
185185

186-
exporter.clear()
186+
tracing_helper.exporter.clear()
187187

188188
# Receive all the sent spans.
189189
receiver = client.get_subscription_receiver(topic_name, subscription_name)
@@ -193,7 +193,7 @@ def test_servicebus_client_tracing_topic(self, config, exporter, tracer):
193193
assert "foo" in str(msg)
194194
receiver.complete_message(msg)
195195

196-
receive_spans = exporter.get_finished_spans()
196+
receive_spans = tracing_helper.exporter.get_finished_spans()
197197

198198
# We expect 4 spans to have finished: 1 receive span, and 3 settlement spans.
199199
assert len(receive_spans) == 4

sdk/core/azure-core-tracing-opentelemetry/tests/test_storage_live.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111

1212
class TestStorageTracing:
1313
@pytest.mark.live_test_only
14-
def test_blob_service_client_tracing(self, config, exporter, tracer):
14+
def test_blob_service_client_tracing(self, config, tracing_helper):
1515
connection_string = config["storage_connection_string"]
1616
client = BlobServiceClient.from_connection_string(connection_string)
1717

18-
with tracer.start_as_current_span(name="root") as parent:
18+
with tracing_helper.tracer.start_as_current_span(name="root") as parent:
1919
client.get_service_properties()
2020

21-
spans = exporter.get_finished_spans()
21+
spans = tracing_helper.exporter.get_finished_spans()
2222

2323
# We expect 3 spans, one for the root span, one for the method call, and one for the HTTP request.
2424
assert len(spans) == 3

0 commit comments

Comments
 (0)