Commit 9579077

moved output schema to transformation
1 parent 07ba161 commit 9579077

1 file changed

+102
-73
lines changed

articles/iot-operations/connect-to-cloud/howto-create-dataflow.md

Lines changed: 102 additions & 73 deletions
@@ -199,73 +199,6 @@ Datasources allow you to specify multiple MQTT or Kafka topics without needing t
199199
<!-- TODO: Put the right article link here -->
200200
For more information about creating an MQTT endpoint as a dataflow source, see [MQTT Endpoint](concept-schema-registry.md).
201201

202-
#### Specify schema to deserialize data
203-
204-
Schemas are documents that describe the format of a message and its contents to enable processing and contextualization. You can upload schemas using the ARM/Bicep templates. The following configuration demonstrates how to define a schema in your Bicep file. In this example, the schema defines fields such as `asset_id`, `asset_name`, `location`, `temperature`, `manufacturer`, `production_date`, and `serial_number`. Each field is assigned a specific data type (e.g., `string`) and marked as non-nullable. This ensures all incoming messages contain these fields with valid data.
205-
206-
```bicep
207-
var assetDeltaSchema = '''
208-
{
209-
"$schema": "Delta/1.0",
210-
"type": "object",
211-
"properties": {
212-
"type": "struct",
213-
"fields": [
214-
{ "name": "asset_id", "type": "string", "nullable": false, "metadata": {} },
215-
{ "name": "asset_name", "type": "string", "nullable": false, "metadata": {} },
216-
{ "name": "location", "type": "string", "nullable": false, "metadata": {} },
217-
{ "name": "manufacturer", "type": "string", "nullable": false, "metadata": {} },
218-
{ "name": "production_date", "type": "string", "nullable": false, "metadata": {} },
219-
{ "name": "serial_number", "type": "string", "nullable": false, "metadata": {} },
220-
{ "name": "temperature", "type": "double", "nullable": false, "metadata": {} }
221-
]
222-
}
223-
}
224-
'''
225-
```
226-
227-
To register the schema with the Azure Schema Registry, use the following Bicep configuration. This configuration creates a schema definition and assigns it a version within the schema registry, allowing it to be referenced later in your data transformations.
228-
229-
```bicep
230-
param opcuaSchemaName string = 'opcua-output-delta'
231-
param opcuaSchemaVer string = '1'
232-
233-
resource opcSchema 'Microsoft.DeviceRegistry/schemaRegistries/schemas@2024-09-01-preview' = {
234-
parent: schemaRegistry
235-
name: opcuaSchemaName
236-
properties: {
237-
displayName: 'OPC UA Delta Schema'
238-
description: 'This is a OPC UA delta Schema'
239-
format: 'Delta/1.0'
240-
schemaType: 'MessageSchema'
241-
}
242-
}
243-
244-
resource opcuaSchemaInstance 'Microsoft.DeviceRegistry/schemaRegistries/schemas/schemaVersions@2024-09-01-preview' = {
245-
parent: opcSchema
246-
name: opcuaSchemaVer
247-
properties: {
248-
description: 'Schema version'
249-
schemaContent: opcuaSchemaContent
250-
}
251-
}
252-
```
253-
254-
Once the schema is registered, it can be referenced in transformations to ensure that the source data is correctly deserialized. In the configuration below, the schemaRef points to the specific schema version to be used, and the serializationFormat defines how the data will be serialized during the transformation process.
255-
256-
```bicep
257-
{
258-
operationType: 'BuiltInTransformation'
259-
builtInTransformationSettings: {
260-
// ..
261-
schemaRef: 'aio-sr://${opcuaSchemaName}:${opcuaSchemaVer}'
262-
serializationFormat: 'Parquet' // can also be 'Delta'
263-
}
264-
}
265-
```
266-
267-
For more information about schema registry, see [Understand message schemas](concept-schema-registry.md).
268-
269202
# [Kubernetes](#tab/kubernetes)
270203

271204
For example, to configure a source using an MQTT endpoint and two MQTT topic filters, use the following configuration:
@@ -297,14 +230,39 @@ spec:
297230

298231
To specify the schema, create the file and store it in the schema registry.
299232

300-
```yaml
233+
```json
301234
{
302-
"type": "record",
235+
"$schema": "http://json-schema.org/draft-07/schema#",
303236
"name": "Temperature",
304-
"fields": [
305-
{"name": "deviceId", "type": "string"},
306-
{"name": "temperature", "type": "float"}
307-
]
237+
"description": "Schema for representing an asset's key attributes",
238+
"type": "object",
239+
"required": [ "deviceId", "asset_name"],
240+
"properties": {
241+
"deviceId": {
242+
"type": "string"
243+
},
244+
"temperature": {
245+
"type": "number"
246+
},
247+
"serial_number": {
248+
"type": "string"
249+
},
250+
"production_date": {
251+
"type": "string",
252+
"description": "Production date of asset"
253+
},
254+
"asset_name": {
255+
"type": "string",
256+
"description": "Name of asset"
257+
},
258+
"location": {
259+
"type": "string"
260+
},
261+
"manufacturer": {
262+
"type": "string",
263+
"description": "Name of manufacturer"
264+
}
265+
}
308266
}
309267
```
310268

@@ -362,6 +320,77 @@ In the operations experience portal, select **Dataflow** > **Add transform (opti
362320
}
363321
```
364322

323+
#### Specify output schema to transform data
324+
325+
The following configuration demonstrates how to define an output schema in your Bicep file. In this example, the schema defines fields such as `asset_id`, `asset_name`, `location`, `temperature`, `manufacturer`, `production_date`, and `serial_number`. Each field is assigned a specific data type (e.g., `string`) and marked as non-nullable. This ensures all incoming messages contain these fields with valid data.
326+
327+
```bicep
328+
var assetDeltaSchema = '''
329+
{
330+
"$schema": "Delta/1.0",
331+
"type": "object",
332+
"properties": {
333+
"type": "struct",
334+
"fields": [
335+
{ "name": "asset_id", "type": "string", "nullable": false, "metadata": {} },
336+
{ "name": "asset_name", "type": "string", "nullable": false, "metadata": {} },
337+
{ "name": "location", "type": "string", "nullable": false, "metadata": {} },
338+
{ "name": "manufacturer", "type": "string", "nullable": false, "metadata": {} },
339+
{ "name": "production_date", "type": "string", "nullable": false, "metadata": {} },
340+
{ "name": "serial_number", "type": "string", "nullable": false, "metadata": {} },
341+
{ "name": "temperature", "type": "double", "nullable": false, "metadata": {} }
342+
]
343+
}
344+
}
345+
'''
346+
```
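
If the inline string becomes hard to maintain, one option is to keep the schema in its own file and load it at compile time. The sketch below assumes a hypothetical file, `schemas/asset-delta.json`, that holds the same Delta schema JSON as the inline variable; `loadTextContent` is a built-in Bicep function, and the path is resolved relative to the Bicep file.

```bicep
// Assumption: the path is illustrative; point it at wherever you keep the schema file.
// loadTextContent reads the file at compile time and returns its contents as a string.
var assetDeltaSchema = loadTextContent('schemas/asset-delta.json')
```

Keeping the schema in a separate file also lets you validate or diff it with ordinary JSON tooling.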
347+
348+
To register the schema with the Azure Schema Registry, use the following Bicep configuration. This configuration creates a schema definition and assigns it a version within the schema registry, allowing it to be referenced later in your data transformations.
349+
350+
```bicep
351+
param opcuaSchemaName string = 'opcua-output-delta'
352+
param opcuaSchemaVer string = '1'
353+
354+
resource opcSchema 'Microsoft.DeviceRegistry/schemaRegistries/schemas@2024-09-01-preview' = {
355+
parent: schemaRegistry
356+
name: opcuaSchemaName
357+
properties: {
358+
displayName: 'OPC UA Delta Schema'
359+
description: 'This is an OPC UA Delta schema'
360+
format: 'Delta/1.0'
361+
schemaType: 'MessageSchema'
362+
}
363+
}
364+
365+
resource opcuaSchemaInstance 'Microsoft.DeviceRegistry/schemaRegistries/schemas/schemaVersions@2024-09-01-preview' = {
366+
parent: opcSchema
367+
name: opcuaSchemaVer
368+
properties: {
369+
description: 'Schema version'
370+
schemaContent: assetDeltaSchema
371+
}
372+
}
373+
```
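
The `parent: schemaRegistry` line relies on a `schemaRegistry` symbol being declared elsewhere in the same Bicep file. A minimal sketch of such a declaration follows, assuming the registry already exists in the target resource group and that its name is supplied through a hypothetical `schemaRegistryName` parameter:

```bicep
// Assumption: the parameter name is illustrative and not part of the original article.
param schemaRegistryName string

// Reference an existing schema registry so that child schema resources can use it as their parent.
resource schemaRegistry 'Microsoft.DeviceRegistry/schemaRegistries@2024-09-01-preview' existing = {
  name: schemaRegistryName
}
```

The `existing` keyword tells Bicep to look up the resource rather than deploy it.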
374+
375+
When the dataflow resource is created, it includes a `schemaRef` value that points to the generated schema stored in the schema registry. It can be referenced in transformations, which creates a new schema in Delta format.
376+
377+
Currently, the Azure IoT Operations experience supports only Parquet output for output schemas.
378+
379+
Note: The Delta schema format is used for both Parquet and Delta output.
380+
381+
```bicep
382+
{
383+
operationType: 'BuiltInTransformation'
384+
builtInTransformationSettings: {
385+
// ..
386+
schemaRef: 'aio-sr://${opcuaSchemaName}:${opcuaSchemaVer}'
387+
serializationFormat: 'Parquet' // can also be 'Delta'
388+
}
389+
}
390+
```
391+
392+
For more information about schema registry, see [Understand message schemas](concept-schema-registry.md).
393+
365394
# [Kubernetes](#tab/kubernetes)
366395

367396
```yaml
