
Commit 1fdf2ef

Merge pull request #292661 from PatAltimore/patricka-message-broker
Update dataflows to use message broker
2 parents c373dfa + ffd4658

File tree

7 files changed: +50 −69 lines changed

articles/iot-operations/connect-to-cloud/concept-schema-registry.md

Lines changed: 1 addition & 1 deletion
@@ -97,7 +97,7 @@ Each dataflow source can optionally specify a message schema. Currently, dataflo
 
 Asset sources have a predefined message schema that was created by the connector for OPC UA.
 
-Schemas can be uploaded for MQTT sources. Currently, Azure IoT Operations supports JSON for source schemas, also known as input schemas. In the operations experience, you can select an existing schema or upload one while defining an MQTT source:
+Schemas can be uploaded for message broker sources. Currently, Azure IoT Operations supports JSON for source schemas, also known as input schemas. In the operations experience, you can select an existing schema or upload one while defining a message broker source:
 
 :::image type="content" source="./media/concept-schema-registry/upload-schema.png" alt-text="Screenshot that shows uploading a message schema in the operations experience portal.":::

articles/iot-operations/connect-to-cloud/howto-create-dataflow.md

Lines changed: 49 additions & 68 deletions
@@ -6,7 +6,7 @@ ms.author: patricka
 ms.service: azure-iot-operations
 ms.subservice: azure-data-flows
 ms.topic: how-to
-ms.date: 12/12/2024
+ms.date: 01/07/2025
 ai-usage: ai-assisted
 
 #CustomerIntent: As an operator, I want to understand how to create a dataflow to connect data sources.
@@ -173,26 +173,27 @@ To configure a source for the dataflow, specify the endpoint reference and a lis
 
 If the default endpoint isn't used as the source, it must be used as the [destination](#destination). To learn more, see [Dataflows must use local MQTT broker endpoint](./howto-configure-dataflow-endpoint.md#dataflows-must-use-local-mqtt-broker-endpoint).
 
-### Option 1: Use default MQTT endpoint as source
+### Option 1: Use default message broker endpoint as source
 
 # [Portal](#tab/portal)
 
-1. Under **Source details**, select **MQTT**.
+1. Under **Source details**, select **Message broker**.
 
-   :::image type="content" source="media/howto-create-dataflow/dataflow-source-mqtt.png" alt-text="Screenshot using operations experience to select MQTT as the source endpoint.":::
+   :::image type="content" source="media/howto-create-dataflow/dataflow-source-mqtt.png" alt-text="Screenshot using operations experience to select message broker as the source endpoint.":::
 
-1. Enter the following settings for the MQTT source:
+1. Enter the following settings for the message broker source:
 
    | Setting           | Description |
    | ----------------- | ----------- |
-   | MQTT topic        | The MQTT topic filter to subscribe to for incoming messages. See [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics). |
+   | Dataflow endpoint | Select *default* to use the default MQTT message broker endpoint. |
+   | Topic             | The topic filter to subscribe to for incoming messages. See [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics). |
    | Message schema    | The schema to use to deserialize the incoming messages. See [Specify schema to deserialize data](#specify-source-schema). |
 
 1. Select **Apply**.
 
 # [Bicep](#tab/bicep)
 
-The MQTT endpoint is configured in the Bicep template file. For example, the following endpoint is a source for the dataflow.
+The message broker endpoint is configured in the Bicep template file. For example, the following endpoint is a source for the dataflow.
 
 ```bicep
 sourceSettings: {
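For context, the source block continues past this hunk. A minimal complete sketch, assuming the built-in endpoint name `default` and a placeholder topic filter:

```bicep
sourceSettings: {
  // 'default' refers to the local MQTT broker default endpoint (assumed name)
  endpointRef: 'default'
  dataSources: [
    '<TOPIC_FILTER>'
  ]
}
```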
@@ -208,7 +209,7 @@ Here, `dataSources` allow you to specify multiple MQTT or Kafka topics without n
 
 # [Kubernetes (preview)](#tab/kubernetes)
 
-For example, to configure a source using an MQTT endpoint and two MQTT topic filters, use the following configuration:
+For example, to configure a source using a message broker endpoint and two topic filters, use the following configuration:
 
 ```yaml
 sourceSettings:
@@ -250,20 +251,32 @@ Configuring an asset as a source is only available in the operations experience.
 
 When using an asset as the source, the asset definition is used to infer the schema for the dataflow. The asset definition includes the schema for the asset's datapoints. To learn more, see [Manage asset configurations remotely](../discover-manage-assets/howto-manage-assets-remotely.md).
 
-Once configured, the data from the asset reached the dataflow via the local MQTT broker. So, when using an asset as the source, the dataflow uses the local MQTT broker default endpoint as the source in actuality.
+Once configured, the data from the asset reaches the dataflow via the local MQTT broker. So, when using an asset as the source, the dataflow actually uses the local MQTT broker default endpoint as the source.
 
 ### Option 3: Use custom MQTT or Kafka dataflow endpoint as source
 
 If you created a custom MQTT or Kafka dataflow endpoint (for example, to use with Event Grid or Event Hubs), you can use it as the source for the dataflow. Remember that storage type endpoints, like Data Lake or Fabric OneLake, can't be used as a source.
 
-To configure, use Kubernetes YAML or Bicep. Replace placeholder values with your custom endpoint name and topics.
-
 # [Portal](#tab/portal)
 
-Using a custom MQTT or Kafka endpoint as a source is currently not supported in the operations experience.
+1. Under **Source details**, select **Message broker**.
+
+   :::image type="content" source="media/howto-create-dataflow/dataflow-source-custom.png" alt-text="Screenshot using operations experience to select a custom message broker as the source endpoint.":::
+
+1. Enter the following settings for the message broker source:
+
+   | Setting           | Description |
+   | ----------------- | ----------- |
+   | Dataflow endpoint | Use the **Reselect** button to select a custom MQTT or Kafka dataflow endpoint. For more information, see [Configure MQTT dataflow endpoints](howto-configure-mqtt-endpoint.md) or [Configure Azure Event Hubs and Kafka dataflow endpoints](howto-configure-kafka-endpoint.md). |
+   | Topic             | The topic filter to subscribe to for incoming messages. See [Configure MQTT or Kafka topics](#configure-data-sources-mqtt-or-kafka-topics). |
+   | Message schema    | The schema to use to deserialize the incoming messages. See [Specify schema to deserialize data](#specify-source-schema). |
+
+1. Select **Apply**.
 
 # [Bicep](#tab/bicep)
 
+Replace placeholder values with your custom endpoint name and topics.
+
 ```bicep
 sourceSettings: {
   endpointRef: '<CUSTOM_ENDPOINT_NAME>'
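The hunk cuts off before `dataSources`. A complete custom-endpoint source block, sketched with placeholder topic names to replace with your own:

```bicep
sourceSettings: {
  // Custom MQTT or Kafka dataflow endpoint created earlier
  endpointRef: '<CUSTOM_ENDPOINT_NAME>'
  dataSources: [
    '<TOPIC_1>'
    '<TOPIC_2>'
  ]
}
```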
@@ -277,6 +290,8 @@ sourceSettings: {
 
 # [Kubernetes (preview)](#tab/kubernetes)
 
+Replace placeholder values with your custom endpoint name and topics.
+
 ```yaml
 sourceSettings:
   endpointRef: <CUSTOM_ENDPOINT_NAME>
@@ -298,20 +313,20 @@ When the source is an MQTT (Event Grid included) endpoint, you can use the MQTT
 
 # [Portal](#tab/portal)
 
-In the operations experience dataflow **Source details**, select **MQTT**, then use the **MQTT topic** field to specify the MQTT topic filter to subscribe to for incoming messages.
+In the operations experience dataflow **Source details**, select **Message broker**, then use the **Topic** field to specify the MQTT topic filter to subscribe to for incoming messages.
 
 > [!NOTE]
-> Only one MQTT topic filter can be specified in the operations experience. To use multiple MQTT topic filters, use Bicep or Kubernetes.
+> Only one topic filter can be specified in the operations experience. To use multiple topic filters, use Bicep or Kubernetes.
 
 # [Bicep](#tab/bicep)
 
 ```bicep
 sourceSettings: {
-  endpointRef: '<MQTT_ENDPOINT_NAME>'
+  endpointRef: '<MESSAGE_BROKER_ENDPOINT_NAME>'
   dataSources: [
-    '<MQTT_TOPIC_FILTER_1>'
-    '<MQTT_TOPIC_FILTER_2>'
-    // Add more MQTT topic filters as needed
+    '<TOPIC_FILTER_1>'
+    '<TOPIC_FILTER_2>'
+    // Add more topic filters as needed
   ]
 }
 ```
@@ -334,14 +349,14 @@ Here, the wildcard `+` is used to select all devices under the `thermostats` and
 
 ```yaml
 sourceSettings:
-  endpointRef: <MQTT_ENDPOINT_NAME>
+  endpointRef: <ENDPOINT_NAME>
   dataSources:
-    - <MQTT_TOPIC_FILTER_1>
-    - <MQTT_TOPIC_FILTER_2>
-    # Add more MQTT topic filters as needed
+    - <TOPIC_FILTER_1>
+    - <TOPIC_FILTER_2>
+    # Add more topic filters as needed
 ```
 
-Example with multiple MQTT topic filters with wildcards:
+Example with multiple topic filters with wildcards:
 
 ```yaml
 sourceSettings:
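The wildcard example is truncated by the hunk. In the Bicep form shown earlier, a sketch — the `thermostats` topic layout is assumed from the hunk header's description of the `+` wildcard:

```bicep
sourceSettings: {
  endpointRef: 'default'
  dataSources: [
    // '+' matches exactly one topic level; '#' matches all remaining levels
    'thermostats/+/telemetry/#'
  ]
}
```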
@@ -357,11 +372,11 @@ Here, the wildcard `+` is used to select all devices under the `thermostats` and
 
 ##### Shared subscriptions
 
-To use shared subscriptions with MQTT sources, you can specify the shared subscription topic in the form of `$shared/<GROUP_NAME>/<TOPIC_FILTER>`.
+To use shared subscriptions with message broker sources, you can specify the shared subscription topic in the form of `$shared/<GROUP_NAME>/<TOPIC_FILTER>`.
 
 # [Portal](#tab/portal)
 
-In operations experience dataflow **Source details**, select **MQTT** and use the **MQTT topic** field to specify the shared subscription group and topic.
+In the operations experience dataflow **Source details**, select **Message broker** and use the **Topic** field to specify the shared subscription group and topic.
 
 # [Bicep](#tab/bicep)
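The Bicep body of this tab falls outside the hunk. A sketch of a shared-subscription source — `mygroup` is the doc's own illustrative group name; the endpoint name is assumed:

```bicep
sourceSettings: {
  endpointRef: 'default'
  dataSources: [
    // Form: $shared/<GROUP_NAME>/<TOPIC_FILTER>
    '$shared/mygroup/topic1'
  ]
}
```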

@@ -384,7 +399,7 @@ sourceSettings:
 ---
 
 
-If the instance count in the [dataflow profile](howto-configure-dataflow-profile.md) is greater than one, shared subscription is automatically enabled for all dataflows that use MQTT source. In this case, the `$shared` prefix is added and the shared subscription group name automatically generated. For example, if you have a dataflow profile with an instance count of 3, and your dataflow uses an MQTT endpoint as source configured with topics `topic1` and `topic2`, they are automatically converted to shared subscriptions as `$shared/<GENERATED_GROUP_NAME>/topic1` and `$shared/<GENERATED_GROUP_NAME>/topic2`.
+If the instance count in the [dataflow profile](howto-configure-dataflow-profile.md) is greater than one, shared subscription is automatically enabled for all dataflows that use a message broker source. In this case, the `$shared` prefix is added and the shared subscription group name is automatically generated. For example, if you have a dataflow profile with an instance count of 3, and your dataflow uses a message broker endpoint as source configured with topics `topic1` and `topic2`, they are automatically converted to shared subscriptions as `$shared/<GENERATED_GROUP_NAME>/topic1` and `$shared/<GENERATED_GROUP_NAME>/topic2`.
 
 You can explicitly create a topic named `$shared/mygroup/topic` in your configuration. However, adding the `$shared` topic explicitly isn't recommended since the `$shared` prefix is automatically added when needed. Dataflows can make optimizations with the group name if it isn't set. For example, if `$shared` isn't set, dataflows only have to operate over the topic name.

@@ -402,7 +417,10 @@ To configure the Kafka topics:
 
 # [Portal](#tab/portal)
 
-Using a Kafka endpoint as a source is currently not supported in the operations experience.
+In the operations experience dataflow **Source details**, select **Message broker**, then use the **Topic** field to specify the Kafka topic filter to subscribe to for incoming messages.
+
+> [!NOTE]
+> Only one topic filter can be specified in the operations experience. To use multiple topic filters, use Bicep or Kubernetes.
 
 # [Bicep](#tab/bicep)
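The Bicep body of this tab also falls outside the hunk. Assuming the same `sourceSettings` shape with Kafka topic names in `dataSources`, a sketch:

```bicep
sourceSettings: {
  // Custom Kafka (or Event Hubs) dataflow endpoint, name assumed
  endpointRef: '<KAFKA_ENDPOINT_NAME>'
  dataSources: [
    '<KAFKA_TOPIC_1>'
    '<KAFKA_TOPIC_2>'
  ]
}
```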

@@ -432,7 +450,7 @@ sourceSettings:
 
 ### Specify source schema
 
-When using MQTT or Kafka as the source, you can specify a [schema](concept-schema-registry.md) to display the list of data points in the operations experience portal. Note that using a schema to deserialize and validate incoming messages [isn't currently supported](../troubleshoot/known-issues.md#dataflows).
+When using MQTT or Kafka as the source, you can specify a [schema](concept-schema-registry.md) to display the list of data points in the operations experience portal. Using a schema to deserialize and validate incoming messages [isn't currently supported](../troubleshoot/known-issues.md#dataflows).
 
 If the source is an asset, the schema is automatically inferred from the asset definition.
 
@@ -443,7 +461,7 @@ To configure the schema used to deserialize the incoming messages from a source:
 
 # [Portal](#tab/portal)
 
-In operations experience dataflow **Source details**, select **MQTT** and use the **Message schema** field to specify the schema. You can use the **Upload** button to upload a schema file first. To learn more, see [Understand message schemas](concept-schema-registry.md).
+In the operations experience dataflow **Source details**, select **Message broker** and use the **Message schema** field to specify the schema. You can use the **Upload** button to upload a schema file first. To learn more, see [Understand message schemas](concept-schema-registry.md).
 
 # [Bicep](#tab/bicep)

@@ -759,43 +777,6 @@ builtInTransformationSettings:
 
 To learn more, see [Map data by using dataflows](concept-dataflow-mapping.md) and [Convert data by using dataflows](concept-dataflow-conversions.md).
 
-<!-- TODO: DOE content for this -->
-
-<!-- #### Passthrough operation
-
-Using map, you can apply a passthrough operation that takes all the input fields and maps them to the output field, essentially passing through all fields.
-
-# [Portal](#tab/portal)
-
-TBD
-
-# [Bicep](#tab/bicep)
-
-```bicep
-builtInTransformationSettings: {
-  map: [
-    {
-      inputs: [ '*' ]
-      output: '*'
-    }
-  ]
-}
-```
-
-# [Kubernetes (preview)](#tab/kubernetes)
-
-```yaml
-builtInTransformationSettings:
-  map:
-    - inputs:
-        - '*'
-      output: '*'
-```
-
----
-
--->
-
 ### Serialize data according to a schema
 
 If you want to serialize the data before sending it to the destination, you need to specify a schema and serialization format. Otherwise, the data is serialized in JSON with the types inferred. Storage endpoints like Microsoft Fabric or Azure Data Lake require a schema to ensure data consistency. Supported serialization formats are Parquet and Delta.
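As a Bicep sketch of that configuration — treat `serializationFormat`, `schemaRef`, and the `aio-sr://` reference format as assumptions to verify against your API version:

```bicep
builtInTransformationSettings: {
  // Delta or Parquet; omit to fall back to JSON with inferred types
  serializationFormat: 'Delta'
  // Reference to a schema uploaded to the schema registry (assumed format)
  schemaRef: 'aio-sr://<SCHEMA_NAMESPACE>/<SCHEMA_NAME>:<VERSION>'
}
```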
@@ -805,7 +786,7 @@ If you want to serialize the data before sending it to the destination, you need
 
 # [Portal](#tab/portal)
 
-For operations experience, you specify the schema and serialization format in the dataflow endpoint details. The endpoints that support serialization formats are Microsoft Fabric OneLake, Azure Data Lake Storage Gen 2, and Azure Data Explorer. For example, to serialize the data in Delta format, you need to upload a schema to the schema registry and reference it in the dataflow destination endpoint configuration.
+In the operations experience, you specify the schema and serialization format in the dataflow endpoint details. The endpoints that support serialization formats are Microsoft Fabric OneLake, Azure Data Lake Storage Gen 2, Azure Data Explorer, and local storage. For example, to serialize the data in Delta format, you need to upload a schema to the schema registry and reference it in the dataflow destination endpoint configuration.
 
 :::image type="content" source="media/howto-create-dataflow/destination-serialization.png" alt-text="Screenshot using the operations experience to set the dataflow destination endpoint serialization.":::

@@ -976,7 +957,7 @@ The following example is a dataflow configuration that uses the MQTT endpoint fo
 
 # [Portal](#tab/portal)
 
-See Bicep or Kubernetes tabs for the configuration example.
+:::image type="content" source="media/howto-create-dataflow/dataflow-example.png" alt-text="Screenshot showing the operations experience dataflow example with a source endpoint, transforms, and a destination endpoint." lightbox="media/howto-create-dataflow/dataflow-example.png":::
 
 # [Bicep](#tab/bicep)

Binary image files changed (5): -178 Bytes, 137 KB, -9.58 KB, 141 KB, -7.85 KB

0 commit comments