Skip to content

Commit c3951f6

Browse files
author
Vipul Rawat
authored
add mqtt support for publishing and subscribing messages (#370)
1 parent 2e4f712 commit c3951f6

File tree

14 files changed

+862
-0
lines changed

14 files changed

+862
-0
lines changed

docs/advanced-guide/using-publisher-subscriber/page.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,36 @@ docker run --name=gcloud-emulator -d -p 8086:8086 \
7373
> **Note**: In Google PubSub only one subscription name can access one topic, framework appends the topic name and subscription name to form the
7474
> unique subscription name on the Google client.
7575
76+
### Mqtt
77+
78+
#### Configs
79+
```dotenv
80+
PUBSUB_BACKEND=MQTT // using Mqtt as pubsub
81+
MQTT_HOST=localhost // broker host url
82+
MQTT_PORT=1883 // broker port
83+
MQTT_CLIENT_ID_SUFFIX=test // suffix to a random generated client-id(uuid v4)
84+
85+
#some additional configs(optional)
86+
MQTT_PROTOCOL=tcp // protocol for connecting to broker can be tcp, tls, ws or wss
87+
MQTT_MESSAGE_ORDER=true // config to maintain/retain message publish order, by defualt this is false
88+
MQTT_USER=username // authentication username
89+
MQTT_PASSWORD=password // authentication password
90+
```
91+
> **Note** : If `MQTT_HOST` config is not provided, the application will connect to a public broker
92+
> [HiveMQ](https://www.hivemq.com/mqtt/public-mqtt-broker/)
93+
94+
#### Docker setup
95+
```shell
96+
docker run -d \
97+
--name mqtt \
98+
-p 8883:8883 \
99+
-v <path-to>/mosquitto.conf:/mosquitto/config/mosquitto.conf \
100+
eclipse-mosquitto:latest
101+
```
102+
> **Note**: find the default mosquitto config file [here](https://github.com/eclipse/mosquitto/blob/master/mosquitto.conf)
103+
104+
105+
76106
## Subscribing to Pub/Sub
77107
Adding a subscriber is similar to adding an HTTP handler, which makes it easier to develop scalable applications,
78108
as it decoupled from the Sender/Publisher.

docs/navigation.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export const navigation = [
2020
{ title: 'Monitoring Service Health', href: '/docs/advanced-guide/monitoring-service-health' },
2121
{ title: 'Handling Data Migrations', href: '/docs/advanced-guide/handling-data-migrations' },
2222
{ title: 'Writing gRPC Server', href: '/docs/advanced-guide/grpc' },
23+
{ title: 'Using Publisher and Subscriber', href: '/docs/advanced-guide/using-publisher-subscriber' }
2324
// { title: 'Dealing with Remote Files', href: '/docs/advanced-guide/remote-files' },
2425
// { title: 'Supporting OAuth', href: '/docs/advanced-guide/oauth' },
2526
// { title: 'Creating a Static File Server', href: '/docs/advanced-guide/static-file-server' },

examples/using-publisher/configs/.env

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,11 @@ PUBSUB_BACKEND=KAFKA
55
PUBSUB_BROKER=localhost:9092
66
CONSUMER_ID=test
77

8+
# For using MQTT uncomment these configs
9+
#PUBSUB_BACKEND=MQTT
10+
#MQTT_PROTOCOL=tcp
11+
#MQTT_HOST=localhost
12+
#MQTT_PORT=8883
13+
#MQTT_CLIENT_ID_SUFFIX=test-publisher
14+
815
LOG_LEVEL=DEBUG

examples/using-subscriber/configs/.env

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,11 @@ PUBSUB_BROKER=localhost:9092
66
CONSUMER_ID=test
77
PUBSUB_OFFSET=-2
88

9+
# For using MQTT uncomment these configs
10+
#PUBSUB_BACKEND=MQTT
11+
#MQTT_PROTOCOL=tcp
12+
#MQTT_HOST=localhost
13+
#MQTT_PORT=8883
14+
#MQTT_CLIENT_ID_SUFFIX=test-subscriber
15+
916
LOG_LEVEL=DEBUG

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ require (
66
cloud.google.com/go/pubsub v1.36.2
77
github.com/DATA-DOG/go-sqlmock v1.5.2
88
github.com/alicebob/miniredis/v2 v2.31.1
9+
github.com/eclipse/paho.mqtt.golang v1.4.3
910
github.com/go-redis/redismock/v9 v9.2.0
1011
github.com/go-sql-driver/mysql v1.7.1
1112
github.com/gogo/protobuf v1.3.2
13+
github.com/google/uuid v1.6.0
1214
github.com/gorilla/mux v1.8.1
1315
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
1416
github.com/joho/godotenv v1.5.1
@@ -53,6 +55,7 @@ require (
5355
github.com/google/s2a-go v0.1.7 // indirect
5456
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
5557
github.com/googleapis/gax-go/v2 v2.12.1 // indirect
58+
github.com/gorilla/websocket v1.5.0 // indirect
5659
github.com/klauspost/compress v1.16.6 // indirect
5760
github.com/openzipkin/zipkin-go v0.4.2 // indirect
5861
github.com/pierrec/lz4/v4 v4.1.17 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
4343
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
4444
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
4545
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
46+
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
47+
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
4648
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
4749
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
4850
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -106,6 +108,8 @@ github.com/googleapis/gax-go/v2 v2.12.1 h1:9F8GV9r9ztXyAi00gsMQHNoF51xPZm8uj1dpY
106108
github.com/googleapis/gax-go/v2 v2.12.1/go.mod h1:61M8vcyyXR2kqKFxKrfA22jaA8JGF7Dc8App1U3H6jc=
107109
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
108110
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
111+
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
112+
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
109113
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
110114
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
111115
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=

pkg/gofr/container/container.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"gofr.dev/pkg/gofr/datasource/pubsub"
99
"gofr.dev/pkg/gofr/datasource/pubsub/google"
1010
"gofr.dev/pkg/gofr/datasource/pubsub/kafka"
11+
"gofr.dev/pkg/gofr/datasource/pubsub/mqtt"
1112
"gofr.dev/pkg/gofr/datasource/redis"
1213
"gofr.dev/pkg/gofr/datasource/sql"
1314
"gofr.dev/pkg/gofr/logging"
@@ -94,6 +95,33 @@ func (c *Container) Create(conf config.Config) {
9495
ProjectID: conf.Get("GOOGLE_PROJECT_ID"),
9596
SubscriptionName: conf.Get("GOOGLE_SUBSCRIPTION_NAME"),
9697
}, c.Logger, c.metricsManager)
98+
case "MQTT":
99+
var qos byte
100+
101+
port, _ := strconv.Atoi(conf.Get("MQTT_PORT"))
102+
order, _ := strconv.ParseBool(conf.GetOrDefault("MQTT_MESSAGE_ORDER", "false"))
103+
104+
switch conf.Get("MQTT_QOS") {
105+
case "1":
106+
qos = 1
107+
case "2":
108+
qos = 2
109+
default:
110+
qos = 0
111+
}
112+
113+
configs := &mqtt.Config{
114+
Protocol: conf.GetOrDefault("MQTT_PROTOCOL", "tcp"), // using tcp as default method to connect to broker
115+
Hostname: conf.Get("MQTT_HOST"),
116+
Port: port,
117+
Username: conf.Get("MQTT_USER"),
118+
Password: conf.Get("MQTT_PASSWORD"),
119+
ClientID: conf.Get("MQTT_CLIENT_ID_SUFFIX"),
120+
QoS: qos,
121+
Order: order,
122+
}
123+
124+
c.PubSub = mqtt.New(configs, c.Logger, c.metricsManager)
97125
}
98126
}
99127

pkg/gofr/container/container_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"gofr.dev/pkg/gofr/config"
1010
"gofr.dev/pkg/gofr/datasource"
1111
"gofr.dev/pkg/gofr/datasource/pubsub"
12+
"gofr.dev/pkg/gofr/datasource/pubsub/mqtt"
1213
"gofr.dev/pkg/gofr/service"
1314
"gofr.dev/pkg/gofr/testutil"
1415
)
@@ -63,6 +64,19 @@ func Test_newContainerPubSubIntializationFail(t *testing.T) {
6364
}
6465
}
6566

67+
func TestContianer_MQTTInitialization_Default(t *testing.T) {
68+
configs := map[string]string{
69+
"PUBSUB_BACKEND": "MQTT",
70+
}
71+
72+
c := NewContainer(testutil.NewMockConfig(configs))
73+
74+
assert.NotNil(t, c.PubSub)
75+
m, ok := c.PubSub.(*mqtt.MQTT)
76+
assert.True(t, ok)
77+
assert.NotNil(t, m.Client)
78+
}
79+
6680
func TestContainer_GetHTTPService(t *testing.T) {
6781
svc := service.NewHTTPService("", nil, nil)
6882

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package mqtt
2+
3+
import (
4+
"context"
5+
6+
"gofr.dev/pkg/gofr/datasource"
7+
)
8+
9+
type Logger interface {
10+
Debug(args ...interface{})
11+
Debugf(format string, args ...interface{})
12+
Warnf(format string, args ...interface{})
13+
Errorf(format string, args ...interface{})
14+
}
15+
16+
type Metrics interface {
17+
IncrementCounter(ctx context.Context, name string, labels ...string)
18+
}
19+
20+
type PubSub interface {
21+
SubscribeWithFunction(topic string, subscribeFunc SubscribeFunc) error
22+
Publish(ctx context.Context, topic string, message []byte) error
23+
Unsubscribe(topic string) error
24+
Disconnect(waitTime uint)
25+
Ping() error
26+
Health() datasource.Health
27+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package mqtt
2+
3+
import mqtt "github.com/eclipse/paho.mqtt.golang"
4+
5+
type message struct {
6+
msg mqtt.Message
7+
}
8+
9+
func (m *message) Commit() {
10+
m.msg.Ack()
11+
}

0 commit comments

Comments
 (0)