diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 31c588a..f7b6c63 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -40,7 +40,7 @@ repos: hooks: - id: flake8 additional_dependencies: [flake8-comprehensions, flake8-docstrings, flake8-import-order, pep8-naming, pydocstyle] - args: ["--ignore", "E501,D103,D100,W503,E203", "--import-order-style", "google"] + args: ["--ignore", "E501,D103,D100,W503,E203,E402", "--import-order-style", "google"] - repo: https://github.com/jumanjihouse/pre-commit-hooks rev: 3.0.0 diff --git a/samples/python/command-response/README.md b/samples/python/command-response/README.md index 1cc0929..f524a5d 100644 --- a/samples/python/command-response/README.md +++ b/samples/python/command-response/README.md @@ -7,10 +7,6 @@ We are using the [Eclipse Paho](http://eclipse.org/paho/) MQTT Python client lib Note that the OCI IoT Platform only supports MQTT Secure (MQTTS) on port 8883. -We only illustrate a connection using password-based authentication with MQTT Secure (MQTTS). -This example can be easily modified to use certificate-based authentication and/or WebSocket -Secure (WSS) -- see the respective "publish" examples in this repository. - ## Scenario The `command-response.py` script will establish an MQTT connection with the OCI IoT @@ -28,10 +24,38 @@ Install the Python dependencies pip install -r requirements.txt ``` -The script assumes a Digital Twin with unstructured telemetry has already been created. +The script assumes a Digital Twin has already been created. ## Configure and run the script +### Telemetry payload + +- For unstructured telemetry, the content can be arbitrary. +- For structured telemetry, it must match the Model/Adapter. +- For structured telemetry in the default format, if a "time" property is specified, + it must be an epoch time in microseconds and will override the "time_observed" field + in the database. +- The same applies to structured telemetry in a custom format, but the mapping must be + defined in the adapter. + +The sample telemetry used by the scripts is compatible with all three Digital Twins +created in the "Manage Digital Twins" section of this repository: + +```json +telemetry_data = { + "time": 1757512025226854, + "sht_temperature": 23.8, + "qmp_temperature": 24.4, + "humidity": 56.1, + "pressure": 1012.2, + "count": 1, +} +``` + +The `time` field is optional, this can be specified in the configuration file (see below) + +### Configuration file + Copy `config.distr.py` to `config.py` and set the following variables: - `iot_device_host`: The Device Host for your IoT Domain. @@ -53,9 +77,16 @@ Copy `config.distr.py` to `config.py` and set the following variables: here. See the [proxy_set](https://eclipse.dev/paho/files/paho.mqtt.python/html/client.html#paho.mqtt.client.Client.proxy_set) documentation for more details. -- `username`: The "externalKey" property of your Digital Twin. -- `password`: The Digital Twin password; that is, the content of the vault secret - corresponding to the authId property of your Digital Twin. +- `auth_type`: Authentication type: `basic` or `cert`. +- For basic authentication: + - `username`: The "externalKey" property of your Digital Twin. + - `password`: The Digital Twin password; that is, the content of the vault secret + corresponding to the authId property of your Digital Twin. +- For certificate authentication: + - Set the path to your client certificate and key in the `client_cert` and `client_key` + variables. + - Keep in mind that the `authId` property of your Digital Twin must match the + Common Name (CN) of the certificate. Run the script: @@ -101,3 +132,6 @@ Waiting 2 seconds to process possible /cmd messages... Terminated $ ``` + +You can check the status of the message delivery and response in the `RAW_COMMAND_DATA` +database view. diff --git a/samples/python/command-response/command-response.py b/samples/python/command-response/command-response.py index fdb6243..86bd4d0 100755 --- a/samples/python/command-response/command-response.py +++ b/samples/python/command-response/command-response.py @@ -11,11 +11,17 @@ """ import json +import os import sys import threading import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.client as mqtt MQTT_PORT = 8883 @@ -26,19 +32,6 @@ def current_epoch_microseconds(): return int(time.time() * 1000 * 1000) -# Telemetry data example. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - -shutdown_event = threading.Event() - - # Callback for MQTT connection event. def on_connect(client, userdata, flags, reason_code, properties=None): print(f"Connected with result code {reason_code}") @@ -50,6 +43,7 @@ def on_connect(client, userdata, flags, reason_code, properties=None): def on_message(client, userdata, message, properties=None): topic = message.topic payload = message.payload.decode() + state = userdata if topic.endswith("/cmd"): print(f"Received command on {topic}: {payload}") # Command handling logic goes here. @@ -62,16 +56,20 @@ def on_message(client, userdata, message, properties=None): if cmd and cmd.get("shutdown", False): print("Shutdown command received. Preparing to exit...") - shutdown_event.set() # Signal the telemetry loop to stop. + state["shutdown_event"].set() # Signal the telemetry loop to stop. # Build corresponding /rsp topic rsp_topic = topic[:-4] + "/rsp" ack_msg = json.dumps( {"status": "acknowledged", "time": current_epoch_microseconds()} ) - print(f"Sending ack to {rsp_topic}: {ack_msg}") - client.publish(topic=rsp_topic, payload=ack_msg, qos=config.qos) + state["ack_msg_info"].append( + client.publish(topic=rsp_topic, payload=ack_msg, qos=config.qos) + ) + +if config.auth_type not in ("basic", "cert"): + raise ValueError("auth_type must be 'basic' or 'cert'") client = mqtt.Client( client_id=config.client_id, # Ensure client_id is set for persistent sessions. @@ -79,14 +77,21 @@ def on_message(client, userdata, message, properties=None): protocol=mqtt.MQTTv311, # Use v311 unless v5 features are needed. callback_api_version=mqtt.CallbackAPIVersion.VERSION2, # type: ignore ) +state = { + "ack_msg_info": [], + "shutdown_event": threading.Event(), +} +client.user_data_set(state) client.on_connect = on_connect client.on_message = on_message -# TLS/SSL configuration. -client.tls_set(ca_certs=config.ca_certs) - -# Authentication. -client.username_pw_set(username=config.username, password=config.password) +if config.auth_type == "basic": + client.tls_set(ca_certs=config.ca_certs) + client.username_pw_set(username=config.username, password=config.password) +else: + client.tls_set( + ca_certs=config.ca_certs, certfile=config.client_cert, keyfile=config.client_key + ) # Configure proxy if needed. if config.proxy_args: @@ -101,26 +106,34 @@ def on_message(client, userdata, message, properties=None): # Send telemetry messages. try: + telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format + ) count = 1 print("Telemetry loop -- Press Ctrl-C to stop.") - while not shutdown_event.is_set(): + while not state["shutdown_event"].is_set(): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count rc_pub = client.publish( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, ) rc_pub.wait_for_publish() count += 1 time.sleep(config.message_delay) + # Drain ack_msg_info + while state["ack_msg_info"]: + state["ack_msg_info"].pop(0).wait_for_publish() + except KeyboardInterrupt: print("\nInterrupted by user. Exiting...") # Wait to process any potential /cmd messages before exit. print("Waiting 2 seconds to process possible /cmd messages...") time.sleep(2) +# Drain ack_msg_info +while state["ack_msg_info"]: + state["ack_msg_info"].pop(0).wait_for_publish() # Tear down the client and exit. client.loop_stop() diff --git a/samples/python/command-response/config.distr.py b/samples/python/command-response/config.distr.py index ceaee3e..5bafde1 100644 --- a/samples/python/command-response/config.distr.py +++ b/samples/python/command-response/config.distr.py @@ -16,6 +16,9 @@ # MQTT client id client_id = "your_device_name" +# Format of the "time" field in the payload ("none", "epoch", "iso") +time_format = "epoch" + # Quality of Service qos = 1 @@ -42,9 +45,32 @@ # Authentication ### +# Authentication type: basic or cert +auth_type = "basic" + +### +# For basic authentication +### + # The username is the "externalKey" property of your Digital Twin. username = "your_device_username" # The Digital Twin password. This should be the content of the vault secret # corresponding to the authId property of your Digital Twin. password = "your_device_password" + +### +# For certificate authentication (mTLS) +### + +# Path to your client certificate and key. +# If both the certificate and private key are in the same file, set client_key to None. +# You can retrieve a certificate bundle from the OCI certificate store with: +# oci certificates certificate-bundle get \ +# --certificate-id \ +# --bundle-type CERTIFICATE_CONTENT_WITH_PRIVATE_KEY | +# jq -r '.data."certificate-pem"','.data."private-key-pem"' > client_certificate_bundle.pem +# Keep in mind that the authId property of your Digital Twin must match the +# Common Name (CN) of the certificate. +client_cert = "/path/to/client_certificate.pem" +client_key = "" diff --git a/samples/python/command-response/requirements.txt b/samples/python/command-response/requirements.txt index ad20a40..9f48da8 100644 --- a/samples/python/command-response/requirements.txt +++ b/samples/python/command-response/requirements.txt @@ -1,2 +1,3 @@ +numpy~=2.3.0 paho-mqtt>=2.0.0 PySocks diff --git a/samples/python/publish-https/README.md b/samples/python/publish-https/README.md index 541ca69..992b476 100644 --- a/samples/python/publish-https/README.md +++ b/samples/python/publish-https/README.md @@ -11,13 +11,56 @@ Install the Python dependencies (using a pip install -r requirements.txt ``` -## Using Password-Based Authentication +The script assumes a Digital Twin has already been created. +The sample payload sent will be accepted by any of the Digital Twins created by the +[Manage Digital Twins](../../script/manage-dt/) section of this repository. + +## Configure and run the scripts + +### Telemetry payload + +- For unstructured telemetry, the content can be arbitrary. +- For structured telemetry, it must match the Model/Adapter. +- For structured telemetry in the default format, if a "time" property is specified, + it must be an epoch time in microseconds and will override the "time_observed" field + in the database. +- The same applies to structured telemetry in a custom format, but the mapping must be + defined in the adapter. + +The sample telemetry used by the scripts is compatible with all three Digital Twins +created in the "Manage Digital Twins" section of this repository: + +```json +telemetry_data = { + "time": 1757512025226854, + "sht_temperature": 23.8, + "qmp_temperature": 24.4, + "humidity": 56.1, + "pressure": 1012.2, + "count": 1, +} +``` + +The `time` field is optional, this can be specified in the configuration file (see below) + +### Common configuration Copy `config.distr.py` to `config.py` and set the following variables: -- `iot_device_host`: The Device Host for your IoT Domain -- `iot_endpoint`: The _path_ for your telemetry (equivalent to an MQTT topic) -- `username` and `password`: Credentials for your device +- `iot_device_host`: The Device Host for your IoT Domain. +- `iot_endpoint`: The MQTT topic for your telemetry. +- `time_format`: format of the `time` field in the payload: + - `none`: No time information is included in the payload. + - `epoch`: Current time as integer microseconds since Unix epoch. + - `iso`: Current time as an ISO8601 string in UTC (with 'Z' suffix). + +### Using Password-Based Authentication + +Set your device credentials in `config.py`: + +- `username`: The "externalKey" property of your Digital Twin. +- `password`: The Digital Twin password, i.e., the content of the vault secret + corresponding to the authId property of your Digital Twin. Run the script: @@ -25,13 +68,13 @@ Run the script: ./pub-https-basic.py ``` -## Using Certificate-Based Authentication +### Using Certificate-Based Authentication -Copy `config.distr.py` to `config.py` and set the following variables: +Set the path to your client certificate and key in the `client_cert` and `client_key` +variables of the `config.py` file. -- `iot_device_host`: The Device Host for your IoT Domain -- `iot_endpoint`: The _path_ for your telemetry (equivalent to an MQTT topic) -- `client_cert` and `client_key`: Paths to your client certificate and key +Keep in mind that the `authId` property of your Digital Twin must match the +Common Name (CN) of the certificate. Run the script: diff --git a/samples/python/publish-https/config.distr.py b/samples/python/publish-https/config.distr.py index 8b67a70..7ba7d07 100644 --- a/samples/python/publish-https/config.distr.py +++ b/samples/python/publish-https/config.distr.py @@ -10,6 +10,8 @@ # The IoT endpoint can be any value, similar to the MQTT topic. iot_endpoint = "iot/v1/http" +# Format of the "time" field in the payload ("none", "epoch", "iso") +time_format = "epoch" ### # For basic authentication diff --git a/samples/python/publish-https/pub-https-basic.py b/samples/python/publish-https/pub-https-basic.py index 86e0101..900e343 100755 --- a/samples/python/publish-https/pub-https-basic.py +++ b/samples/python/publish-https/pub-https-basic.py @@ -11,41 +11,26 @@ """ import json -from time import time +import os +import sys + +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) import config +import environmental_sensor_simulator import requests - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": current_epoch_microseconds(), - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 5479, -} +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) +payload = json.dumps(telemetry.get_telemetry()) try: response = requests.post( f"https://{config.iot_device_host}/{config.iot_endpoint}", - data=json.dumps(telemetry_data), + data=payload, headers={"Content-Type": "application/json"}, auth=(config.username, config.password), # Basic auth ) diff --git a/samples/python/publish-https/pub-https-cert.py b/samples/python/publish-https/pub-https-cert.py index 91f6510..238568f 100755 --- a/samples/python/publish-https/pub-https-cert.py +++ b/samples/python/publish-https/pub-https-cert.py @@ -11,37 +11,21 @@ """ import json -from time import time +import os +import sys + +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) import config +import environmental_sensor_simulator import requests - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": current_epoch_microseconds(), - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 5479, -} - +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) +payload = json.dumps(telemetry.get_telemetry()) if config.client_key: cert = (config.client_cert, config.client_key) @@ -51,7 +35,7 @@ def current_epoch_microseconds(): try: response = requests.post( f"https://{config.iot_device_host}/{config.iot_endpoint}", - data=json.dumps(telemetry_data), + data=payload, headers={"Content-Type": "application/json"}, cert=cert, # Client certificate auth ) diff --git a/samples/python/publish-https/requirements.txt b/samples/python/publish-https/requirements.txt index f229360..43fca92 100644 --- a/samples/python/publish-https/requirements.txt +++ b/samples/python/publish-https/requirements.txt @@ -1 +1,2 @@ +numpy~=2.3.0 requests diff --git a/samples/python/publish-mqtt/README.md b/samples/python/publish-mqtt/README.md index efb8ff9..b18b243 100644 --- a/samples/python/publish-mqtt/README.md +++ b/samples/python/publish-mqtt/README.md @@ -34,6 +34,32 @@ The sample payload sent will be accepted by any of the Digital Twins created by ## Configure and run the scripts +### Telemetry payload + +- For unstructured telemetry, the content can be arbitrary. +- For structured telemetry, it must match the Model/Adapter. +- For structured telemetry in the default format, if a "time" property is specified, + it must be an epoch time in microseconds and will override the "time_observed" field + in the database. +- The same applies to structured telemetry in a custom format, but the mapping must be + defined in the adapter. + +The sample telemetry used by the scripts is compatible with all three Digital Twins +created in the "Manage Digital Twins" section of this repository: + +```json +telemetry_data = { + "time": 1757512025226854, + "sht_temperature": 23.8, + "qmp_temperature": 24.4, + "humidity": 56.1, + "pressure": 1012.2, + "count": 1, +} +``` + +The `time` field is optional, this can be specified in the configuration file (see below) + ### Common configuration Copy `config.distr.py` to `config.py` and set the following variables: @@ -42,6 +68,10 @@ Copy `config.distr.py` to `config.py` and set the following variables: - `iot_endpoint`: The MQTT topic for your telemetry. - `message_count` and `message_delay`: The number of messages to send and the delay in seconds between messages. +- `time_format`: format of the `time` field in the payload: + - `none`: No time information is included in the payload. + - `epoch`: Current time as integer microseconds since Unix epoch. + - `iso`: Current time as an ISO8601 string in UTC (with 'Z' suffix). - `ca_certs`: The path to the CA certificate for the OCI IoT Platform. In most cases, you won't need to specify this: OCI uses certificate authorities from well-established providers, and recent Python versions will find the CA certificate diff --git a/samples/python/publish-mqtt/config.distr.py b/samples/python/publish-mqtt/config.distr.py index 86a2cd8..6a0bd79 100644 --- a/samples/python/publish-mqtt/config.distr.py +++ b/samples/python/publish-mqtt/config.distr.py @@ -14,6 +14,9 @@ message_count = 10 message_delay = 0.5 +# Format of the "time" field in the payload ("none", "epoch", "iso") +time_format = "epoch" + # Quality of Service qos = 1 diff --git a/samples/python/publish-mqtt/pub-mqtt-client-basic.py b/samples/python/publish-mqtt/pub-mqtt-client-basic.py index ce336fb..b8197b0 100755 --- a/samples/python/publish-mqtt/pub-mqtt-client-basic.py +++ b/samples/python/publish-mqtt/pub-mqtt-client-basic.py @@ -13,41 +13,21 @@ """ import json +import os import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.client as mqtt MQTT_PORT = 8883 -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - - # Callbacks - we only implement the on_connect def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") @@ -74,13 +54,14 @@ def on_connect(client, userdata, flags, reason_code, properties): client.loop_start() # Send telemetry +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count rc = client.publish( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, ) rc.wait_for_publish() diff --git a/samples/python/publish-mqtt/pub-mqtt-client-cert.py b/samples/python/publish-mqtt/pub-mqtt-client-cert.py index c7d84c5..0e1ea54 100755 --- a/samples/python/publish-mqtt/pub-mqtt-client-cert.py +++ b/samples/python/publish-mqtt/pub-mqtt-client-cert.py @@ -13,41 +13,21 @@ """ import json +import os import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.client as mqtt MQTT_PORT = 8883 -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - - # Callbacks - we only implement the on_connect def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") @@ -73,13 +53,14 @@ def on_connect(client, userdata, flags, reason_code, properties): client.loop_start() # Send telemetry +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count rc = client.publish( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, ) rc.wait_for_publish() diff --git a/samples/python/publish-mqtt/pub-mqtt-one-basic.py b/samples/python/publish-mqtt/pub-mqtt-one-basic.py index 8a0817e..762f069 100755 --- a/samples/python/publish-mqtt/pub-mqtt-one-basic.py +++ b/samples/python/publish-mqtt/pub-mqtt-one-basic.py @@ -13,39 +13,20 @@ """ import json +import os +import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.publish as publish MQTT_PORT = 8883 - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - # TLS/SSL configuration tls = { "ca_certs": config.ca_certs, @@ -57,13 +38,14 @@ def current_epoch_microseconds(): "password": config.password, } +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count publish.single( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, hostname=config.iot_device_host, port=MQTT_PORT, diff --git a/samples/python/publish-mqtt/pub-mqtt-one-cert.py b/samples/python/publish-mqtt/pub-mqtt-one-cert.py index edc0259..c3dffff 100755 --- a/samples/python/publish-mqtt/pub-mqtt-one-cert.py +++ b/samples/python/publish-mqtt/pub-mqtt-one-cert.py @@ -13,39 +13,20 @@ """ import json +import os +import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.publish as publish MQTT_PORT = 8883 - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - # TLS/SSL configuration tls = { "ca_certs": config.ca_certs, @@ -53,13 +34,14 @@ def current_epoch_microseconds(): "keyfile": config.client_key, } +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count publish.single( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, hostname=config.iot_device_host, port=MQTT_PORT, diff --git a/samples/python/publish-mqtt/requirements.txt b/samples/python/publish-mqtt/requirements.txt index ad20a40..9f48da8 100644 --- a/samples/python/publish-mqtt/requirements.txt +++ b/samples/python/publish-mqtt/requirements.txt @@ -1,2 +1,3 @@ +numpy~=2.3.0 paho-mqtt>=2.0.0 PySocks diff --git a/samples/python/publish-websockets/README.md b/samples/python/publish-websockets/README.md index 3bae096..d374571 100644 --- a/samples/python/publish-websockets/README.md +++ b/samples/python/publish-websockets/README.md @@ -44,6 +44,32 @@ The sample payload sent will be accepted by any of the Digital Twins created by ## Configure and run the scripts +### Telemetry payload + +- For unstructured telemetry, the content can be arbitrary. +- For structured telemetry, it must match the Model/Adapter. +- For structured telemetry in the default format, if a "time" property is specified, + it must be an epoch time in microseconds and will override the "time_observed" field + in the database. +- The same applies to structured telemetry in a custom format, but the mapping must be + defined in the adapter. + +The sample telemetry used by the scripts is compatible with all three Digital Twins +created in the "Manage Digital Twins" section of this repository: + +```json +telemetry_data = { + "time": 1757512025226854, + "sht_temperature": 23.8, + "qmp_temperature": 24.4, + "humidity": 56.1, + "pressure": 1012.2, + "count": 1, +} +``` + +The `time` field is optional, this can be specified in the configuration file (see below) + ### Common configuration Copy `config.distr.py` to `config.py` and set the following variables: @@ -52,6 +78,10 @@ Copy `config.distr.py` to `config.py` and set the following variables: - `iot_endpoint`: The MQTT topic for your telemetry. - `message_count` and `message_delay`: The number of messages to send and the delay in seconds between messages. +- `time_format`: format of the `time` field in the payload: + - `none`: No time information is included in the payload. + - `epoch`: Current time as integer microseconds since Unix epoch. + - `iso`: Current time as an ISO8601 string in UTC (with 'Z' suffix). - `ca_certs`: The path to the CA certificate for the OCI IoT Platform. In most cases, you won't need to specify this: OCI uses certificate authorities from well-established providers, and recent Python versions will find the CA certificate diff --git a/samples/python/publish-websockets/config.distr.py b/samples/python/publish-websockets/config.distr.py index 86a2cd8..6a0bd79 100644 --- a/samples/python/publish-websockets/config.distr.py +++ b/samples/python/publish-websockets/config.distr.py @@ -14,6 +14,9 @@ message_count = 10 message_delay = 0.5 +# Format of the "time" field in the payload ("none", "epoch", "iso") +time_format = "epoch" + # Quality of Service qos = 1 diff --git a/samples/python/publish-websockets/pub-wss-client-basic.py b/samples/python/publish-websockets/pub-wss-client-basic.py index 261cce2..9e2ac56 100755 --- a/samples/python/publish-websockets/pub-wss-client-basic.py +++ b/samples/python/publish-websockets/pub-wss-client-basic.py @@ -13,42 +13,22 @@ """ import json +import os import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.client as mqtt WEBSOCKET_PORT = 443 WEBSOCKET_PATH = "/mqtt" -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - - # Callbacks - we only implement the on_connect def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") @@ -78,13 +58,14 @@ def on_connect(client, userdata, flags, reason_code, properties): client.loop_start() # Send telemetry +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count rc = client.publish( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, ) rc.wait_for_publish() diff --git a/samples/python/publish-websockets/pub-wss-client-cert.py b/samples/python/publish-websockets/pub-wss-client-cert.py index 806be78..ec44ccd 100755 --- a/samples/python/publish-websockets/pub-wss-client-cert.py +++ b/samples/python/publish-websockets/pub-wss-client-cert.py @@ -13,42 +13,22 @@ """ import json +import os import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.client as mqtt WEBSOCKET_PORT = 443 WEBSOCKET_PATH = "/mqtt" -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - - # Callbacks - we only implement the on_connect def on_connect(client, userdata, flags, reason_code, properties): print(f"Connected with result code {reason_code}") @@ -77,13 +57,14 @@ def on_connect(client, userdata, flags, reason_code, properties): client.loop_start() # Send telemetry +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count rc = client.publish( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, ) rc.wait_for_publish() diff --git a/samples/python/publish-websockets/pub-wss-one-basic.py b/samples/python/publish-websockets/pub-wss-one-basic.py index 9c4cf21..0331d79 100755 --- a/samples/python/publish-websockets/pub-wss-one-basic.py +++ b/samples/python/publish-websockets/pub-wss-one-basic.py @@ -13,39 +13,20 @@ """ import json +import os +import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.publish as publish WEBSOCKET_PORT = 443 - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - # TLS/SSL configuration tls = { "ca_certs": config.ca_certs, @@ -57,13 +38,15 @@ def current_epoch_microseconds(): "password": config.password, } +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) + for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count publish.single( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, hostname=config.iot_device_host, port=WEBSOCKET_PORT, diff --git a/samples/python/publish-websockets/pub-wss-one-cert.py b/samples/python/publish-websockets/pub-wss-one-cert.py index 713f2ca..afdd81c 100755 --- a/samples/python/publish-websockets/pub-wss-one-cert.py +++ b/samples/python/publish-websockets/pub-wss-one-cert.py @@ -13,39 +13,20 @@ """ import json +import os +import sys import time +sys.path.append( + os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, "shared")) +) + import config +import environmental_sensor_simulator import paho.mqtt.publish as publish WEBSOCKET_PORT = 443 - -# Get the current UTC time as epoch in microseconds -def current_epoch_microseconds(): - return int(time.time() * 1000 * 1000) - - -# Telemetry data. -# - For unstructured telemetry, the content can be arbitrary. -# - For structured telemetry, it must match the Model/Adapter. -# - For structured telemetry in the default format, if a "time" property is -# specified, it must be an epoch time in microseconds and will override the -# "time_observed" property. -# - The same applies to structured telemetry in a custom format, but the -# mapping must be defined in the adapter. -# -# The sample telemetry below is compatible with all three Digital Twins created -# in the "Manage Digital Twins" section of this repository. -telemetry_data = { - "time": 0, - "sht_temperature": 23.8, - "qmp_temperature": 24.4, - "humidity": 56.1, - "pressure": 1012.2, - "count": 0, -} - # TLS/SSL configuration tls = { "ca_certs": config.ca_certs, @@ -53,13 +34,15 @@ def current_epoch_microseconds(): "keyfile": config.client_key, } +telemetry = environmental_sensor_simulator.EnvironmentalSensorSimulator( + time_format=config.time_format +) + for count in range(1, config.message_count + 1): print(f"Sending message #{count}") - telemetry_data["time"] = current_epoch_microseconds() - telemetry_data["count"] = count publish.single( topic=config.iot_endpoint, - payload=json.dumps(telemetry_data), + payload=json.dumps(telemetry.get_telemetry()), qos=config.qos, hostname=config.iot_device_host, port=WEBSOCKET_PORT, diff --git a/samples/python/publish-websockets/requirements.txt b/samples/python/publish-websockets/requirements.txt index ad20a40..9f48da8 100644 --- a/samples/python/publish-websockets/requirements.txt +++ b/samples/python/publish-websockets/requirements.txt @@ -1,2 +1,3 @@ +numpy~=2.3.0 paho-mqtt>=2.0.0 PySocks diff --git a/samples/python/shared/environmental_sensor_simulator.py b/samples/python/shared/environmental_sensor_simulator.py new file mode 100644 index 0000000..9ab9334 --- /dev/null +++ b/samples/python/shared/environmental_sensor_simulator.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 + +""" +Class to simulate an environmental sensor. + +Copyright (c) 2025 Oracle and/or its affiliates. +Licensed under the Universal Permissive License v 1.0 as shown at +https://oss.oracle.com/licenses/upl + +DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. +""" + +from datetime import datetime, timezone +import time + +import numpy as np + + +class EnvironmentalSensorSimulator: + """ + Simulates a virtual environmental sensor producing sequential telemetry. + + Parameters: + time_format (str): Specify time style for telemetry payload. + - 'epoch': Current time as integer microseconds since Unix epoch. + - 'iso': Current time as an ISO8601 string in UTC (with 'Z' suffix). + - 'none': No time information is included in the payload. + + Behavior: + - Each telemetry message includes: + - sht_temperature (°C), qmp_temperature (°C): Range [13.0, 21.0] + - humidity (%): Range [60.0, 90.0] + - pressure (hPa): Range [1000.0, 1030.0] + - count: Increments with every call + - (optional) time: Only included if time_format != 'none' + - Sensor values evolve stochastically, deviating stepwise from the previous + value using Gaussian noise and always clamped within their respective ranges. + - All floats are rounded to 2 decimal places in the output. + - Initial sensor values are chosen near the middle of their allowed range for + realistic startup. + """ + + def __init__(self, time_format="epoch"): + """ + Initialize the environmental sensor simulator with startup values. + + Args: + time_format (str): Format for time in telemetry output. Must be one of + 'epoch', 'iso', or 'none'. Default is 'epoch'. + + Raises: + ValueError: If time_format is not a supported value. + """ + if time_format not in ("epoch", "iso", "none"): + raise ValueError("time_format must be 'epoch', 'iso', or 'none'") + self.time_format = time_format + self.count = 0 + + # Bounds + self.TEMP_MIN = 13.0 + self.TEMP_MAX = 21.0 + self.PRESSURE_MIN = 1000.0 + self.PRESSURE_MAX = 1030.0 + self.HUMIDITY_MIN = 60.0 + self.HUMIDITY_MAX = 90.0 + + # Step std devs + self.sht_temp_sigma = 0.1 # Step size per call + self.qmp_temp_sigma = 0.1 + self.humidity_sigma = 0.5 + self.pressure_sigma = 0.2 + + # Initialize values: Gaussian around midpoint, clamp to bounds + rng = np.random.default_rng() + temp_mid = (self.TEMP_MIN + self.TEMP_MAX) / 2 + temp_std = (self.TEMP_MAX - self.TEMP_MIN) / 8 + pressure_mid = (self.PRESSURE_MIN + self.PRESSURE_MAX) / 2 + pressure_std = (self.PRESSURE_MAX - self.PRESSURE_MIN) / 8 + humidity_mid = (self.HUMIDITY_MIN + self.HUMIDITY_MAX) / 2 + humidity_std = (self.HUMIDITY_MAX - self.HUMIDITY_MIN) / 8 + + self.sht_temperature = self._clamp( + float(rng.normal(temp_mid, temp_std)), self.TEMP_MIN, self.TEMP_MAX + ) + self.qmp_temperature = self._clamp( + float(rng.normal(temp_mid, temp_std)), self.TEMP_MIN, self.TEMP_MAX + ) + self.humidity = self._clamp( + float(rng.normal(humidity_mid, humidity_std)), + self.HUMIDITY_MIN, + self.HUMIDITY_MAX, + ) + self.pressure = self._clamp( + float(rng.normal(pressure_mid, pressure_std)), + self.PRESSURE_MIN, + self.PRESSURE_MAX, + ) + + def _clamp(self, value, minv, maxv): + return max(min(value, maxv), minv) + + def _get_time(self): + now = time.time() + if self.time_format == "epoch": + # Return integer microseconds since Unix epoch + return int(now * 1_000_000) + else: + # ISO 8601 with microseconds and UTC; use Z for UTC indicator + return ( + datetime.fromtimestamp(now, tz=timezone.utc) + .isoformat(timespec="microseconds") + .replace("+00:00", "Z") + ) + + def get_telemetry(self): + """ + Generate the next telemetry reading. + + Simulates realistic sensor value fluctuations. Returns a dictionary containing + current sensor readings: + - sht_temperature (float): SHT sensor temperature in °C + - qmp_temperature (float): QMP sensor temperature in °C + - humidity (float): Relative humidity in % + - pressure (float): Pressure in hPa + - count (int): Call counter + - time: Timestamp in specified format, if enabled + + Returns: + dict: Telemetry payload with sensor readings and optional time. + """ + # Simulate the next values + self.count += 1 + self.sht_temperature = self._clamp( + float(np.random.normal(self.sht_temperature, self.sht_temp_sigma)), + self.TEMP_MIN, + self.TEMP_MAX, + ) + self.qmp_temperature = self._clamp( + float(np.random.normal(self.qmp_temperature, self.qmp_temp_sigma)), + self.TEMP_MIN, + self.TEMP_MAX, + ) + self.humidity = self._clamp( + float(np.random.normal(self.humidity, self.humidity_sigma)), + self.HUMIDITY_MIN, + self.HUMIDITY_MAX, + ) + self.pressure = self._clamp( + float(np.random.normal(self.pressure, self.pressure_sigma)), + self.PRESSURE_MIN, + self.PRESSURE_MAX, + ) + + telemetry = { + "sht_temperature": round(self.sht_temperature, 2), + "qmp_temperature": round(self.qmp_temperature, 2), + "humidity": round(self.humidity, 2), + "pressure": round(self.pressure, 2), + "count": self.count, + } + if self.time_format != "none": + telemetry["time"] = self._get_time() + return telemetry