From ea81e64bb45639e5674fb912b00f9f4a76460896 Mon Sep 17 00:00:00 2001 From: Andrew Steurer <94206073+asteurer@users.noreply.github.com> Date: Wed, 6 Aug 2025 13:39:47 -0500 Subject: [PATCH] feat(mqtt): implement mqtt in wasip2 Signed-off-by: Andrew Steurer <94206073+asteurer@users.noreply.github.com> --- v3/examples/mqtt-outbound/.gitignore | 2 + v3/examples/mqtt-outbound/README.md | 31 ++++++++++++++ v3/examples/mqtt-outbound/compose.yaml | 15 +++++++ v3/examples/mqtt-outbound/go.mod | 12 ++++++ v3/examples/mqtt-outbound/go.sum | 4 ++ v3/examples/mqtt-outbound/main.go | 44 +++++++++++++++++++ v3/examples/mqtt-outbound/spin.toml | 20 +++++++++ v3/mqtt/mqtt.go | 58 ++++++++++++++++++++++++++ 8 files changed, 186 insertions(+) create mode 100644 v3/examples/mqtt-outbound/.gitignore create mode 100644 v3/examples/mqtt-outbound/README.md create mode 100644 v3/examples/mqtt-outbound/compose.yaml create mode 100644 v3/examples/mqtt-outbound/go.mod create mode 100644 v3/examples/mqtt-outbound/go.sum create mode 100644 v3/examples/mqtt-outbound/main.go create mode 100644 v3/examples/mqtt-outbound/spin.toml create mode 100644 v3/mqtt/mqtt.go diff --git a/v3/examples/mqtt-outbound/.gitignore b/v3/examples/mqtt-outbound/.gitignore new file mode 100644 index 0000000..b565010 --- /dev/null +++ b/v3/examples/mqtt-outbound/.gitignore @@ -0,0 +1,2 @@ +main.wasm +.spin/ diff --git a/v3/examples/mqtt-outbound/README.md b/v3/examples/mqtt-outbound/README.md new file mode 100644 index 0000000..c3c66c6 --- /dev/null +++ b/v3/examples/mqtt-outbound/README.md @@ -0,0 +1,31 @@ +# Requirements +- Latest version of [TinyGo](https://tinygo.org/getting-started/) +- Latest version of [Docker](https://docs.docker.com/get-started/get-docker/) + +# Usage + +In one terminal window, run: +```sh +# Note that the `-d` flag is intentionally omitted +docker compose up +``` + +In another terminal, you'll run your Spin app: +```sh +spin up --build +``` + +In yet another terminal, you'll interact with the Spin app: +```sh +curl localhost:3000/publish +``` + +You will see logs appear in the `docker compose` window that look something like this: +```sh +$ docker compose up +... +broker | 1754324646: New connection from 172.18.0.1:36970 on port 1883. +broker | 1754324646: New client connected from 172.18.0.1:36970 as client001 (p2, c1, k30, u'user'). +subscriber | telemetry Eureka! +broker | 1754324646: Client client001 closed its connection. +``` diff --git a/v3/examples/mqtt-outbound/compose.yaml b/v3/examples/mqtt-outbound/compose.yaml new file mode 100644 index 0000000..d7d8207 --- /dev/null +++ b/v3/examples/mqtt-outbound/compose.yaml @@ -0,0 +1,15 @@ +services: + mosquitto: + image: eclipse-mosquitto:2.0.22 + container_name: broker + ports: + - "1883:1883" + command: mosquitto -c /mosquitto-no-auth.conf + + subscriber: + image: eclipse-mosquitto:2.0.22 + container_name: subscriber + depends_on: + - mosquitto + command: mosquitto_sub -h mosquitto -t '#' -v + restart: "no" # Clean up container when stopped \ No newline at end of file diff --git a/v3/examples/mqtt-outbound/go.mod b/v3/examples/mqtt-outbound/go.mod new file mode 100644 index 0000000..1b69eb4 --- /dev/null +++ b/v3/examples/mqtt-outbound/go.mod @@ -0,0 +1,12 @@ +module github.com/http_go + +go 1.24 + +require github.com/spinframework/spin-go-sdk/v3 v3.0.0 + +require ( + github.com/julienschmidt/httprouter v1.3.0 // indirect + go.bytecodealliance.org/cm v0.2.2 // indirect +) + +replace github.com/spinframework/spin-go-sdk/v3 => ../../ diff --git a/v3/examples/mqtt-outbound/go.sum b/v3/examples/mqtt-outbound/go.sum new file mode 100644 index 0000000..c1ebfdf --- /dev/null +++ b/v3/examples/mqtt-outbound/go.sum @@ -0,0 +1,4 @@ +github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= +go.bytecodealliance.org/cm v0.2.2 h1:M9iHS6qs884mbQbIjtLX1OifgyPG9DuMs2iwz8G4WQA= +go.bytecodealliance.org/cm v0.2.2/go.mod h1:JD5vtVNZv7sBoQQkvBvAAVKJPhR/bqBH7yYXTItMfZI= diff --git a/v3/examples/mqtt-outbound/main.go b/v3/examples/mqtt-outbound/main.go new file mode 100644 index 0000000..391628f --- /dev/null +++ b/v3/examples/mqtt-outbound/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "net/http" + "os" + "strconv" + + spinhttp "github.com/spinframework/spin-go-sdk/v3/http" + "github.com/spinframework/spin-go-sdk/v3/mqtt" +) + +func main() {} + +func init() { + spinhttp.Handle(func(w http.ResponseWriter, r *http.Request) { + addr := os.Getenv("MQTT_ADDRESS") + usr := os.Getenv("MQTT_USERNAME") + pass := os.Getenv("MQTT_PASSWORD") + keepAliveStr := os.Getenv("MQTT_KEEP_ALIVE_INTERVAL") + topic := os.Getenv("MQTT_TOPIC") + + keepAlive, err := strconv.Atoi(keepAliveStr) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte("MQTT_KEEP_ALIVE_INTERVAL is not valid: must be an integer")) + } + + conn, err := mqtt.OpenConnection(addr, usr, pass, uint64(keepAlive)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } + + message := []byte("Eureka!") + + if err := conn.Publish(topic, message, mqtt.QosAtMostOnce); err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + } + + w.WriteHeader(200) + w.Write([]byte("Message successfully published!\n")) + }) +} diff --git a/v3/examples/mqtt-outbound/spin.toml b/v3/examples/mqtt-outbound/spin.toml new file mode 100644 index 0000000..44d3932 --- /dev/null +++ b/v3/examples/mqtt-outbound/spin.toml @@ -0,0 +1,20 @@ +spin_manifest_version = 2 + +[application] +name = "go-mqtt-outbound-example" +version = "0.1.0" +authors = ["Andrew Steurer <94206073+asteurer@users.noreply.github.com>"] +description = "Using Spin with MQTT" + +[[trigger.http]] +route = "/publish" +component = "mqtt-outbound" + +[component.mqtt-outbound] +source = "main.wasm" +# To test anonymous MQTT authentication, remove the values from MQTT_USERNAME and MQTT_PASSWORD env variables. +environment = { MQTT_ADDRESS = "mqtt://127.0.0.1:1883?client_id=client001", MQTT_USERNAME = "user", MQTT_PASSWORD = "password", MQTT_KEEP_ALIVE_INTERVAL = "30", MQTT_TOPIC = "telemetry" } +allowed_outbound_hosts = ["mqtt://127.0.0.1:1883"] +[component.mqtt-outbound.build] +command = "tinygo build -target=wasip2 --wit-package $(go list -mod=readonly -m -f '{{.Dir}}' github.com/spinframework/spin-go-sdk/v3)/wit --wit-world http-trigger -gc=leaking -o main.wasm main.go" +watch = ["**/*.go", "go.mod"] diff --git a/v3/mqtt/mqtt.go b/v3/mqtt/mqtt.go new file mode 100644 index 0000000..8a56b5f --- /dev/null +++ b/v3/mqtt/mqtt.go @@ -0,0 +1,58 @@ +package mqtt + +import ( + "errors" + "fmt" + + "github.com/spinframework/spin-go-sdk/v3/internal/fermyon/spin/v2.0.0/mqtt" + "go.bytecodealliance.org/cm" +) + +type Connection struct { + conn mqtt.Connection +} + +// OpenConnection initializes an MQTT connection +func OpenConnection(address, username, password string, keepAliveIntervalInSecs uint64) (Connection, error) { + conn, err, isErr := mqtt.ConnectionOpen(address, username, password, keepAliveIntervalInSecs).Result() + if isErr { + return Connection{}, toError(&err) + } + + return Connection{conn: conn}, nil +} + +// Publish publishes an MQTT message +func (c *Connection) Publish(topic string, payload []byte, qos QoS) error { + _, err, isErr := c.conn.Publish(topic, mqtt.Payload(cm.ToList(payload)), mqtt.Qos(qos)).Result() + if isErr { + return toError(&err) + } + + return nil +} + +// QoS for publishing Mqtt messages +type QoS = mqtt.Qos + +const ( + QosAtMostOnce = mqtt.QosAtMostOnce + QosAtLeastOnce = mqtt.QosAtLeastOnce + QosExactlyOnce = mqtt.QosExactlyOnce +) + +func toError(err *mqtt.Error) error { + if err == nil { + return nil + } + + if err.String() == "connection-failed" { + return fmt.Errorf("connection-failed: %s", *err.ConnectionFailed()) + } + + if err.String() == "other" { + return fmt.Errorf(*err.Other()) + } + + return errors.New(err.String()) +}