Skip to content

Commit 1c79d4d

Browse files
Merge pull request #77144 from rezasherafat/docupdate_amqp_3
Added service telemetry receive using AMQP.
2 parents c5a9998 + 99933e1 commit 1c79d4d

File tree

1 file changed

+67
-2
lines changed

1 file changed

+67
-2
lines changed

articles/iot-hub/iot-hub-amqp-support.md

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ The cloud-to-device message exchange between service and IoT Hub as well as betw
6060
| Created by | Link type | Link path | Description |
6161
|------------|-----------|-----------|-------------|
6262
| Service | Sender link | `/messages/devicebound` | C2D messages destined to devices are sent to this link by the service. Messages sent over this link have their `To` property set to the target device's receiver link path: i.e., `/devices/<deviceID>/messages/devicebound`. |
63-
| Service | Receiver link | `/messages/serviceBound/feedback` | Completion, rejection, and abandonment feedback messages coming from devices received on this link by service. See [here](./iot-hub-devguide-messages-c2d.md#message-feedback) for more information about feedback messages. |
63+
| Service | Receiver link | `/messages/serviceBound/feedback` | Completion, rejection, and abandonment feedback messages coming from devices received on this link by service. For more information about feedback messages, see [here](./iot-hub-devguide-messages-c2d.md#message-feedback). |
6464

6565
The code snippet below demonstrates how to create a C2D message and send it to a device using [uAMQP library in Python](https://github.com/Azure/azure-uamqp-python).
6666

@@ -122,11 +122,76 @@ As shown above, a C2D feedback message has content type of `application/vnd.micr
122122
* Key `originalMessageId` in feedback body has the ID of the original C2D message sent by the service. This can be used to correlate feedback to C2D messages.
123123

124124
### Receive telemetry messages (service client)
125+
By default, IoT Hub stores ingested device telemetry messages in a built-in Event hub. Your service client can use the AMQP protocol to receive the stored events.
126+
127+
For this purpose, the service client first needs to connect to the IoT Hub endpoint and receive a redirection address to the built-in Event Hubs. Service client then uses the provided address to connect to the built-in Event hub.
128+
129+
In each step, the client needs to present the following pieces of information:
130+
* Valid service credentials (service SAS token).
131+
* A well-formatted path to the consumer group partition it intends to retrieve messages from. For a given consumer group and partition ID, the path has the following format: `/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>` (the default consumer group is `$Default`).
132+
* An optional filtering predicate to designate a starting point in the partition (this can be in the form of a sequence number, offset or enqueued timestamp).
133+
134+
The code snippet below uses [uAMQP library in Python](https://github.com/Azure/azure-uamqp-python) to demonstrate the above steps.
135+
136+
```python
137+
import json
138+
import uamqp
139+
import urllib
140+
import time
141+
142+
# Use generate_sas_token implementation available here: https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
143+
from helper import generate_sas_token
144+
145+
iot_hub_name = '<iot-hub-name>'
146+
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
147+
policy_name = 'service'
148+
access_key = '<primary-or-secondary-key>'
149+
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(consumer_group='$Default', p_id=0)
150+
151+
username = '{policy_name}@sas.root.{iot_hub_name}'.format(policy_name=policy_name, iot_hub_name=iot_hub_name)
152+
sas_token = generate_sas_token(hostname, access_key, policy_name)
153+
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
154+
155+
# Optional filtering predicates can be specified using endpiont_filter
156+
# Valid predicates include:
157+
# - amqp.annotation.x-opt-sequence-number
158+
# - amqp.annotation.x-opt-offset
159+
# - amqp.annotation.x-opt-enqueued-time
160+
# Set endpoint_filter variable to None if no filter is needed
161+
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
162+
163+
# Helper function to set the filtering predicate on the source URI
164+
def set_endpoint_filter(uri, endpoint_filter=''):
165+
source_uri = uamqp.address.Source(uri)
166+
source_uri.set_filter(endpoint_filter)
167+
return source_uri
168+
169+
receive_client = uamqp.ReceiveClient(set_endpoint_filter(uri, endpoint_filter), debug=True)
170+
try:
171+
batch = receive_client.receive_message_batch(max_batch_size=5)
172+
except uamqp.errors.LinkRedirect as redirect:
173+
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
174+
receive_client.close()
175+
176+
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(redirect.address, policy_name, access_key)
177+
receive_client = uamqp.ReceiveClient(set_endpoint_filter(redirect.address, endpoint_filter), auth=sas_auth, debug=True)
178+
179+
# Start receiving messages in batches
180+
batch = receive_client.receive_message_batch(max_batch_size=5)
181+
for msg in batch:
182+
print('*** received a message ***')
183+
print(''.join(msg.get_data()))
184+
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
185+
print('\t: ' + str(msg.annotations['x-opt-offset']))
186+
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
187+
```
188+
189+
For a given device ID, IoT Hub uses a hash of the device ID to determine which partition to store its messages in. The code snippet above demonstrates receiving events from a single such partition. Note, however, that a typical application often needs to retrieve events stored in all event hub partitions.
125190

126191

127192
### Additional notes
128193
* The AMQP connections may be disrupted due to network glitch, or expiry of the authentication token (generated in the code). The service client must handle these circumstances and re-establish the connection and links if needed. For the case of authentication token expiry, the client can also proactively renew the token prior to its expiry to avoid a connection drop.
129-
* In some cases, your client must be able to correctly handle link redirections. Refer to your AMQP client documentation on how to do this.
194+
* In some cases, your client must be able to correctly handle link redirections. Refer to your AMQP client documentation on how to handle this operation.
130195

131196
### Receive cloud-to-device messages (device and module client)
132197
AMQP links used on the device side are as follows:

0 commit comments

Comments
 (0)