diff --git a/python/sources/influxdb_3/README.md b/python/sources/influxdb_3/README.md index 1c30af0f4..dfebf1b81 100644 --- a/python/sources/influxdb_3/README.md +++ b/python/sources/influxdb_3/README.md @@ -1,6 +1,25 @@ # InfluxDB v3 -[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically query InfluxDB and publish the results to a Kafka topic. +[This connector](https://github.com/quixio/quix-samples/tree/main/python/sources/influxdb_3) demonstrates how to use the InfluxDB v3 query API to periodically +query InfluxDB3 and publish the results to a Kafka topic. + +To learn more about how it functions, [check out the underlying +Quix Streams `InfluxDB3Source`](https://quix.io/docs/quix-streams/connectors/sources/influxdb3-source.html). + + + +## Using with a Quix Cloud InfluxDB3 Service + +This deployment will work seamlessly with a [Quix Cloud InfluxDB3 service](https://github.com/quixio/quix-samples/tree/main/docker/influxdb_3). + +Simply provide the following arguments when setting up this connector: + +```shell +INFLUXDB_HOST="http://influxdb3:80" +INFLUXDB_ORG="" # required, but ignored +INFLUXDB_TOKEN="" # required, but ignored +``` + ## How to run @@ -13,21 +32,39 @@ Then either: * or click `Customise connector` to inspect or alter the code before deployment. + + ## Environment Variables The connector uses the following environment variables: -- **output**: This is the ouput topic that will receive the stream (Default: `influxdb`, Required: `True`) -- **task_interval**: Interval to run query. Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1mo, 1y (Default: `5m`, Required: `True`) -- **INFLUXDB_HOST**: Host address for the InfluxDB instance. (Default: `eu-central-1-1.aws.cloud2.influxdata.com`, Required: `True`) -- **INFLUXDB_TOKEN**: Authentication token to access InfluxDB. 
(Default: ``, Required: `True`)
-- **INFLUXDB_ORG**: Organization name in InfluxDB. (Default: ``, Required: `False`)
-- **INFLUXDB_DATABASE**: Database name in InfluxDB where data is stored. (Default: ``, Required: `True`)
-- **INFLUXDB_MEASUREMENT_NAME**: The InfluxDB measurement to read data from. If not specified, the name of the output topic will be used (Default: ``, Required: `False`)
+### Required
+
+- `output`: The Kafka topic that will receive the query results.
+- `INFLUXDB_HOST`: Host address for the InfluxDB instance.
+- `INFLUXDB_TOKEN`: Authentication token to access InfluxDB.
+- `INFLUXDB_ORG`: Organization name in InfluxDB.
+- `INFLUXDB_DATABASE`: Database name in InfluxDB where data is stored.
+
+### Optional
+
+- `INFLUXDB_QUERY_MEASUREMENTS`: The measurements to query. If left None, all measurements will be processed.
+- `INFLUXDB_RECORD_TIMESTAMP_COLUMN`: The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time).
+- `INFLUXDB_RECORD_MEASUREMENT_COLUMN`: The column name used for inserting the measurement name, else uses `'_measurement'`.
+- `INFLUXDB_QUERY_SQL`: A custom SQL query to use for retrieving data from InfluxDB, else uses default.
+- `INFLUXDB_QUERY_START_DATE`: The RFC3339-formatted start time for querying InfluxDB, else uses current runtime.
+- `INFLUXDB_QUERY_END_DATE`: The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only.
+- `INFLUXDB_QUERY_TIME_DELTA`: Time interval for batching queries, e.g., `'5m'` for 5 minutes.
+- `INFLUXDB_QUERY_MAX_RETRIES`: Maximum number of retries for querying or producing (with multiplicative backoff).
+- `INFLUXDB_QUERY_DELAY_SECONDS`: Add an optional delay (in seconds) between producing batches.
+- `CONSUMER_GROUP_NAME`: The Kafka consumer group name used by this source application.
+- `BUFFER_SIZE`: The number of records the source buffers before committing them to the output topic. 
+- `BUFFER_TIMEOUT`: The number of seconds the source buffers records before committing them to the output topic.
 
 ## Requirements / Prerequisites
 
-You will need to have an InfluxDB 3.0 instance available and an API authentication token.
+You will need to have an InfluxDB 3.0 instance available and an API authentication token
+(unless otherwise noted).
 
 ## Contribute
diff --git a/python/sources/influxdb_3/dockerfile b/python/sources/influxdb_3/dockerfile
index 692316bbb..752b6e836 100644
--- a/python/sources/influxdb_3/dockerfile
+++ b/python/sources/influxdb_3/dockerfile
@@ -1,5 +1,5 @@
 FROM python:3.12.5-slim-bookworm
-
+ 
 # Set environment variables for non-interactive setup and unbuffered output
 ENV DEBIAN_FRONTEND=noninteractive \
     PYTHONUNBUFFERED=1 \
diff --git a/python/sources/influxdb_3/library.json b/python/sources/influxdb_3/library.json
index a44a34304..8871b850b 100644
--- a/python/sources/influxdb_3/library.json
+++ b/python/sources/influxdb_3/library.json
@@ -17,16 +17,8 @@
       "Name": "output",
       "Type": "EnvironmentVariable",
       "InputType": "OutputTopic",
-      "Description": "This is the Kafka topic that will receive the query results",
-      "DefaultValue": "influxdbv3-data",
-      "Required": true
-    },
-    {
-      "Name": "task_interval",
-      "Type": "EnvironmentVariable",
-      "InputType": "FreeText",
-      "Description": "Interval to run query. 
Must be within the InfluxDB notation; 1s, 1m, 1h, 1d, 1w, 1y", - "DefaultValue": "5m", + "Description": "The Kafka topic that will receive the query results", + "DefaultValue": "influxdbv3-data-source", "Required": true }, { @@ -34,13 +26,13 @@ "Type": "EnvironmentVariable", "InputType": "FreeText", "Description": "Host address for the InfluxDB instance.", - "DefaultValue": "eu-central-1-1.aws.cloud2.influxdata.com", + "DefaultValue": "http://influxdb3:80", "Required": true }, { "Name": "INFLUXDB_TOKEN", "Type": "EnvironmentVariable", - "InputType": "FreeText", + "InputType": "Secret", "Description": "Authentication token to access InfluxDB.", "DefaultValue": "", "Required": true @@ -62,11 +54,97 @@ "Required": true }, { - "Name": "INFLUXDB_MEASUREMENT_NAME", + "Name": "INFLUXDB_QUERY_MEASUREMENTS", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The measurements to query. If left None, all measurements will be processed.", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_KEY_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB record column used for the Kafka message key, else uses the measurement's name.", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_TIMESTAMP_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The InfluxDB record column used for the Kafka timestamp, else uses Kafka default (produce time).", + "Required": false + }, + { + "Name": "INFLUXDB_RECORD_MEASUREMENT_COLUMN", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "The column name used for inserting the measurement name, else uses '_measurement'.", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_SQL", + "Type": "EnvironmentVariable", + "InputType": "FreeText", + "Description": "A custom SQL query to use for retrieving data from InfluxDB, else uses default.", + "Required": false + }, + { + "Name": "INFLUXDB_QUERY_START_DATE", + 
"Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The RFC3339-formatted start time for querying InfluxDB, else uses current runtime.",
+      "Required": false
+    },
+    {
+      "Name": "INFLUXDB_QUERY_END_DATE",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The RFC3339-formatted end time for querying InfluxDB, else runs indefinitely for 1 measurement only.",
+      "Required": false
+    },
+    {
+      "Name": "INFLUXDB_QUERY_TIME_DELTA",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Time interval for batching queries, e.g., '5m' for 5 minutes.",
+      "Required": false
+    },
+    {
+      "Name": "INFLUXDB_QUERY_MAX_RETRIES",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Maximum number of retries for querying or producing (with multiplicative backoff).",
+      "Required": false
+    },
+    {
+      "Name": "INFLUXDB_QUERY_DELAY_SECONDS",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "Add an optional delay (in seconds) between producing batches.",
+      "Required": false
+    },
+    {
+      "Name": "CONSUMER_GROUP_NAME",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The Kafka consumer group name used by this source application.",
+      "DefaultValue": "influxdb-data-source",
+      "Required": true
+    },
+    {
+      "Name": "BUFFER_SIZE",
+      "Type": "EnvironmentVariable",
+      "InputType": "FreeText",
+      "Description": "The number of records the source buffers before committing them to the output topic.",
+      "DefaultValue": "1000",
+      "Required": false
+    },
+    {
+      "Name": "BUFFER_TIMEOUT",
       "Type": "EnvironmentVariable",
       "InputType": "FreeText",
-      "Description": "The InfluxDB measurement to read data from. 
If not specified, the name of the output topic will be used", - "DefaultValue": "", + "Description": "The number of seconds that sink holds before flush data to the InfluxDb.", + "DefaultValue": "1", "Required": false } ], diff --git a/python/sources/influxdb_3/main.py b/python/sources/influxdb_3/main.py index b806b41b5..9ffd1731a 100644 --- a/python/sources/influxdb_3/main.py +++ b/python/sources/influxdb_3/main.py @@ -1,138 +1,80 @@ -# Import utility modules +from dateutil import parser import os -import random -import json -import logging -from time import sleep +import inspect +from typing import Any -# import vendor-specific libraries from quixstreams import Application -from quixstreams.models.serializers.quix import JSONSerializer, SerializationContext -import influxdb_client_3 as InfluxDBClient3 +from quixstreams.sources.community.influxdb3 import InfluxDB3Source # for local dev, load env vars from a .env file from dotenv import load_dotenv load_dotenv() -# Initialize logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Create a Quix Application -app = Application(use_changelog_topics=False) - -# Define a serializer for messages, using JSON Serializer for ease -serializer = JSONSerializer() - -# Define the topic using the "output" environment variable -topic_name = os.environ["output"] -topic = app.topic(topic_name) - -influxdb3_client = InfluxDBClient3.InfluxDBClient3(token=os.environ["INFLUXDB_TOKEN"], - host=os.environ["INFLUXDB_HOST"], - org=os.environ["INFLUXDB_ORG"], - database=os.environ["INFLUXDB_DATABASE"]) - -measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", os.environ["output"]) -interval = os.environ.get("task_interval", "5m") - -# Global variable to control the main loop's execution -run = True - -# InfluxDB interval-to-seconds conversion dictionary -UNIT_SECONDS = { - "s": 1, - "m": 60, - "h": 3600, - "d": 86400, - "w": 604800, - "y": 31536000, -} - -# Helper function to convert time 
intervals (like 1h, 2m) into seconds for easier processing.
-# This function is useful for determining the frequency of certain operations.
-def interval_to_seconds(interval: str) -> int:
-    try:
-        return int(interval[:-1]) * UNIT_SECONDS[interval[-1]]
-    except ValueError as e:
-        if "invalid literal" in str(e):
-            raise ValueError(
-                "interval format is {int}{unit} i.e. '10h'; "
-                f"valid units: {list(UNIT_SECONDS.keys())}")
-    except KeyError:
-        raise ValueError(
-            f"Unknown interval unit: {interval[-1]}; "
-            f"valid units: {list(UNIT_SECONDS.keys())}")
-
-interval_seconds = interval_to_seconds(interval)
-
-# Function to fetch data from InfluxDB and send it to Quix
-# It runs in a continuous loop, periodically fetching data based on the interval.
-def get_data():
-    # Run in a loop until the main thread is terminated
-    while run:
-        try:
-            query_definition = f'SELECT * FROM "{measurement_name}" WHERE time >= now() - {interval}'
-            print(f"Sending query {query_definition}")
-            # Query InfluxDB 3.0 using influxql or sql
-            table = influxdb3_client.query(
-                query=query_definition,
-                mode="pandas",
-                language="influxql")
-
-            table = table.drop(columns=["iox::measurement"])
-
-            # If there are rows to write to the stream at this time
-            if not table.empty:
-                # Convert to JSON for JSON-to-bytes serializer
-                json_result = table.to_json(orient='records', date_format='iso')
-                yield json_result
-                print(f"Query retrieved {table.size} records")
-            else:
-                print("No new data to publish.")
-
-            # Wait for the next interval
-            sleep(interval_seconds)
-
-        except Exception as e:
-            print("query failed", flush=True)
-            print(f"error: {e}", flush=True)
-            sleep(1)
-
-def main():
+
+def get_kwargs_defaults() -> dict[str, Any]:
     """
-    Read data from the Query and publish it to Kafka
+    Gets the default kwargs of InfluxDB3Source so they can be passed in instances
+    where the user did not provide an environment variable.
""" + params = inspect.signature(InfluxDB3Source.__init__).parameters.values() + return { + param.name: param.default for param in params if + param.default is not inspect.Parameter.empty + } + + +def _measurements(): + if measurements := os.getenv("INFLUXDB_QUERY_MEASUREMENTS"): + return measurements.split(',') + + +def _key_setter(): + if column := os.getenv("INFLUXDB_RECORD_KEY_COLUMN"): + return lambda record: record[column] + + +def _timestamp_setter(): + if column := os.getenv("INFLUXDB_RECORD_TIMESTAMP_COLUMN"): + return lambda record: record[column] + + +def _start_date(): + if date := os.getenv("INFLUXDB_QUERY_START_DATE"): + return parser.parse(date) + + +def _end_date(): + if date := os.getenv("INFLUXDB_QUERY_END_DATE"): + return parser.parse(date) + + +kwargs_defaults = get_kwargs_defaults() +influxdb3_source = InfluxDB3Source( + host=os.environ["INFLUXDB_HOST"], + token=os.environ["INFLUXDB_TOKEN"], + organization_id=os.environ["INFLUXDB_ORG"], + database=os.environ["INFLUXDB_DATABASE"], + measurements=_measurements() or kwargs_defaults["measurements"], + key_setter=_key_setter() or kwargs_defaults["key_setter"], + timestamp_setter=_timestamp_setter() or kwargs_defaults["timestamp_setter"], + measurement_column_name=os.getenv("INFLUXDB_RECORD_MEASUREMENT_COLUMN") or kwargs_defaults["measurement_column_name"], + sql_query=os.getenv("INFLUXDB_QUERY_SQL") or kwargs_defaults["sql_query"], + start_date=_start_date() or kwargs_defaults["start_date"], + end_date=_end_date() or kwargs_defaults["end_date"], + time_delta=os.getenv("INFLUXDB_QUERY_TIME_DELTA") or kwargs_defaults["time_delta"], + max_retries=int(retries) if (retries := os.getenv("INFLUXDB_QUERY_MAX_RETRIES", '')) != '' else kwargs_defaults["max_retries"], + delay=float(delay) if (delay := os.getenv("INFLUXDB_QUERY_DELAY_SECONDS", '')) != '' else kwargs_defaults["delay"], +) + +# Create a Quix platform-specific application instead +app = Application( + 
consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-source"),
+    auto_offset_reset="earliest",
+    commit_every=int(os.environ.get("BUFFER_SIZE", "1000")),
+    commit_interval=float(os.environ.get("BUFFER_TIMEOUT", "1")),
+)
+sdf = app.add_source(influxdb3_source, topic=app.topic(name=os.environ["output"]))
+
 
-    # Create a pre-configured Producer object.
-    # Producer is already setup to use Quix brokers.
-    # It will also ensure that the topics exist before producing to them if
-    # Application.Quix is initialized with "auto_create_topics=True".
-
-    with app.get_producer() as producer:
-        for res in get_data():
-            # Parse the JSON string into a Python object
-            records = json.loads(res)
-            for index, obj in enumerate(records):
-                # Generate a unique message_key for each row
-                # Change to a tag name if you want to aggregate data by a specific tag such as "SensorID"—e.g. message_key = obj['SensorID']
-                message_key = f"INFLUX_DATA_{str(random.randint(1, 100)).zfill(3)}_{index}"
-                logger.info(f"Produced message with key:{message_key}, value:{obj}")
-
-                # Serialize row value to bytes
-                serialized_value = serializer(
-                    value=obj, ctx=SerializationContext(topic=topic.name)
-                )
-
-                # publish the data to the topic
-                producer.produce(
-                    topic=topic.name,
-                    key=message_key,
-                    value=serialized_value,
-                )
-
-if __name__ == "__main__":
-    try:
-        main()
-    except KeyboardInterrupt:
-        print("Exiting.")
\ No newline at end of file
+if __name__ == '__main__':
+    app.run()
diff --git a/python/sources/influxdb_3/requirements.txt b/python/sources/influxdb_3/requirements.txt
index 9727f52c9..db213e73e 100644
--- a/python/sources/influxdb_3/requirements.txt
+++ b/python/sources/influxdb_3/requirements.txt
@@ -1,3 +1,3 @@
-quixstreams==2.9.0
-influxdb3-python==0.3.6
-python-dotenv
\ No newline at end of file
+quixstreams[influxdb3]==3.17.0
+python-dotenv
+python-dateutil
\ No newline at end of file