|
| 1 | +--- |
| 2 | +title: Configure dataflow endpoints |
| 3 | +description: Configure dataflow endpoints to create connection points for data sources. |
| 4 | +author: PatAltimore |
| 5 | +ms.author: patricka |
| 6 | +ms.subservice: azure-mqtt-broker |
| 7 | +ms.topic: conceptual |
| 8 | +ms.date: 07/23/2024 |
| 9 | + |
| 10 | +#CustomerIntent: As an operator, I want to understand how to configure source and destination endpoints so that I can create a dataflow. |
| 11 | +--- |
| 12 | + |
| 13 | +# Configure dataflow endpoints |
| 14 | + |
| 15 | +[!INCLUDE [public-preview-note](../includes/public-preview-note.md)] |
| 16 | + |
| 17 | +To get started with dataflows, you need to configure endpoints. An endpoint is the connection point for the dataflow. You can use an endpoint as a source or destination for the dataflow. Some endpoint types can be used as [both sources and destinations](#endpoint-types-for-use-as-sources-and-destinations), while others are for [destinations only](#endpoint-types-for-destinations-only). A dataflow needs at least one source endpoint and one destination endpoint. |
| 18 | + |
| 19 | +The following example shows a custom resource definition with all of the configuration options. The required fields are dependent on the endpoint type. Review the sections for each endpoint type for configuration guidance. |
| 20 | +```yaml |
| 21 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 22 | +kind: DataflowEndpoint |
| 23 | +metadata: |
| 24 | + name: <endpoint-name> |
| 25 | +spec: |
| 26 | + endpointType: <endpointType> # mqtt, kafka, dataExplorer, dataLakeStorage, fabricOneLake, or localStorage |
| 27 | + authentication: |
| 28 | + method: <method> # systemAssignedManagedIdentity, x509Credentials, userAssignedManagedIdentity, or serviceAccountToken |
| 29 | + systemAssignedManagedIdentitySettings: # Required if method is systemAssignedManagedIdentity |
| 30 | + audience: https://eventgrid.azure.net |
| 31 | + x509CredentialsSettings: # Required if method is x509Credentials |
| 32 | + certificateSecretName: x509-certificate |
| 33 | + userAssignedManagedIdentitySettings: # Required if method is userAssignedManagedIdentity |
| 34 | + clientId: <id> |
| 35 | + tenantId: <id> |
| 36 | + audience: https://eventgrid.azure.net |
| 37 | + serviceAccountTokenSettings: # Required if method is serviceAccountToken |
| 38 | + audience: my-audience |
| 39 | + mqttSettings: # Required if endpoint type is mqtt |
| 40 | + host: example.westeurope-1.ts.eventgrid.azure.net:8883 |
| 41 | + tls: # Omit for no TLS or MQTT. |
| 42 | + mode: <mode> # enabled or disabled |
| 43 | + trustedCaCertificateConfigMap: ca-certificates |
| 44 | + sharedSubscription: |
| 45 | + groupMinimumShareNumber: 3 # Required if shared subscription is enabled. |
| 46 | + groupName: group1 # Required if shared subscription is enabled. |
| 47 | + clientIdPrefix: <prefix> |
| 48 | + retain: keep |
| 49 | + sessionExpirySeconds: 3600 |
| 50 | + qos: 1 |
| 51 | + protocol: mqtt |
| 52 | + maxInflightMessages: 100 |
| 53 | +``` |
| 54 | +
|
| 55 | +| Name | Description | |
| 56 | +|-------------------------------------------------|-----------------------------------------------------------------------------| |
| 57 | +| endpointType | Type of the endpoint. Values: mqtt, kafka, dataExplorer, dataLakeStorage, fabricOneLake, or localStorage. | |
| 58 | +| authentication.method | Method of authentication. Values: *systemAssignedManagedIdentity*, *x509Credentials*, *userAssignedManagedIdentity*, or *serviceAccountToken*. | |
| 59 | +| authentication.systemAssignedManagedIdentitySettings.audience | Audience of the service to authenticate against. Defaults to `https://eventgrid.azure.net`. | |
| 60 | +| authentication.x509CredentialsSettings.certificateSecretName | Secret name of the X.509 certificate. | |
| 61 | +| authentication.userAssignedManagedIdentitySettings.clientId | Client ID for the user-assigned managed identity. | |
| 62 | +| authentication.userAssignedManagedIdentitySettings.tenantId | Tenant ID. | |
| 63 | +| authentication.userAssignedManagedIdentitySettings.audience | Audience of the service to authenticate against. Defaults to `https://eventgrid.azure.net`. | |
| 64 | +| authentication.serviceAccountTokenSettings.audience | Audience of the service account. Optional, defaults to the broker internal service account audience. | |
| 65 | +| mqttSettings.host | Host of the MQTT broker in the form of \<hostname\>:\<port\>. connects to MQTT broker if omitted.| |
| 66 | +| mqttSettings.tls | TLS configuration. Omit for no TLS or MQTT broker. | |
| 67 | +| mqttSettings.tls.mode | Enable or disable TLS. Values: *enabled* or *disabled*. Defaults to *disabled*. | |
| 68 | +| mqttSettings.tls.trustedCaCertificateConfigMap | Trusted CA certificate config map. No CA certificate if omitted. No CA certificate works for public endpoints like Azure Event Grid.| |
| 69 | +| mqttSettings.sharedSubscription | Shared subscription settings. No shared subscription if omitted. | |
| 70 | +| mqttSettings.sharedSubscription.groupMinimumShareNumber | Number of clients to use for shared subscription. | |
| 71 | +| mqttSettings.sharedSubscription.groupName | Shared subscription group name. | |
| 72 | +| mqttSettings.clientIdPrefix | Client ID prefix. Client ID generated by the dataflow is \<prefix\>-id. No prefix if omitted.| |
| 73 | +| mqttSettings.retain | Whether or not to keep the retain setting. Values: *keep* or *never*. Defaults to *keep*. | |
| 74 | +| mqttSettings.sessionExpirySeconds | Session expiry in seconds. Defaults to *3600*.| |
| 75 | +| mqttSettings.qos | Quality of service. Values: *0* or *1*. Defaults to 1.| |
| 76 | +| mqttSettings.protocol | Use MQTT or websockets. Values: *mqtt* or *websockets*. Defaults to mqtt.| |
| 77 | +| mqttSettings.maxInflightMessages | The max number of messages to keep in flight. For subscribe, it's the receive maximum. For publish, it's the maximum number of messages to send before waiting for an acknowledgment. Default is *100*. | |
| 78 | + |
| 79 | +## Endpoint types for use as sources and destinations |
| 80 | + |
| 81 | +### MQTT |
| 82 | + |
| 83 | +MQTT endpoints are used for MQTT sources and destinations. You can configure the endpoint, TLS, authentication, and other settings. |
| 84 | + |
| 85 | +#### MQTT broker |
| 86 | + |
| 87 | +To configure an MQTT broker endpoint with default settings, you can omit the host field, along with other optional fields. This configuration allows you to connect to the default MQTT broker without any extra configuration in a durable way, no matter how the broker changes. |
| 88 | + |
| 89 | +```yaml |
| 90 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 91 | +kind: DataflowEndpoint |
| 92 | +metadata: |
| 93 | + name: mq |
| 94 | +spec: |
| 95 | + endpointType: mqtt |
| 96 | + authentication: |
| 97 | + method: serviceAccountToken |
| 98 | + serviceAccountTokenSettings: |
| 99 | + audience: aio-mqtt |
| 100 | + mqttSettings: |
| 101 | + {} |
| 102 | +``` |
| 103 | + |
| 104 | +#### Event Grid |
| 105 | + |
| 106 | +To configure an Event Grid MQTT broker endpoint, use managed identity for authentication. |
| 107 | + |
| 108 | +```yaml |
| 109 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 110 | +kind: DataflowEndpoint |
| 111 | +metadata: |
| 112 | + name: eventgrid |
| 113 | +spec: |
| 114 | + endpointType: mqtt |
| 115 | + authentication: |
| 116 | + method: systemAssignedManagedIdentity |
| 117 | + systemAssignedManagedIdentitySettings: |
| 118 | + {} |
| 119 | + mqttSettings: |
| 120 | + host: example.westeurope-1.ts.eventgrid.azure.net:8883 |
| 121 | + tls: |
| 122 | + mode: enabled |
| 123 | +``` |
| 124 | + |
| 125 | +#### Other MQTT brokers |
| 126 | + |
| 127 | +For other MQTT brokers, you can configure the endpoint, TLS, authentication, and other settings as needed. |
| 128 | + |
| 129 | +```yaml |
| 130 | +spec: |
| 131 | + endpointType: mqtt |
| 132 | + authentication: |
| 133 | + ... |
| 134 | + mqttSettings: |
| 135 | + host: example.mqttbroker.com:8883 |
| 136 | + tls: |
| 137 | + mode: enabled |
| 138 | + trustedCaCertificateConfigMap: <your CA certificate config map> |
| 139 | +``` |
| 140 | + |
| 141 | +Under `authentication` , you can configure the authentication method for the MQTT broker. Supported methods include X.509: |
| 142 | + |
| 143 | +```yaml |
| 144 | +authentication: |
| 145 | + method: x509Credentials |
| 146 | + x509CredentialsSettings: |
| 147 | + certificateSecretName: <your x509 secret name> |
| 148 | +``` |
| 149 | + |
| 150 | +System-assigned managed identity: |
| 151 | + |
| 152 | +```yaml |
| 153 | +authentication: |
| 154 | + method: systemAssignedManagedIdentity |
| 155 | + systemAssignedManagedIdentitySettings: |
| 156 | + # Audience of the service to authenticate against |
| 157 | + # Optional; defaults to the audience for Event Grid MQTT Broker |
| 158 | + audience: https://eventgrid.azure.net |
| 159 | +``` |
| 160 | + |
| 161 | +User-assigned managed identity: |
| 162 | + |
| 163 | +```yaml |
| 164 | +authentication: |
| 165 | + method: userAssignedManagedIdentity |
| 166 | + userAssignedManagedIdentitySettings: |
| 167 | + clientId: <id> |
| 168 | + tenantId: <id> |
| 169 | +``` |
| 170 | + |
| 171 | +Kubernetes SAT: |
| 172 | + |
| 173 | +```yaml |
| 174 | +authentication: |
| 175 | + method: serviceAccountToken |
| 176 | + serviceAccountTokenSettings: |
| 177 | + audience: <your service account audience> |
| 178 | +``` |
| 179 | + |
| 180 | +You can also configure shared subscriptions, QoS, MQTT version, client ID prefix, keep alive, clean session, session expiry, retain, and other settings. |
| 181 | + |
| 182 | +```yaml |
| 183 | +spec: |
| 184 | + endpointType: mqtt |
| 185 | + mqttSettings: |
| 186 | + sharedSubscription: |
| 187 | + groupMinimumShareNumber: 3 |
| 188 | + groupName: group1 |
| 189 | + qos: 1 |
| 190 | + mqttVersion: v5 |
| 191 | + clientIdPrefix: dataflow |
| 192 | + keepRetain: enabled |
| 193 | +``` |
| 194 | + |
| 195 | +### Kafka |
| 196 | + |
| 197 | +Kafka endpoints are used for Kafka sources and destinations. You can configure the endpoint, TLS, authentication, and other settings. |
| 198 | + |
| 199 | +#### Azure Event Hubs |
| 200 | + |
| 201 | +To configure an Azure Event Hubs Kafka, the recommended way is to use managed identity for authentication. |
| 202 | + |
| 203 | +```yaml |
| 204 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 205 | +kind: DataflowEndpoint |
| 206 | +metadata: |
| 207 | + name: kafka |
| 208 | +spec: |
| 209 | + endpointType: kafka |
| 210 | + authentication: |
| 211 | + method: systemAssignedManagedIdentity |
| 212 | + systemAssignedManagedIdentitySettings: {} |
| 213 | + kafkaSettings: |
| 214 | + host: <NAMESPACE>.servicebus.windows.net:9093 |
| 215 | + tls: |
| 216 | + mode: enabled |
| 217 | + consumerGroupId: mqConnector |
| 218 | +``` |
| 219 | + |
| 220 | +#### Other Kafka brokers |
| 221 | + |
| 222 | +For example, to configure a Kafka endpoint set the host, TLS, authentication, and other settings as needed. |
| 223 | + |
| 224 | +```yaml |
| 225 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 226 | +kind: DataflowEndpoint |
| 227 | +metadata: |
| 228 | + name: kafka |
| 229 | +spec: |
| 230 | + endpointType: kafka |
| 231 | + authentication: |
| 232 | + ... |
| 233 | + kafkaSettings: |
| 234 | + host: example.kafka.com:9093 |
| 235 | + tls: |
| 236 | + mode: enabled |
| 237 | + consumerGroupId: mqConnector |
| 238 | +``` |
| 239 | + |
| 240 | +Under `authentication` , you can configure the authentication method for the Kafka broker. Supported methods include SASL, X.509, system-assigned managed identity, and user-assigned managed identity. |
| 241 | + |
| 242 | +```yaml |
| 243 | +authentication: |
| 244 | + method: sasl |
| 245 | + saslSettings: |
| 246 | + saslType: PLAIN |
| 247 | + tokenSecretName: <your token secret name> |
| 248 | + # OR |
| 249 | + method: x509Credentials |
| 250 | + x509CredentialsSettings: |
| 251 | + certificateSecretName: <your x509 secret name> |
| 252 | + # OR |
| 253 | + method: systemAssignedManagedIdentity |
| 254 | + systemAssignedManagedIdentitySettings: |
| 255 | + audience: https://<your Event Hubs namespace>.servicebus.windows.net |
| 256 | + # OR |
| 257 | + method: userAssignedManagedIdentity |
| 258 | + userAssignedManagedIdentitySettings: |
| 259 | + clientId: <id> |
| 260 | + tenantId: <id> |
| 261 | +``` |
| 262 | + |
| 263 | +### Configure settings specific to source endpoints |
| 264 | + |
| 265 | +For Kafka endpoints, you can configure settings specific for using the endpoint as a source. These settings have no effect if the endpoint is used as a destination. |
| 266 | + |
| 267 | +```yaml |
| 268 | +spec: |
| 269 | + endpointType: kafka |
| 270 | + kafkaSettings: |
| 271 | + consumerGroupId: fromMq |
| 272 | +``` |
| 273 | + |
| 274 | +### Configure settings specific to destination endpoints |
| 275 | + |
| 276 | +For Kafka endpoints, you can configure settings specific for using the endpoint as a destination. These settings have no effect if the endpoint is used as a source. |
| 277 | + |
| 278 | +```yaml |
| 279 | +spec: |
| 280 | + endpointType: kafka |
| 281 | + kafkaSettings: |
| 282 | + compression: gzip |
| 283 | + batching: |
| 284 | + latencyMs: 100 |
| 285 | + maxBytes: 1000000 |
| 286 | + maxMessages: 1000 |
| 287 | + partitionStrategy: static |
| 288 | + kafkaAcks: all |
| 289 | + copyMqttProperties: enabled |
| 290 | +``` |
| 291 | + |
| 292 | +## Endpoint types for destinations only |
| 293 | + |
| 294 | +### Azure Data Lake (ADLSv2) |
| 295 | + |
| 296 | +Azure Data Lake endpoints are used for Azure Data Lake destinations. You can configure the endpoint, authentication, table, and other settings. |
| 297 | + |
| 298 | +```yaml |
| 299 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 300 | +kind: DataflowEndpoint |
| 301 | +metadata: |
| 302 | + name: adls |
| 303 | +spec: |
| 304 | + endpointType: dataLakeStorage |
| 305 | + authentication: |
| 306 | + method: systemAssignedManagedIdentity |
| 307 | + systemAssignedManagedIdentitySettings: {} |
| 308 | + datalakeStorageSettings: |
| 309 | + host: example.blob.core.windows.net |
| 310 | +``` |
| 311 | + |
| 312 | +Other supported authentication method is SAS tokens or user-assigned managed identity. |
| 313 | + |
| 314 | +```yaml |
| 315 | +spec: |
| 316 | + authentication: |
| 317 | + method: accessToken |
| 318 | + accessTokenSecretRef: <your access token secret name> |
| 319 | + # OR |
| 320 | + userAssignedManagedIdentitySettings: |
| 321 | + clientId: <id> |
| 322 | + tenantId: <id> |
| 323 | +``` |
| 324 | + |
| 325 | +You can also configure batching latency, max bytes, and max messages. |
| 326 | + |
| 327 | +```yaml |
| 328 | +spec: |
| 329 | + endpointType: dataLakeStorage |
| 330 | + datalakeStorageSettings: |
| 331 | + batching: |
| 332 | + latencyMs: 100 |
| 333 | + maxBytes: 1000000 |
| 334 | + maxMessages: 1000 |
| 335 | +``` |
| 336 | + |
| 337 | +### Azure Data Explorer (ADX) |
| 338 | + |
| 339 | +Azure Data Explorer endpoints are used for Azure Data Explorer destinations. You can configure the endpoint, authentication, and other settings. |
| 340 | + |
| 341 | +```yaml |
| 342 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 343 | +kind: DataflowEndpoint |
| 344 | +metadata: |
| 345 | + name: adx |
| 346 | +spec: |
| 347 | + endpointType: dataExplorer |
| 348 | + authentication: |
| 349 | + method: systemAssignedManagedIdentity |
| 350 | + systemAssignedManagedIdentitySettings: {} |
| 351 | + # OR |
| 352 | + method: userAssignedManagedIdentity |
| 353 | + userAssignedManagedIdentitySettings: |
| 354 | + clientId: <id> |
| 355 | + tenantId: <id> |
| 356 | + dataExplorerSettings: |
| 357 | + host: example.westeurope.kusto.windows.net |
| 358 | + database: example-database |
| 359 | +``` |
| 360 | + |
| 361 | +Again, you can configure batching latency, max bytes, and max messages. |
| 362 | + |
| 363 | +```yaml |
| 364 | +spec: |
| 365 | + endpointType: dataExplorer |
| 366 | + dataExplorerSettings: |
| 367 | + batching: |
| 368 | + latencyMs: 100 |
| 369 | + maxBytes: 1000000 |
| 370 | + maxMessages: 1000 |
| 371 | +``` |
| 372 | + |
| 373 | +### Local storage and Edge Storage Accelerator |
| 374 | + |
| 375 | +Use the local storage option to send data to a locally available persistent volume, through which you can upload data via Edge Storage Accelerator (ESA) edge volumes. In this case, the format must be parquet. |
| 376 | + |
| 377 | +```yaml |
| 378 | +apiVersion: connectivity.iotoperations.azure.com/v1beta1 |
| 379 | +kind: DataflowEndpoint |
| 380 | +metadata: |
| 381 | + name: esa |
| 382 | +spec: |
| 383 | + endpointType: localStorage |
| 384 | + localStorageSettings: |
| 385 | + persistentVolumeClaimName: <your PVC name> |
| 386 | +``` |
0 commit comments