Skip to content

Commit 07a065a

Browse files
authored
Add wis2 compliant mqtt publishing (#223)
* Add wis2 pydantic model * Add wis2 pydantic model * Add option for WIS2 complient mqtt message * Add more ENV variables * Add content to wis2 mqtt message * Fix error when WIS2 mqtt was not configured * Add mosquitto mqtt to compose * Add new unit mapping * Add new unit mapping
1 parent baf6696 commit 07a065a

File tree

12 files changed

+283
-26
lines changed

12 files changed

+283
-26
lines changed

docker-compose.yml

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ services:
99
volumes:
1010
# - ts-data:/home/postgres/pgdata/data # for timescale image
1111
- ts-data:/var/lib/postgresql # for postgres image
12-
- ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration
12+
- ./datastore/database/extra.conf:/etc/conf_settings/extra.conf:ro # Extra Postgres configuration
1313
- ./datastore/database/healthcheck_postgis_uptime.sh:/healthcheck_postgis_uptime.sh:ro # for the healthcheck
1414
environment:
1515
- EXTRA_CONF_DIR=/etc/conf_settings
@@ -105,10 +105,17 @@ services:
105105
environment:
106106
- DSHOST=${DSHOST:-store}
107107
- DSPORT=${DSPORT:-50050}
108-
- MQTT_HOST=${MQTT_HOST}
108+
- MQTT_HOST=${MQTT_HOST:-mqtt}
109109
- MQTT_USERNAME=${MQTT_USERNAME}
110110
- MQTT_PASSWORD=${MQTT_PASSWORD}
111-
- MQTT_TLS=True
111+
- MQTT_PORT=${MQTT_PORT:-1883}
112+
- MQTT_TLS=${MQTT_TLS:-False}
113+
- WIS2_TOPIC=${WIS2_TOPIC:-'TEMP_TOPIC'}
114+
- WIS2_MQTT_HOST=${MQTT_HOST:-mqtt_wis2}
115+
- WIS2_MQTT_USERNAME=${MQTT_USERNAME:-}
116+
- WIS2_MQTT_PASSWORD=${MQTT_PASSWORD:-}
117+
- WIS2_MQTT_TLS=${MQTT_TLS:-False}
118+
- WIS2_MQTT_PORT=${MQTT_PORT:-1884}
112119
- INGEST_LOGLEVEL
113120
- GUNICORN_CMD_ARGS=--bind 0.0.0.0:8001 --workers=4 --access-logfile -
114121
depends_on:
@@ -123,6 +130,23 @@ services:
123130
volumes:
124131
- ./ingest/test/output:/app/output
125132

133+
mqtt:
134+
image: eclipse-mosquitto
135+
restart: unless-stopped
136+
ports:
137+
- "1883:1883"
138+
volumes:
139+
- ./mosquitto:/etc/mosquitto
140+
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf
141+
142+
mqtt_wis2:
143+
image: eclipse-mosquitto
144+
restart: unless-stopped
145+
ports:
146+
- "1884:1884"
147+
volumes:
148+
- ./mosquitto:/etc/mosquitto
149+
- ./mosquitto/mosquitto_wis2.conf:/mosquitto/config/mosquitto.conf
126150
client:
127151
profiles: ["test"]
128152
build:
@@ -208,7 +232,12 @@ services:
208232
depends_on:
209233
db:
210234
condition: service_healthy
211-
command: ["--collector.stat_statements", "--collector.stat_user_tables", "--collector.stat_activity_autovacuum"]
235+
command:
236+
[
237+
"--collector.stat_statements",
238+
"--collector.stat_user_tables",
239+
"--collector.stat_activity_autovacuum",
240+
]
212241

213242
grafana:
214243
profiles: ["monitoring"]

ingest/README.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# e-soh-event-queue
2+
23
## Enviornment variables
34

45
| Variable | Default Value | Description |
@@ -9,17 +10,24 @@
910
| `MQTT_USERNAME` | | Username for authentication with the MQTT broker. |
1011
| `MQTT_PASSWORD` | | Password for authentication with the MQTT broker. |
1112
| `MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.|
13+
| `WIS2_MQTT_HOST` | | Host address for the MQTT broker. |
14+
| `WIS2_MQTT_USERNAME` | | Username for authentication with the MQTT broker. |
15+
| `WIS2_MQTT_PASSWORD` | | Password for authentication with the MQTT broker. |
16+
| `WIS2_MQTT_TLS` | `True` | Whether to use TLS (True/False) for the MQTT connection. Defaults to `True`.|
17+
| `WIS2_METADATA_RECORD_ID` | | The ID of the WIS2 global metadata ID for this data service.|
18+
| `WIS2_TOPIC` | | The WIS2 MQTT topic the messages should be published under. |
19+
| `EDR_API_URL` | | If the EDR API is hosted on a different URL then the ingest API, set this to the correct URL for the EDR API.|
1220
| `PROMETHEUS_MULTIPROC_DIR` | `/tmp/metrics` | Directory for Prometheus multiprocess mode metrics. Defaults to `/tmp/metrics`. |
1321
| `INGEST_LOGLEVEL` | | Logging level for the ingestion process. |
1422
| `GUNICORN_CMD_ARGS` | | Command-line arguments for configuring Gunicorn, a Python WSGI HTTP Server. |
1523
| `FASTAPI_ROOT_PATH` | | If this api is behind proxy, this need to be set to the root path |
1624

17-
1825
## Dev install
19-
To install in dev mode run `pip install --editable .` from the top level of this repository.
2026

27+
To install in dev mode run `pip install --editable .` from the top level of this repository.
2128

2229
## C++ dependencies
30+
2331
| Module | Version |
2432
| --------------- | ------- |
2533
| libeccodes-data | 2.24.2 |
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import os
2+
import json
3+
4+
5+
from api.model import Geometry
6+
from api.model import Link
7+
from api.wis2_model import Wis2MessageSchema
8+
from api.wis2_model import Properties
9+
from api.wis2_model import Content
10+
11+
12+
def get_api_timeseries_query(location_id: str, baseURL: str) -> str:
13+
query = "/collections/observations/locations/" + location_id
14+
baseURL = os.getenv("EDR_API_URL", baseURL)
15+
return baseURL + query
16+
17+
18+
def generate_wis2_topic() -> str:
19+
"""This function will generate the WIS2 complient toipc name"""
20+
wis2_topic = os.getenv("WIS2_TOPIC")
21+
if not wis2_topic:
22+
raise ValueError("WIS2_TOPIC env variable not set. Aborting publish to wis2")
23+
return wis2_topic
24+
25+
26+
def generate_wis2_payload(message: dict, request_url: str) -> Wis2MessageSchema:
27+
"""
28+
This function will generate the WIS2 complient payload based on the JSON schema for ESOH
29+
"""
30+
wis2_payload = Wis2MessageSchema(
31+
type="Feature",
32+
id=message["id"],
33+
version="v04",
34+
geometry=Geometry(**message["geometry"]),
35+
properties=Properties(
36+
producer=message["properties"]["naming_authority"],
37+
data_id=message["properties"]["data_id"],
38+
metadata_id=os.getenv(
39+
"WIS2_METADATA_RECORD_ID", None
40+
), # Need to figure out how we generate this? Is it staic or dynamic?
41+
datetime=message["properties"]["datetime"],
42+
pubtime=message["properties"]["pubtime"],
43+
content=Content(
44+
value=json.dumps(
45+
{
46+
"type": "Feature",
47+
"geometry": message["geometry"],
48+
"properties": {
49+
"observation": message["properties"]["content"]["value"],
50+
"CF_standard_name": message["properties"]["content"]["standard_name"],
51+
"unit": message["properties"]["content"]["unit"],
52+
},
53+
},
54+
separators=(",", ":"),
55+
),
56+
unit=message["properties"]["content"]["unit"],
57+
encoding="utf-8",
58+
),
59+
),
60+
links=(
61+
[
62+
Link(
63+
href=get_api_timeseries_query(message["properties"]["platform"], request_url),
64+
rel="canonical",
65+
type="application/prs.coverage+json",
66+
)
67+
]
68+
)
69+
+ (lambda x: x if x else [])(message["links"]),
70+
)
71+
72+
return wis2_payload

ingest/api/ingest.py

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import logging
22
from typing import Union
33

4+
from api.generate_wis2_payload import generate_wis2_payload
5+
from api.generate_wis2_payload import generate_wis2_topic
46
import grpc
57
import json
68

@@ -28,26 +30,38 @@ def __init__(
2830
self,
2931
mqtt_conf: dict,
3032
uuid_prefix: str,
33+
mqtt_WIS2_conf: dict | None = None,
3134
):
3235
self.uuid_prefix = uuid_prefix
3336
self.client = None
34-
if mqtt_conf["host"]:
35-
try:
37+
self.WIS2_client = None
38+
try:
39+
if mqtt_conf["host"]:
3640
self.client = connect_mqtt(mqtt_conf)
37-
except Exception as e:
38-
logger.error("Failed to establish connection to mqtt, " + "\n" + str(e))
39-
raise e
41+
if mqtt_WIS2_conf:
42+
self.WIS2_client = connect_mqtt(mqtt_WIS2_conf)
43+
except Exception as e:
44+
logger.error(
45+
"Failed to establish connection to mqtt, "
46+
+ "\n"
47+
+ str(e)
48+
+ "\n"
49+
+ json.dumps(mqtt_conf)
50+
+ "\n"
51+
+ json.dumps(mqtt_WIS2_conf)
52+
)
53+
raise e
4054

41-
async def ingest(self, message: Union[str, object]):
55+
async def ingest(self, message: Union[str, object], publishWIS2: bool, baseURL: str):
4256
"""
4357
This method will interpret call all methods for deciding input type, build the mqtt messages, and
4458
publish them.
4559
4660
"""
4761
messages = build_messages(message, self.uuid_prefix)
48-
await self.publish_messages(messages)
62+
await self.publish_messages(messages, publishWIS2, baseURL)
4963

50-
async def publish_messages(self, messages: list):
64+
async def publish_messages(self, messages: list, publishWIS2: bool, baseURL: str):
5165
"""
5266
This method accepts a list of json strings ready to be ingest to datastore
5367
and published to the mqtt topic.
@@ -60,7 +74,7 @@ async def publish_messages(self, messages: list):
6074
logger.error("Failed to reach datastore, " + "\n" + str(e))
6175
raise HTTPException(status_code=500, detail="API could not reach datastore")
6276

63-
if self.client is not None:
77+
if self.client or self.WIS2_client:
6478
for msg in messages:
6579
topic = (
6680
msg["properties"]["naming_authority"]
@@ -76,11 +90,27 @@ async def publish_messages(self, messages: list):
7690
msg["properties"]["level"] = level_string
7791
msg["properties"]["period"] = period_iso
7892
try:
79-
send_message(topic, json.dumps(msg), self.client)
80-
logger.debug("Succesfully published to mqtt")
93+
if self.client:
94+
send_message(topic, json.dumps(msg), self.client)
95+
logger.debug("Succesfully published to mqtt")
8196
except Exception as e:
8297
logger.error("Failed to publish to mqtt, " + str(e))
8398
raise HTTPException(
8499
status_code=500,
85100
detail="Data ingested to datastore. But unable to publish to mqtt",
86101
)
102+
try:
103+
if publishWIS2 and self.WIS2_client:
104+
send_message(
105+
generate_wis2_topic(),
106+
generate_wis2_payload(msg, baseURL).model_dump(exclude_unset=True, exclude_none=True),
107+
self.WIS2_client,
108+
)
109+
logger.debug("Succesfully published to mqtt")
110+
except Exception as e:
111+
logger.error("Failed to publish to WIS2 mqtt, " + str(e))
112+
print(e)
113+
raise HTTPException(
114+
status_code=500,
115+
detail="Data ingested to datastore. But unable to publish to WIS2 mqtt",
116+
)

ingest/api/main.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import logging
22
import os
33

4+
from api.utilities import get_base_url_from_request
45
from fastapi import FastAPI
56
from fastapi import UploadFile
7+
from fastapi import Request
68
from pydantic import BaseModel
79

810
from typing import List
@@ -37,23 +39,46 @@ class Response(BaseModel):
3739
"port": int(os.getenv("MQTT_PORT", 8883)),
3840
}
3941

40-
ingester = IngestToPipeline(mqtt_conf=mqtt_configuration, uuid_prefix="uuid")
42+
mqtt_wis2_configuration = {
43+
"host": os.getenv("WIS2_MQTT_HOST", None),
44+
"username": os.getenv("WIS2_MQTT_USERNAME", None),
45+
"password": os.getenv("WIS2_MQTT_PASSWORD", None),
46+
"enable_tls": os.getenv("WIS2_MQTT_TLS", "False").lower() in ("true", "1", "t"),
47+
"port": int(os.getenv("WIS2_MQTT_PORT", 8883)),
48+
}
49+
50+
if not all(
51+
[
52+
mqtt_wis2_configuration["host"],
53+
]
54+
):
55+
mqtt_wis2_configuration = None
56+
57+
ingester = IngestToPipeline(
58+
mqtt_conf=mqtt_configuration,
59+
uuid_prefix="uuid",
60+
mqtt_WIS2_conf=mqtt_wis2_configuration,
61+
)
4162

4263
app = FastAPI(root_path=os.getenv("FASTAPI_ROOT_PATH", ""))
4364
add_metrics(app)
4465

4566

4667
@app.post("/bufr")
47-
async def upload_bufr_file(files: UploadFile):
68+
async def upload_bufr_file(http_request: Request, files: UploadFile, publishWIS2: bool = False):
4869
contents = await files.read()
4970
json_data = build_json_payload(contents)
50-
await post_json(json_data)
71+
await post_json(http_request, json_data, publishWIS2)
5172

5273
return Response(status_message="Successfully ingested", status_code=200)
5374

5475

5576
@app.post("/json")
56-
async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Response:
77+
async def post_json(
78+
http_request: Request,
79+
request: JsonMessageSchema | List[JsonMessageSchema],
80+
publishWIS2: bool = False,
81+
) -> Response:
5782
status = "Successfully ingested"
5883
if isinstance(request, list):
5984
hash_list = [i.__hash__() for i in request]
@@ -65,6 +90,6 @@ async def post_json(request: JsonMessageSchema | List[JsonMessageSchema]) -> Res
6590
else:
6691
json_data = [request.model_dump(exclude_none=True)]
6792

68-
await ingester.ingest(json_data)
93+
await ingester.ingest(json_data, publishWIS2, get_base_url_from_request(http_request))
6994

7095
return Response(status_message=status, status_code=200)

ingest/api/model.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ class Geometry(BaseModel):
4040

4141
class Integrity(BaseModel):
4242
method: Literal["sha256", "sha384", "sha512", "sha3-256", "sha3-384", "sha3-512"] = Field(
43-
..., description="A specific set of methods for calculating the checksum algorithms"
43+
...,
44+
description="A specific set of methods for calculating the checksum algorithms",
4445
)
4546
value: str = Field(..., description="Checksum value.")
4647

@@ -290,7 +291,8 @@ class Properties(BaseModel):
290291
)
291292
content: Content = Field(..., description="Actual data content")
292293
integrity: Optional[Integrity] = Field(
293-
None, description="Specifies a checksum to be applied to the data to ensure that the download is accurate."
294+
None,
295+
description="Specifies a checksum to be applied to the data to ensure that the download is accurate.",
294296
)
295297

296298
@field_validator("period", mode="before")
@@ -320,7 +322,6 @@ def check_datetime_iso(self) -> "Properties":
320322

321323
@model_validator(mode="after")
322324
def validate_wigos_id(self):
323-
324325
blocks = self.platform.split("-")
325326
assert len(blocks) == 4, f"Not enough blocks in input 'platform', '{self.platform}'"
326327
for i in blocks[:-1]:

ingest/api/send_mqtt.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88

99
def connect_mqtt(mqtt_conf: dict):
10-
1110
def on_connect(client, userdata, flags, rc, properties=None):
1211
if rc == 0:
1312
logger.info("Connected to MQTT Broker!")

ingest/api/std_name_units.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,14 @@
478478
],
479479
"symbol": "m/s"
480480
},
481+
"wind_gust_from_direction": {
482+
"unit": "degrees",
483+
"symbol": "degrees",
484+
"alias": [
485+
"deg",
486+
"degree"
487+
]
488+
},
481489
"air_pressure": {
482490
"unit": "hPa",
483491
"symbol": "hPa",

0 commit comments

Comments
 (0)