Skip to content

Commit ca5d8ed

Browse files
committed
feat(mqtt): implement mqtt in wasip2
Signed-off-by: Andrew Steurer <[email protected]>
1 parent a65f13a commit ca5d8ed

File tree

8 files changed

+190
-0
lines changed

8 files changed

+190
-0
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
main.wasm
2+
.spin/
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Requirements
2+
- Latest version of [TinyGo](https://tinygo.org/getting-started/)
3+
- Latest version of [Docker](https://docs.docker.com/get-started/get-docker/)
4+
5+
# Usage
6+
7+
In one terminal window, run:
8+
```sh
9+
# Note that the `-d` flag is intentionally omitted
10+
docker compose up
11+
```
12+
13+
In another terminal, you'll run your Spin app:
14+
```sh
15+
spin up --build
16+
```
17+
18+
In yet another terminal, you'll interact with the Spin app:
19+
```sh
20+
curl localhost:3000/publish
21+
```
22+
23+
You will see logs appear in the `docker compose` window that look something like this:
24+
```sh
25+
$ docker compose up
26+
...
27+
broker | 1754324646: New connection from 172.18.0.1:36970 on port 1883.
28+
broker | 1754324646: New client connected from 172.18.0.1:36970 as client001 (p2, c1, k30, u'user').
29+
subscriber | telemetry Eureka!
30+
broker | 1754324646: Client client001 closed its connection.
31+
```
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
services:
2+
mosquitto:
3+
image: eclipse-mosquitto:2.0.22
4+
container_name: broker
5+
ports:
6+
- "1883:1883"
7+
command: mosquitto -c /mosquitto-no-auth.conf
8+
9+
subscriber:
10+
image: eclipse-mosquitto:2.0.22
11+
container_name: subscriber
12+
depends_on:
13+
- mosquitto
14+
command: mosquitto_sub -h mosquitto -t '#' -v
15+
restart: "no" # Clean up container when stopped

v3/examples/mqtt-outbound/go.mod

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
module github.com/http_go
2+
3+
go 1.24
4+
5+
require github.com/spinframework/spin-go-sdk/v3 v3.0.0
6+
7+
require (
8+
github.com/julienschmidt/httprouter v1.3.0 // indirect
9+
go.bytecodealliance.org/cm v0.2.2 // indirect
10+
)
11+
12+
replace github.com/spinframework/spin-go-sdk/v3 => ../../

v3/examples/mqtt-outbound/go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
2+
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
3+
go.bytecodealliance.org/cm v0.2.2 h1:M9iHS6qs884mbQbIjtLX1OifgyPG9DuMs2iwz8G4WQA=
4+
go.bytecodealliance.org/cm v0.2.2/go.mod h1:JD5vtVNZv7sBoQQkvBvAAVKJPhR/bqBH7yYXTItMfZI=

v3/examples/mqtt-outbound/main.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"net/http"
5+
"os"
6+
"strconv"
7+
"strings"
8+
9+
spinhttp "github.com/spinframework/spin-go-sdk/v3/http"
10+
"github.com/spinframework/spin-go-sdk/v3/mqtt"
11+
)
12+
13+
func main() {}
14+
15+
func init() {
16+
spinhttp.Handle(func(w http.ResponseWriter, r *http.Request) {
17+
var missingEnvs []string
18+
addr := validateEnv("MQTT_ADDRESS", &missingEnvs)
19+
usr := validateEnv("MQTT_USERNAME", &missingEnvs)
20+
pass := validateEnv("MQTT_PASSWORD", &missingEnvs)
21+
keepAliveStr := validateEnv("MQTT_KEEP_ALIVE_INTERVAL", &missingEnvs)
22+
topic := validateEnv("MQTT_TOPIC", &missingEnvs)
23+
24+
if len(missingEnvs) > 0 {
25+
panic("Missing the following environment variables:\n" + strings.Join(missingEnvs, "\n"))
26+
}
27+
28+
keepAlive, err := strconv.Atoi(keepAliveStr)
29+
if err != nil {
30+
panic("MQTT_KEEP_ALIVE_INTERVAL is not valid: must be an integer")
31+
}
32+
33+
conn, err := mqtt.OpenConnection(addr, usr, pass, uint64(keepAlive))
34+
if err != nil {
35+
panic(err)
36+
}
37+
38+
message := []byte("Eureka!")
39+
40+
if err := mqtt.Publish(conn, topic, message, 0); err != nil {
41+
panic(err)
42+
}
43+
44+
w.WriteHeader(200)
45+
w.Write([]byte("Message successfully published!\n"))
46+
})
47+
}
48+
49+
func validateEnv(key string, missingEnvs *[]string) (value string) {
50+
value = os.Getenv(key)
51+
if value == "" {
52+
if missingEnvs != nil {
53+
*missingEnvs = append(*missingEnvs, key)
54+
}
55+
}
56+
return value
57+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
spin_manifest_version = 2
2+
3+
[application]
4+
name = "go-mqtt-outbound-example"
5+
version = "0.1.0"
6+
authors = ["Andrew Steurer <[email protected]>"]
7+
description = "Using Spin with MQTT"
8+
9+
[[trigger.http]]
10+
route = "/publish"
11+
component = "mqtt-outbound"
12+
13+
[component.mqtt-outbound]
14+
source = "main.wasm"
15+
# To test anonymous MQTT authentication, remove the values from MQTT_USERNAME and MQTT_PASSWORD env variables.
16+
environment = { MQTT_ADDRESS = "mqtt://127.0.0.1:1883?client_id=client001", MQTT_USERNAME = "user", MQTT_PASSWORD = "password", MQTT_KEEP_ALIVE_INTERVAL = "30", MQTT_TOPIC = "telemetry" }
17+
allowed_outbound_hosts = ["mqtt://127.0.0.1:1883"]
18+
[component.mqtt-outbound.build]
19+
command = "tinygo build -target=wasip2 --wit-package $(go list -mod=readonly -m -f '{{.Dir}}' github.com/spinframework/spin-go-sdk/v3)/wit --wit-world http-trigger -gc=leaking -o main.wasm main.go"
20+
watch = ["**/*.go", "go.mod"]

v3/mqtt/mqtt.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package mqtt
2+
3+
import (
4+
"errors"
5+
6+
"github.com/spinframework/spin-go-sdk/v3/internal/fermyon/spin/v2.0.0/mqtt"
7+
"go.bytecodealliance.org/cm"
8+
)
9+
10+
// OpenConnection initializes an MQTT connection
11+
func OpenConnection(address, username, password string, keepAliveIntervalInSecs uint64) (mqtt.Connection, error) {
12+
conn, err, isErr := mqtt.ConnectionOpen(address, username, password, keepAliveIntervalInSecs).Result()
13+
if isErr {
14+
return cm.ResourceNone, toError(&err)
15+
}
16+
17+
return conn, nil
18+
}
19+
20+
// Publish publishes an MQTT message
21+
func Publish(conn mqtt.Connection, topic string, payload []byte, qos uint8) error {
22+
_, err, isErr := conn.Publish(topic, mqtt.Payload(cm.ToList(payload)), toQos(qos)).Result()
23+
if isErr {
24+
return toError(&err)
25+
}
26+
27+
return nil
28+
}
29+
30+
func toQos(q uint8) mqtt.Qos {
31+
switch q {
32+
case 0:
33+
return mqtt.QosAtMostOnce
34+
case 1:
35+
return mqtt.QosAtLeastOnce
36+
case 2:
37+
return mqtt.QosExactlyOnce
38+
default:
39+
panic("invalid QoS")
40+
}
41+
}
42+
43+
func toError(err *mqtt.Error) error {
44+
if err == nil {
45+
return nil
46+
}
47+
48+
return errors.New(err.String())
49+
}

0 commit comments

Comments
 (0)