Commit abd57a9

Add InfluxDB source.
1 parent 8bb4e40 commit abd57a9

3 files changed: +109 −0 lines changed

articles/iot-operations/process-data/howto-configure-datasource-influxdb.md

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
---
title: Configure a pipeline InfluxDB source stage
description: Configure a pipeline source stage to read data from InfluxDB for processing. The source stage is the first stage in a Data Processor pipeline.
author: dominicbetts
ms.author: dobett
ms.subservice: data-processor
ms.topic: how-to
ms.date: 02/21/2024

#CustomerIntent: As an operator, I want to configure an InfluxDB source stage so that I can read messages from an InfluxDB database for processing.
---

# Configure an InfluxDB source stage

[!INCLUDE [public-preview-note](../includes/public-preview-note.md)]

The source stage is the first and required stage in an Azure IoT Data Processor (preview) pipeline. The source stage gets data into the data processing pipeline and prepares it for further processing. The InfluxDB source stage lets you read data from an [InfluxDB](https://www.influxdata.com/) database at a user-defined interval.
In the source stage, you define:

- Connection details for InfluxDB.
- The interval at which to query the InfluxDB database. The stage waits for a result before it resets the interval timer.
- A partitioning configuration based on your specific data processing requirements.
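The interval behavior in the second item can be sketched as follows. This is an illustration only, not Data Processor code: the wait before the next query starts only after the previous query has returned a result, so slow queries never overlap.

```python
import time

def poll(query, interval_seconds, cycles):
    """Run `query` repeatedly; the interval timer restarts after each result."""
    results = []
    for _ in range(cycles):
        results.append(query())       # block until the result arrives...
        time.sleep(interval_seconds)  # ...then restart the interval timer
    return results

print(poll(lambda: "row", 0.01, 3))  # ['row', 'row', 'row']
```

With a query that takes longer than the interval, each cycle simply stretches; the stage never issues concurrent queries.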
## Prerequisites

- A deployed, functioning instance of Data Processor.
- An operational, reachable InfluxDB database that contains all the necessary raw data.

## Prepare the Influx database

To connect to the InfluxDB database, you need to:

- Create an access token that grants the pipeline read access to the InfluxDB database. To learn more, see [Manage API tokens](https://docs.influxdata.com/influxdb/v2/admin/tokens/).
- Create a secret in Azure Key Vault that contains the access token. To learn more, see [Manage secrets for your Azure IoT Operations deployment](../deploy-iot-ops/howto-manage-secrets.md).
## Configure the InfluxDB source

To configure the InfluxDB source:

- Provide connection details for the InfluxDB database. This configuration includes the server URL and a query to retrieve the data.
- Specify the authentication method. Currently, only access token authentication is supported.

The following table describes the InfluxDB source configuration parameters:
| Field | Type | Description | Required? | Default | Example |
49+
|--|--|--|--|--|--|
50+
| Data query | Object | The InfluxDB query configuration | Yes | | `{"expression": 'from(bucket:"test-bucket")\|> range(start: -1h) \|> filter(fn: (r) => r._measurement == "stat")'}` |
51+
| Server URL | string | URL of the InfluxDB server | Yes | | `https://contoso.com/some/url/path` |
52+
| Query interval | [Duration](concept-configuration-patterns.md#duration) | String representation of the time to wait before the next API call. | Yes | | `24h` |
53+
| Server port | integer | The InfluxDB server port | No | 443 | 443 |
54+
| Organization | string | The organization that holds the bucket to query from | Yes | `test-org` | `test-org` |
55+
| Data format | [Format](concept-supported-formats.md) | The stage applies the format to individual rows retrieved by the query. Only the `json` format is supported. The top-level `path` isn't supported. | Yes | | `{"type": "json"}` |
56+
| Partitioning | [Partitioning](#configure-partitioning) | Partitioning configuration for the source stage. | Required | NA | See [partitioning](#configure-partitioning) |
57+
| Authentication | Authentication type | The authentication method for connecting to the server. Supports `accessToken` type only. | Yes | `{"type": "accessToken"}` | `{"type": "accessToken"}` |
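The **Query interval** column references the Duration format, with values such as `24h` in the table and `5s` in the sample later in this article. A minimal sketch of how such strings decompose into an amount and a unit follows; the exact grammar is defined in the Duration documentation, and the unit set here (`h`, `m`, `s`, `ms`) is an assumption for illustration.

```python
import re

# Assumed unit set; consult the Duration documentation for the full grammar.
UNITS = {"h": 3600, "m": 60, "s": 1, "ms": 0.001}

def parse_duration(value: str):
    """Convert a duration string such as '24h' or '5s' into seconds."""
    match = re.fullmatch(r"(\d+)(ms|[hms])", value)
    if not match:
        raise ValueError(f"not a duration: {value!r}")
    amount, unit = match.groups()
    return int(amount) * UNITS[unit]

print(parse_duration("24h"))  # 86400
print(parse_duration("5s"))   # 5
```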
## Configure partitioning

[!INCLUDE [data-processor-configure-partition](../includes/data-processor-configure-partition.md)]

| Field | Description | Required | Default | Example |
| ----- | ----------- | -------- | ------- | ------- |
| Partition type | The type of partitioning to be used: partition `ID` or partition `Key`. | Yes | `ID` | `ID` |
| Partition expression | The [jq expression](../process-data/concept-jq-expression.md) to use on the incoming message to compute the partition `ID` or partition `Key`. | Yes | `0` | `.payload.header` |
| Number of partitions | The number of partitions in a Data Processor pipeline. | Yes | `1` | `1` |

Data Processor adds metadata to the incoming message. To understand how to correctly specify a partitioning expression that runs on the incoming message, see [Data Processor message structure overview](concept-message-structure.md). By default, the partitioning expression is set to `0` with the **Partition type** as `ID`, which sends all the incoming data to a single partition.

For recommendations and to learn more, see [What is partitioning?](../process-data/concept-partitioning.md).
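To illustrate the difference between the two partition types, here's a hedged sketch, not Data Processor's actual implementation. `expression_result` stands in for whatever the jq expression (for example, `.payload.header`) produced for a message:

```python
import json
from zlib import crc32

def pick_partition(partition_type, expression_result, partition_count=3):
    """Map a message's computed expression result to a partition index."""
    if partition_type == "ID":
        # A partition ID is used directly, modulo the partition count.
        return int(expression_result) % partition_count
    # A partition Key is hashed, so equal keys always share a partition.
    key = json.dumps(expression_result, sort_keys=True)
    return crc32(key.encode()) % partition_count

# The default configuration (type ID, expression "0") sends every
# message to partition 0:
print(pick_partition("ID", "0"))  # 0
```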
## Sample configuration

The following JSON example shows a complete InfluxDB source stage configuration:

```json
{
    "displayName": "influxdb-reader-1",
    "type": "input/influxdbv2@v1",
    "next": ["next-stage-id"],
    "query": {
        "expression": "from(bucket:\"test-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"stat\")"
    },
    "interval": "5s",
    "url": "https://contoso.com/some/url/path",
    "organization": "test-org",
    "partitionCount": 1,
    "partitionStrategy": {
        "type": "id",
        "expression": "0"
    },
    "format": {
        "type": "json"
    },
    "authentication": {
        "type": "accessToken",
        "accessToken": "AKV_ACCESS_TOKEN"
    }
}
```
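A quick way to sanity-check a stage configuration like this one is to assert that the required fields from the parameter table are present. The following sketch mirrors the sample's field names; it isn't an official schema validator:

```python
import json

# Raw string keeps the JSON's \" escapes intact.
sample = r"""
{
  "displayName": "influxdb-reader-1",
  "type": "input/influxdbv2@v1",
  "next": ["next-stage-id"],
  "query": {
    "expression": "from(bucket:\"test-bucket\") |> range(start: -1h) |> filter(fn: (r) => r._measurement == \"stat\")"
  },
  "interval": "5s",
  "url": "https://contoso.com/some/url/path",
  "organization": "test-org",
  "partitionCount": 1,
  "partitionStrategy": {"type": "id", "expression": "0"},
  "format": {"type": "json"},
  "authentication": {"type": "accessToken", "accessToken": "AKV_ACCESS_TOKEN"}
}
"""

# Required fields, taken from the configuration parameter table above.
REQUIRED = ("query", "url", "interval", "organization",
            "partitionCount", "partitionStrategy", "format", "authentication")

config = json.loads(sample)
missing = [field for field in REQUIRED if field not in config]
print(missing)  # []
```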
## Related content

- [Serialization and deserialization formats](concept-supported-formats.md)
- [What is partitioning?](concept-partitioning.md)

articles/iot-operations/process-data/overview-data-processor.md

Lines changed: 1 addition & 0 deletions
@@ -66,6 +66,7 @@ Data Processor pipelines can use the following stages:
  | [Source - MQ](howto-configure-datasource-mq.md) | Retrieves data from an MQTT broker. |
  | [Source - HTTP endpoint](howto-configure-datasource-http.md) | Retrieves data from an HTTP endpoint. |
  | [Source - SQL](howto-configure-datasource-sql.md) | Retrieves data from a Microsoft SQL Server database. |
+ | [Source - InfluxDB](howto-configure-datasource-influxdb.md) | Retrieves data from an InfluxDB database. |
  | [Filter](howto-configure-filter-stage.md) | Filters data coming through the stage. For example, filter out any message with temperature outside of the `50F-150F` range. |
  | [Transform](howto-configure-transform-stage.md) | Normalizes the structure of the data. For example, change the structure from `{"Name": "Temp", "value": 50}` to `{"temp": 50}`. |
  | [LKV](howto-configure-lkv-stage.md) | Stores selected metric values into an LKV store. For example, store only temperature and humidity measurements into LKV, ignore the rest. A subsequent stage can enrich a message with the stored LKV data. |

articles/iot-operations/toc.yml

Lines changed: 2 additions & 0 deletions
@@ -107,6 +107,8 @@ items:
      href: process-data/howto-configure-datasource-http.md
    - name: Configure a pipeline SQL Server source stage
      href: process-data/howto-configure-datasource-sql.md
+   - name: Configure a pipeline InfluxDB source stage
+     href: process-data/howto-configure-datasource-influxdb.md
    - name: Process data
      items:
      - name: Aggregate data in a pipeline
