Skip to content

Commit a76fdf5

Browse files
Copilotswathipil
andcommitted
Add thread safety documentation and concurrent sending examples for Event Hubs and Service Bus
Co-authored-by: swathipil <[email protected]>
1 parent f98c1f7 commit a76fdf5

File tree

10 files changed

+541
-6
lines changed

10 files changed

+541
-6
lines changed

sdk/eventhub/azure-eventhub/README.md

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,42 @@ Also, the concepts for AMQP are well documented in [OASIS Advanced Messaging Que
9696

9797
### Thread safety
9898

99-
We do not guarantee that the EventHubProducerClient or EventHubConsumerClient are thread-safe. We do not recommend reusing these instances across threads. It is up to the running application to use these classes in a thread-safe manner.
99+
We do not guarantee that the EventHubProducerClient or EventHubConsumerClient are thread-safe or coroutine-safe. We do not recommend reusing these instances across threads or sharing them between coroutines. It is up to the running application to use these classes in a thread-safe manner.
100100

101-
The data model type, `EventDataBatch` is not thread-safe. It should not be shared across threads nor used concurrently with client methods.
101+
The data model type, `EventDataBatch` is not thread-safe or coroutine-safe. It should not be shared across threads nor used concurrently with client methods.
102+
103+
For scenarios requiring concurrent sending from multiple threads:
104+
```python
105+
import threading
106+
from azure.eventhub import EventHubProducerClient, EventData
107+
108+
# Use a lock to ensure only one thread sends at a time
109+
send_lock = threading.Lock()
110+
111+
def send_events_thread_safe(producer, events):
112+
with send_lock:
113+
batch = producer.create_batch()
114+
for event in events:
115+
batch.add(event)
116+
producer.send_batch(batch)
117+
```
118+
119+
For scenarios requiring concurrent sending in asyncio applications:
120+
```python
121+
import asyncio
122+
from azure.eventhub.aio import EventHubProducerClient
123+
from azure.eventhub import EventData
124+
125+
# Use a lock to ensure only one coroutine sends at a time
126+
send_lock = asyncio.Lock()
127+
128+
async def send_events_coroutine_safe(producer, events):
129+
async with send_lock:
130+
batch = await producer.create_batch()
131+
for event in events:
132+
batch.add(event)
133+
await producer.send_batch(batch)
134+
```
102135

103136
## Examples
104137

sdk/eventhub/azure-eventhub/samples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/
2424
- Send event data batch to a specific partition determined by partition key
2525
- Send event data batch to a specific partition by partition id
2626
- Send event data batch with customized properties
27+
- Send events concurrently with proper thread/coroutine safety practices
28+
- **Note**: EventHub clients are not thread-safe or coroutine-safe
2729

2830
- [send_stream.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples/sync_samples/send_stream.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/eventhub/azure-eventhub/samples/async_samples/send_stream_async.py)) - Examples to do streaming sending:
2931
- Send in a stream

sdk/eventhub/azure-eventhub/samples/async_samples/send_async.py

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77

88
"""
99
Examples to show sending events with different options to an Event Hub asynchronously.
10+
11+
WARNING: EventHubProducerClient and EventDataBatch are not coroutine-safe!
12+
Do not share these instances between coroutines without proper synchronization.
13+
If you need to send from multiple coroutines, create separate client instances
14+
or use proper synchronization mechanisms like asyncio.Lock.
1015
"""
1116

1217
import time
@@ -88,6 +93,60 @@ async def send_event_data_list(producer):
8893
print("Sending error: ", eh_err)
8994

9095

96+
async def send_concurrent_with_separate_producers():
97+
"""
98+
Example showing coroutine-safe concurrent sending using separate producers.
99+
WARNING: Do NOT share EventHubProducerClient instances between coroutines!
100+
"""
101+
async def send_from_coroutine(task_id):
102+
# Create a separate producer for each coroutine - clients are NOT coroutine-safe
103+
producer = EventHubProducerClient(
104+
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
105+
eventhub_name=EVENTHUB_NAME,
106+
credential=DefaultAzureCredential(),
107+
)
108+
try:
109+
async with producer:
110+
batch = await producer.create_batch()
111+
batch.add(EventData(f"Message from coroutine {task_id}"))
112+
await producer.send_batch(batch)
113+
print(f"Coroutine {task_id} sent message successfully")
114+
except Exception as e:
115+
print(f"Coroutine {task_id} failed: {e}")
116+
117+
# Use asyncio.gather to run coroutines concurrently
118+
await asyncio.gather(*[send_from_coroutine(i) for i in range(3)])
119+
120+
121+
async def send_concurrent_with_shared_client_and_lock():
122+
"""
123+
Example showing concurrent sending with a shared client using asyncio.Lock.
124+
This is less efficient than separate clients but demonstrates coroutine synchronization.
125+
"""
126+
send_lock = asyncio.Lock()
127+
128+
producer = EventHubProducerClient(
129+
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
130+
eventhub_name=EVENTHUB_NAME,
131+
credential=DefaultAzureCredential(),
132+
)
133+
134+
async def send_with_lock(task_id):
135+
try:
136+
# Use lock to ensure coroutine-safe sending
137+
async with send_lock:
138+
batch = await producer.create_batch()
139+
batch.add(EventData(f"Synchronized message from coroutine {task_id}"))
140+
await producer.send_batch(batch)
141+
print(f"Coroutine {task_id} sent synchronized message successfully")
142+
except Exception as e:
143+
print(f"Coroutine {task_id} failed: {e}")
144+
145+
async with producer:
146+
# Use asyncio.gather to run coroutines concurrently with lock synchronization
147+
await asyncio.gather(*[send_with_lock(i) for i in range(3)])
148+
149+
91150
async def run():
92151

93152
producer = EventHubProducerClient(
@@ -104,6 +163,17 @@ async def run():
104163
await send_event_data_list(producer)
105164

106165

107-
start_time = time.time()
108-
asyncio.run(run())
109-
print("Send messages in {} seconds.".format(time.time() - start_time))
166+
async def main():
167+
start_time = time.time()
168+
await run()
169+
print("Send messages in {} seconds.".format(time.time() - start_time))
170+
171+
# Demonstrate concurrent sending
172+
print("\nDemonstrating concurrent sending with separate producers...")
173+
await send_concurrent_with_separate_producers()
174+
175+
print("\nDemonstrating concurrent sending with shared client and locks...")
176+
await send_concurrent_with_shared_client_and_lock()
177+
178+
179+
asyncio.run(main())

sdk/eventhub/azure-eventhub/samples/sync_samples/send.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,16 @@
77

88
"""
99
Examples to show sending events with different options to an Event Hub partition.
10+
11+
WARNING: EventHubProducerClient and EventDataBatch are not thread-safe!
12+
Do not share these instances across threads. If you need to send from multiple threads,
13+
create separate client instances for each thread or use proper synchronization mechanisms.
1014
"""
1115

1216
import time
1317
import os
18+
import threading
19+
from concurrent.futures import ThreadPoolExecutor
1420
from azure.eventhub import EventHubProducerClient, EventData
1521
from azure.eventhub.exceptions import EventHubError
1622
from azure.identity import DefaultAzureCredential
@@ -89,6 +95,67 @@ def send_event_data_list(producer):
8995
print("Sending error: ", eh_err)
9096

9197

98+
def send_concurrent_with_threads_safe(credentials):
99+
"""
100+
Example showing thread-safe concurrent sending using separate producers per thread.
101+
WARNING: Do NOT share EventHubProducerClient instances across threads!
102+
"""
103+
def send_from_thread(thread_id):
104+
# Create a separate producer for each thread - clients are NOT thread-safe
105+
producer = EventHubProducerClient(
106+
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
107+
eventhub_name=EVENTHUB_NAME,
108+
credential=credentials,
109+
)
110+
try:
111+
with producer:
112+
batch = producer.create_batch()
113+
batch.add(EventData(f"Message from thread {thread_id}"))
114+
producer.send_batch(batch)
115+
print(f"Thread {thread_id} sent message successfully")
116+
except Exception as e:
117+
print(f"Thread {thread_id} failed: {e}")
118+
119+
# Use ThreadPoolExecutor to manage threads
120+
with ThreadPoolExecutor(max_workers=3) as executor:
121+
futures = [executor.submit(send_from_thread, i) for i in range(3)]
122+
# Wait for all threads to complete
123+
for future in futures:
124+
future.result()
125+
126+
127+
def send_concurrent_with_shared_client_and_lock():
128+
"""
129+
Example showing concurrent sending with a shared client using locks.
130+
This is less efficient than separate clients but demonstrates thread synchronization.
131+
"""
132+
send_lock = threading.Lock()
133+
134+
producer = EventHubProducerClient(
135+
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
136+
eventhub_name=EVENTHUB_NAME,
137+
credential=DefaultAzureCredential(),
138+
)
139+
140+
def send_with_lock(thread_id):
141+
try:
142+
# Use lock to ensure thread-safe sending
143+
with send_lock:
144+
batch = producer.create_batch()
145+
batch.add(EventData(f"Synchronized message from thread {thread_id}"))
146+
producer.send_batch(batch)
147+
print(f"Thread {thread_id} sent synchronized message successfully")
148+
except Exception as e:
149+
print(f"Thread {thread_id} failed: {e}")
150+
151+
with producer:
152+
with ThreadPoolExecutor(max_workers=3) as executor:
153+
futures = [executor.submit(send_with_lock, i) for i in range(3)]
154+
# Wait for all threads to complete
155+
for future in futures:
156+
future.result()
157+
158+
92159
producer = EventHubProducerClient(
93160
fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
94161
eventhub_name=EVENTHUB_NAME,
@@ -105,3 +172,10 @@ def send_event_data_list(producer):
105172
send_event_data_list(producer)
106173

107174
print("Send messages in {} seconds.".format(time.time() - start_time))
175+
176+
# Demonstrate concurrent sending
177+
print("\nDemonstrating concurrent sending with separate producers...")
178+
send_concurrent_with_threads_safe(DefaultAzureCredential())
179+
180+
print("\nDemonstrating concurrent sending with shared client and locks...")
181+
send_concurrent_with_shared_client_and_lock()

sdk/servicebus/azure-servicebus/README.md

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,42 @@ To interact with these resources, one should be familiar with the following SDK
9595

9696
### Thread safety
9797

98-
We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe. We do not recommend reusing these instances across threads. It is up to the running application to use these classes in a thread-safe manner.
98+
We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe or coroutine-safe. We do not recommend reusing these instances across threads or sharing them between coroutines. It is up to the running application to use these classes in a thread-safe manner.
99+
100+
The data model type, `ServiceBusMessageBatch` is not thread-safe or coroutine-safe. It should not be shared across threads nor used concurrently with client methods.
101+
102+
For scenarios requiring concurrent sending from multiple threads:
103+
```python
104+
import threading
105+
from azure.servicebus import ServiceBusClient, ServiceBusMessage
106+
107+
# Use a lock to ensure only one thread sends at a time
108+
send_lock = threading.Lock()
109+
110+
def send_messages_thread_safe(sender, messages):
111+
with send_lock:
112+
batch = sender.create_message_batch()
113+
for message in messages:
114+
batch.add_message(message)
115+
sender.send_messages(batch)
116+
```
117+
118+
For scenarios requiring concurrent sending in asyncio applications:
119+
```python
120+
import asyncio
121+
from azure.servicebus.aio import ServiceBusClient
122+
from azure.servicebus import ServiceBusMessage
123+
124+
# Use a lock to ensure only one coroutine sends at a time
125+
send_lock = asyncio.Lock()
126+
127+
async def send_messages_coroutine_safe(sender, messages):
128+
async with send_lock:
129+
batch = await sender.create_message_batch()
130+
for message in messages:
131+
batch.add_message(message)
132+
await sender.send_messages(batch)
133+
```
99134

100135
## Examples
101136

sdk/servicebus/azure-servicebus/samples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,13 @@ Both [sync version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/
1919
- [send_queue.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/send_queue.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/send_queue_async.py)) - Examples to send messages to a service bus queue:
2020
- From a connection string
2121
- Enabling Logging
22+
- Send messages concurrently with proper thread/coroutine safety practices
23+
- **Note**: ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are not thread-safe or coroutine-safe
2224
- [send_topic.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/send_topic.py) ([async version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/send_topic_async.py)) - Examples to send messages to a service bus topic:
2325
- From a connection string
2426
- Enabling Logging
27+
- Send messages concurrently with proper thread/coroutine safety practices
28+
- **Note**: ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are not thread-safe or coroutine-safe
2529
- [receive_queue.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_queue.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/receive_queue_async.py)) - Examples to receive messages from a service bus queue:
2630
- Receive messages
2731
- [receive_subscription.py](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_subscription.py) ([async_version](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/servicebus/azure-servicebus/samples/async_samples/receive_subscription_async.py)) - Examples to receive messages from a service bus subscription:

sdk/servicebus/azure-servicebus/samples/async_samples/send_queue_async.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77

88
"""
99
Example to show sending message(s) to a Service Bus Queue asynchronously.
10+
11+
WARNING: ServiceBusClient, ServiceBusSender, and ServiceBusMessageBatch are not coroutine-safe!
12+
Do not share these instances between coroutines without proper synchronization.
13+
If you need to send from multiple coroutines, create separate client instances
14+
or use proper synchronization mechanisms like asyncio.Lock.
1015
"""
1116

1217
import os
@@ -41,6 +46,54 @@ async def send_batch_message(sender):
4146
await sender.send_messages(batch_message)
4247

4348

49+
async def send_concurrent_with_separate_clients():
50+
"""
51+
Example showing coroutine-safe concurrent sending using separate clients.
52+
WARNING: Do NOT share ServiceBusClient instances between coroutines!
53+
"""
54+
async def send_from_coroutine(task_id):
55+
# Create a separate client for each coroutine - clients are NOT coroutine-safe
56+
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, DefaultAzureCredential())
57+
try:
58+
async with servicebus_client:
59+
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
60+
async with sender:
61+
message = ServiceBusMessage(f"Message from coroutine {task_id}")
62+
await sender.send_messages(message)
63+
print(f"Coroutine {task_id} sent message successfully")
64+
except Exception as e:
65+
print(f"Coroutine {task_id} failed: {e}")
66+
67+
# Use asyncio.gather to run coroutines concurrently
68+
await asyncio.gather(*[send_from_coroutine(i) for i in range(3)])
69+
70+
71+
async def send_concurrent_with_shared_client_and_lock():
72+
"""
73+
Example showing concurrent sending with a shared client using asyncio.Lock.
74+
This is less efficient than separate clients but demonstrates coroutine synchronization.
75+
"""
76+
send_lock = asyncio.Lock()
77+
78+
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, DefaultAzureCredential())
79+
80+
async def send_with_lock(task_id):
81+
try:
82+
# Use lock to ensure coroutine-safe sending
83+
async with send_lock:
84+
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
85+
async with sender:
86+
message = ServiceBusMessage(f"Synchronized message from coroutine {task_id}")
87+
await sender.send_messages(message)
88+
print(f"Coroutine {task_id} sent synchronized message successfully")
89+
except Exception as e:
90+
print(f"Coroutine {task_id} failed: {e}")
91+
92+
async with servicebus_client:
93+
# Use asyncio.gather to run coroutines concurrently with lock synchronization
94+
await asyncio.gather(*[send_with_lock(i) for i in range(3)])
95+
96+
4497
async def main():
4598
credential = DefaultAzureCredential()
4699
servicebus_client = ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential)
@@ -54,5 +107,12 @@ async def main():
54107

55108
print("Send message is done.")
56109

110+
# Demonstrate concurrent sending
111+
print("\nDemonstrating concurrent sending with separate clients...")
112+
await send_concurrent_with_separate_clients()
113+
114+
print("\nDemonstrating concurrent sending with shared client and locks...")
115+
await send_concurrent_with_shared_client_and_lock()
116+
57117

58118
asyncio.run(main())

0 commit comments

Comments
 (0)