|
1 | 1 | # import Utility modules
|
2 | 2 | import os
|
3 |
| -import logging |
4 | 3 |
|
5 | 4 | # import vendor-specific modules
|
6 | 5 | from quixstreams import Application
|
|
10 | 9 | from dotenv import load_dotenv
|
11 | 10 | load_dotenv()
|
12 | 11 |
|
13 |
| -logging.basicConfig(level=logging.INFO) |
14 |
| -logger = logging.getLogger(__name__) |
15 | 12 |
|
16 |
| -# read the consumer group from config |
17 |
| -consumer_group_name = os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer") |
18 |
| - |
19 |
| -# read the timestamp column from config |
20 |
| -timestamp_column = os.environ.get("TIMESTAMP_COLUMN") if os.environ.get("TIMESTAMP_COLUMN") else None |
| 13 | +tag_keys = keys.split(",") if (keys := os.environ.get("INFLUXDB_TAG_KEYS")) else [] |
| 14 | +field_keys = keys.split(",") if (keys := os.environ.get("INFLUXDB_FIELD_KEYS")) else [] |
| 15 | +measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", "measurement1") |
| 16 | +time_setter = col if (col := os.environ.get("TIMESTAMP_COLUMN")) else None |
21 | 17 |
|
22 |
| -buffer_size = int(os.environ.get("BUFFER_SIZE", "1000")) |
| 18 | +influxdb_v3_sink = InfluxDB3Sink( |
| 19 | + token=os.environ["INFLUXDB_TOKEN"], |
| 20 | + host=os.environ["INFLUXDB_HOST"], |
| 21 | + organization_id=os.environ["INFLUXDB_ORG"], |
| 22 | + tags_keys=tag_keys, |
| 23 | + fields_keys=field_keys, |
| 24 | + time_setter=time_setter, |
| 25 | + database=os.environ["INFLUXDB_DATABASE"], |
| 26 | + measurement=measurement_name, |
| 27 | +) |
23 | 28 |
|
24 |
| -buffer_delay = float(os.environ.get("BUFFER_DELAY", "1")) |
25 | 29 |
|
26 |
| -# Create a Quix platform-specific application instead |
27 | 30 | app = Application(
|
28 |
| - consumer_group=consumer_group_name, |
| 31 | + consumer_group=os.environ.get("CONSUMER_GROUP_NAME", "influxdb-data-writer"), |
29 | 32 | auto_offset_reset="earliest",
|
30 |
| - commit_every=buffer_size, |
31 |
| - commit_interval=buffer_delay) |
32 |
| - |
| 33 | + commit_every=int(os.environ.get("BUFFER_SIZE", "1000")), |
| 34 | + commit_interval=float(os.environ.get("BUFFER_DELAY", "1")), |
| 35 | +) |
33 | 36 | input_topic = app.topic(os.environ["input"])
|
34 | 37 |
|
35 |
| -# Read the environment variable and convert it to a dictionary |
36 |
| -tag_keys = os.environ.get("INFLUXDB_TAG_KEYS", "").split(",") if os.environ.get("INFLUXDB_TAG_KEYS") else [] |
37 |
| -field_keys = os.environ.get("INFLUXDB_FIELD_KEYS", "").split(",")if os.environ.get("INFLUXDB_FIELD_KEYS") else [] |
38 |
| -measurement_name = os.environ.get("INFLUXDB_MEASUREMENT_NAME", "measurement1") |
39 |
| - |
40 |
| -influxdb_v3_sink = InfluxDB3Sink( |
41 |
| - token=os.environ["INFLUXDB_TOKEN"], |
42 |
| - host=os.environ["INFLUXDB_HOST"], |
43 |
| - organization_id=os.environ["INFLUXDB_ORG"], |
44 |
| - tags_keys=tag_keys, |
45 |
| - fields_keys=field_keys, |
46 |
| - time_key=timestamp_column, |
47 |
| - database=os.environ["INFLUXDB_DATABASE"], |
48 |
| - measurement=measurement_name) |
49 |
| - |
50 | 38 | sdf = app.dataframe(input_topic)
|
51 |
| - |
52 |
| -#sdf.print() |
53 | 39 | sdf.sink(influxdb_v3_sink)
|
54 | 40 |
|
| 41 | + |
55 | 42 | if __name__ == "__main__":
|
56 |
| - logger.info("Starting application") |
57 | 43 | app.run()
|
58 |
| - |
59 |
| - |
0 commit comments