Skip to content

Commit ddf423f

Browse files
Copilotswathipilannatisch
authored
Add comprehensive thread and coroutine safety documentation and examples for Event Hubs and Service Bus (#41463)
* Initial plan for issue * Add thread safety documentation and concurrent sending examples for Event Hubs and Service Bus Co-authored-by: swathipil <[email protected]> * Address PR review feedback: update documentation samples and remove redundant concurrent functions Co-authored-by: swathipil <[email protected]> * Fix import order: move standard library imports above Azure SDK imports Co-authored-by: swathipil <[email protected]> * Fix indentation of Note lines in EventHub and ServiceBus samples README Co-authored-by: swathipil <[email protected]> * Address PR review feedback: update thread safety wording, remove main functions from README snippets, change 'sent batch' to 'sent messages' Co-authored-by: annatisch <[email protected]> * Address final review comments: update ServiceBus thread safety wording and remove main functions from README snippets Co-authored-by: swathipil <[email protected]> * Add note about preferring native async APIs over ThreadPoolExecutor across sync samples and documentation Co-authored-by: swathipil <[email protected]> * change send samples to conn str auth to work around CI identity auth failure and test concurrency samples --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: swathipil <[email protected]> Co-authored-by: annatisch <[email protected]> Co-authored-by: swathipil <[email protected]>
1 parent c7634e4 commit ddf423f

File tree

10 files changed

+435
-6
lines changed

10 files changed

+435
-6
lines changed

sdk/eventhub/azure-eventhub/README.md

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,75 @@ Also, the concepts for AMQP are well documented in [OASIS Advanced Messaging Que
9696

9797
### Thread safety
9898

99-
We do not guarantee that the EventHubProducerClient or EventHubConsumerClient are thread-safe. We do not recommend reusing these instances across threads. It is up to the running application to use these classes in a thread-safe manner.
99+
We do not guarantee that the EventHubProducerClient or EventHubConsumerClient are thread-safe or coroutine-safe. We do not recommend reusing these instances across threads or sharing them between coroutines. It is up to the running application to use these classes in a concurrency-safe manner.
100100

101-
The data model type, `EventDataBatch` is not thread-safe. It should not be shared across threads nor used concurrently with client methods.
101+
The data model type, `EventDataBatch` is not thread-safe or coroutine-safe. It should not be shared across threads nor used concurrently with client methods.
102+
103+
For scenarios requiring concurrent sending from multiple threads, ensure proper thread-safety management using mechanisms like threading.Lock(). **Note:** Native async APIs should be used instead of running in a ThreadPoolExecutor, if possible.
104+
```python
105+
import threading
106+
from concurrent.futures import ThreadPoolExecutor
107+
from azure.eventhub import EventHubProducerClient, EventData
108+
from azure.identity import DefaultAzureCredential
109+
110+
EVENTHUB_NAMESPACE = "<your-namespace>.servicebus.windows.net"
111+
EVENTHUB_NAME = "<your-eventhub-name>"
112+
113+
# Create a global lock
114+
producer_lock = threading.Lock()
115+
116+
def send_batch(producer_id, producer):
117+
with producer_lock:
118+
event_data_batch = producer.create_batch()
119+
for i in range(10):
120+
event_data_batch.add(EventData(f"Message {i} from producer {producer_id}"))
121+
producer.send_batch(event_data_batch)
122+
print(f"Producer {producer_id} sent batch.")
123+
124+
credential = DefaultAzureCredential()
125+
producer = EventHubProducerClient(
126+
fully_qualified_namespace=EVENTHUB_NAMESPACE,
127+
eventhub_name=EVENTHUB_NAME,
128+
credential=credential
129+
)
130+
131+
with producer:
132+
with ThreadPoolExecutor(max_workers=5) as executor:
133+
for i in range(5): # Launch 5 threads
134+
executor.submit(send_batch, i, producer)
135+
```
136+
137+
For scenarios requiring concurrent sending in asyncio applications, ensure proper coroutine-safety management using mechanisms like asyncio.Lock()
138+
```python
139+
import asyncio
140+
from azure.eventhub.aio import EventHubProducerClient
141+
from azure.eventhub import EventData
142+
from azure.identity.aio import DefaultAzureCredential
143+
144+
EVENTHUB_NAMESPACE = "<your-namespace>.servicebus.windows.net"
145+
EVENTHUB_NAME = "<your-eventhub-name>"
146+
147+
# Shared lock for coroutine-safe access
148+
producer_lock = asyncio.Lock()
149+
150+
async def send_batch(producer_id, producer):
151+
async with producer_lock:
152+
event_data_batch = await producer.create_batch()
153+
for i in range(10):
154+
event_data_batch.add(EventData(f"Message {i} from producer {producer_id}"))
155+
await producer.send_batch(event_data_batch)
156+
print(f"Producer {producer_id} sent batch.")
157+
158+
credential = DefaultAzureCredential()
159+
producer = EventHubProducerClient(
160+
fully_qualified_namespace=EVENTHUB_NAMESPACE,
161+
eventhub_name=EVENTHUB_NAME,
162+
credential=credential
163+
)
164+
165+
async with producer:
166+
await asyncio.gather(*(send_batch(i, producer) for i in range(5)))
167+
```
102168

103169
## Examples
104170

sdk/eventhub/azure-eventhub/samples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/
2424
- Send event data batch to a specific partition determined by partition key
2525
- Send event data batch to a specific partition by partition id
2626
- Send event data batch with customized properties
27+
- Send events concurrently with proper thread/coroutine safety practices
28+
- **Note**: EventHub clients are not thread-safe or coroutine-safe
2729

2830
- [send_stream.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples/sync_samples/send_stream.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples/async_samples/send_stream_async.py)) - Examples to do streaming sending:
2931
- Send in a stream

sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
"""
99
Examples to show sending events with different options to an Event Hub asynchronously.
10+
11+
WARNING: EventHubProducerClient and EventDataBatch are not coroutine-safe.
12+
Do not share these instances between coroutines without proper coroutine-safe management using mechanisms like asyncio.Lock.
1013
"""
1114

1215
import time
@@ -87,6 +90,34 @@ async def send_event_data_list(producer):
8790
print("Sending error: ", eh_err)
8891

8992

93+
94+
async def send_concurrent_with_shared_client_and_lock():
95+
"""
96+
Example showing concurrent sending with a shared client using asyncio.Lock.
97+
"""
98+
send_lock = asyncio.Lock()
99+
100+
producer = EventHubProducerClient.from_connection_string(
101+
conn_str=CONNECTION_STR,
102+
eventhub_name=EVENTHUB_NAME,
103+
)
104+
105+
async def send_with_lock(task_id):
106+
try:
107+
# Use lock to ensure coroutine-safe sending
108+
async with send_lock:
109+
batch = await producer.create_batch()
110+
batch.add(EventData(f"Synchronized message from coroutine {task_id}"))
111+
await producer.send_batch(batch)
112+
print(f"Coroutine {task_id} sent synchronized message successfully")
113+
except Exception as e:
114+
print(f"Coroutine {task_id} failed: {e}")
115+
116+
async with producer:
117+
# Use asyncio.gather to run coroutines concurrently with lock synchronization
118+
await asyncio.gather(*[send_with_lock(i) for i in range(3)])
119+
120+
90121
async def run():
91122

92123
producer = EventHubProducerClient.from_connection_string(
@@ -102,6 +133,14 @@ async def run():
102133
await send_event_data_list(producer)
103134

104135

105-
start_time = time.time()
106-
asyncio.run(run())
107-
print("Send messages in {} seconds.".format(time.time() - start_time))
136+
async def main():
137+
start_time = time.time()
138+
await run()
139+
print("Send messages in {} seconds.".format(time.time() - start_time))
140+
141+
142+
print("\nDemonstrating concurrent sending with shared client and locks...")
143+
await send_concurrent_with_shared_client_and_lock()
144+
145+
146+
asyncio.run(main())

sdk/eventhub/azure-eventhub/samples/sync_samples/send.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,17 @@
77

88
"""
99
Examples to show sending events with different options to an Event Hub partition.
10+
11+
WARNING: EventHubProducerClient and EventDataBatch are not thread-safe.
12+
Do not share these instances between threads without proper thread-safe management using mechanisms like threading.Lock.
13+
Note: Native async APIs should be used instead of running in a ThreadPoolExecutor, if possible.
1014
"""
1115

1216
import time
1317
import os
18+
import threading
19+
from concurrent.futures import ThreadPoolExecutor
20+
1421
from azure.eventhub import EventHubProducerClient, EventData
1522
from azure.eventhub.exceptions import EventHubError
1623

@@ -88,6 +95,37 @@ def send_event_data_list(producer):
8895
print("Sending error: ", eh_err)
8996

9097

98+
def send_concurrent_with_shared_client_and_lock():
99+
"""
100+
Example showing concurrent sending with a shared client using threading.Lock.
101+
Note: Native async APIs should be used instead of running in a ThreadPoolExecutor, if possible.
102+
"""
103+
send_lock = threading.Lock()
104+
105+
producer = EventHubProducerClient.from_connection_string(
106+
conn_str=CONNECTION_STR,
107+
eventhub_name=EVENTHUB_NAME,
108+
)
109+
110+
def send_with_lock(thread_id):
111+
try:
112+
# Use lock to ensure thread-safe sending
113+
with send_lock:
114+
batch = producer.create_batch()
115+
batch.add(EventData(f"Synchronized message from thread {thread_id}"))
116+
producer.send_batch(batch)
117+
print(f"Thread {thread_id} sent synchronized message successfully")
118+
except Exception as e:
119+
print(f"Thread {thread_id} failed: {e}")
120+
121+
with producer:
122+
with ThreadPoolExecutor(max_workers=3) as executor:
123+
futures = [executor.submit(send_with_lock, i) for i in range(3)]
124+
# Wait for all threads to complete
125+
for future in futures:
126+
future.result()
127+
128+
91129
producer = EventHubProducerClient.from_connection_string(
92130
conn_str=CONNECTION_STR,
93131
eventhub_name=EVENTHUB_NAME,
@@ -103,3 +141,7 @@ def send_event_data_list(producer):
103141
send_event_data_list(producer)
104142

105143
print("Send messages in {} seconds.".format(time.time() - start_time))
144+
145+
146+
print("\nDemonstrating concurrent sending with shared client and locks...")
147+
send_concurrent_with_shared_client_and_lock()

sdk/servicebus/azure-servicebus/README.md

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,65 @@ To interact with these resources, one should be familiar with the following SDK
9595

9696
### Thread safety
9797

98-
We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe. We do not recommend reusing these instances across threads. It is up to the running application to use these classes in a thread-safe manner.
98+
We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe or coroutine-safe. We do not recommend reusing these instances across threads or sharing them between coroutines. It is up to the running application to use these classes in a concurrency-safe manner.
99+
100+
The data model type, `ServiceBusMessageBatch` is not thread-safe or coroutine-safe. It should not be shared across threads nor used concurrently with client methods.
101+
102+
For scenarios requiring concurrent sending from multiple threads, ensure proper thread-safety management using mechanisms like threading.Lock(). **Note:** Native async APIs should be used instead of running in a ThreadPoolExecutor, if possible.
103+
```python
104+
import threading
105+
from concurrent.futures import ThreadPoolExecutor
106+
from azure.servicebus import ServiceBusClient, ServiceBusMessage
107+
from azure.identity import DefaultAzureCredential
108+
109+
SERVICE_BUS_NAMESPACE = "<your-namespace>.servicebus.windows.net"
110+
QUEUE_NAME = "<your-queue-name>"
111+
112+
lock = threading.Lock()
113+
114+
def send_batch(sender_id, sender):
115+
with lock:
116+
messages = [ServiceBusMessage(f"Message {i} from sender {sender_id}") for i in range(10)]
117+
sender.send_messages(messages)
118+
print(f"Sender {sender_id} sent messages.")
119+
120+
credential = DefaultAzureCredential()
121+
client = ServiceBusClient(fully_qualified_namespace=SERVICE_BUS_NAMESPACE, credential=credential)
122+
123+
with client:
124+
sender = client.get_queue_sender(queue_name=QUEUE_NAME)
125+
with sender:
126+
with ThreadPoolExecutor(max_workers=5) as executor:
127+
for i in range(5):
128+
executor.submit(send_batch, i, sender)
129+
```
130+
131+
For scenarios requiring concurrent sending in asyncio applications, ensure proper coroutine-safety management using mechanisms like asyncio.Lock()
132+
```python
133+
import asyncio
134+
from azure.servicebus.aio import ServiceBusClient
135+
from azure.servicebus import ServiceBusMessage
136+
from azure.identity.aio import DefaultAzureCredential
137+
138+
SERVICE_BUS_NAMESPACE = "<your-namespace>.servicebus.windows.net"
139+
QUEUE_NAME = "<your-queue-name>"
140+
141+
lock = asyncio.Lock()
142+
143+
async def send_batch(sender_id, sender):
144+
async with lock:
145+
messages = [ServiceBusMessage(f"Message {i} from sender {sender_id}") for i in range(10)]
146+
await sender.send_messages(messages)
147+
print(f"Sender {sender_id} sent messages.")
148+
149+
credential = DefaultAzureCredential()
150+
client = ServiceBusClient(fully_qualified_namespace=SERVICE_BUS_NAMESPACE, credential=credential)
151+
152+
async with client:
153+
sender = client.get_queue_sender(queue_name=QUEUE_NAME)
154+
async with sender:
155+
await asyncio.gather(*(send_batch(i, sender) for i in range(5)))
156+
```
99157

100158
## Examples
101159

sdk/servicebus/azure-servicebus/samples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/
1919
- [send_queue.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/send_queue.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/send_queue_async.py)) - Examples to send messages to a service bus queue:
2020
- From a connection string
2121
- Enabling Logging
22+
- Send messages concurrently with proper thread/coroutine safety practices
23+
- **Note**: ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are not thread-safe or coroutine-safe
2224
- [send_topic.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/send_topic_async.py)) - Examples to send messages to a service bus topic:
2325
- From a connection string
2426
- Enabling Logging
27+
- Send messages concurrently with proper thread/coroutine safety practices
28+
- **Note**: ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are not thread-safe or coroutine-safe
2529
- [receive_queue.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_queue.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/receive_queue_async.py)) - Examples to receive messages from a service bus queue:
2630
- Receive messages
2731
- [receive_subscription.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_subscription.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/receive_subscription_async.py)) - Examples to receive messages from a service bus subscription:

sdk/servicebus/azure-servicebus/samples/async_samples/send_queue_async.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77

88
"""
99
Example to show sending message(s) to a Service Bus Queue asynchronously.
10+
11+
WARNING: ServiceBusClient, ServiceBusSender, and ServiceBusMessageBatch are not coroutine-safe.
12+
Do not share these instances between coroutines without proper coroutine-safe management using mechanisms like asyncio.Lock.
1013
"""
1114

1215
import os
@@ -41,6 +44,33 @@ async def send_batch_message(sender):
4144
await sender.send_messages(batch_message)
4245

4346

47+
48+
49+
async def send_concurrent_with_shared_client_and_lock():
50+
"""
51+
Example showing concurrent sending with a shared client using asyncio.Lock.
52+
"""
53+
send_lock = asyncio.Lock()
54+
55+
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, DefaultAzureCredential())
56+
57+
async def send_with_lock(task_id):
58+
try:
59+
# Use lock to ensure coroutine-safe sending
60+
async with send_lock:
61+
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
62+
async with sender:
63+
message = ServiceBusMessage(f"Synchronized message from coroutine {task_id}")
64+
await sender.send_messages(message)
65+
print(f"Coroutine {task_id} sent synchronized message successfully")
66+
except Exception as e:
67+
print(f"Coroutine {task_id} failed: {e}")
68+
69+
async with servicebus_client:
70+
# Use asyncio.gather to run coroutines concurrently with lock synchronization
71+
await asyncio.gather(*[send_with_lock(i) for i in range(3)])
72+
73+
4474
async def main():
4575
credential = DefaultAzureCredential()
4676
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
@@ -55,4 +85,9 @@ async def main():
5585
print("Send message is done.")
5686

5787

88+
89+
print("\nDemonstrating concurrent sending with shared client and locks...")
90+
await send_concurrent_with_shared_client_and_lock()
91+
92+
5893
asyncio.run(main())

0 commit comments

Comments
 (0)