Skip to content

Commit bf9aff4

Browse files
committed
init pubsub
1 parent 5b632e9 commit bf9aff4

File tree

9 files changed

+279
-15
lines changed

9 files changed

+279
-15
lines changed

.gitignore

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,2 @@
1-
# Binaries for programs and plugins
2-
*.exe
3-
*.exe~
4-
*.dll
5-
*.so
6-
*.dylib
7-
8-
# Test binary, built with `go test -c`
9-
*.test
10-
11-
# Output of the go coverage tool, specifically when used with LiteIDE
12-
*.out
13-
14-
# Dependency directories (remove the comment below to include it)
15-
# vendor/
1+
.vscode
2+
.idea

README.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# pubsub
2+
3+
## Installation
4+
5+
Please make sure to initialize a Go module before installing common-go/pubsub:
6+
7+
```shell
8+
go get -u github.com/common-go/pubsub
9+
```
10+
11+
Import:
12+
13+
```go
14+
import "github.com/common-go/pubsub"
15+
```

client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package pubsub
2+
3+
import (
4+
"cloud.google.com/go/pubsub"
5+
"context"
6+
"github.com/sirupsen/logrus"
7+
"google.golang.org/api/option"
8+
"os"
9+
)
10+
11+
func NewPubSubClient(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
12+
if len(keyFilename) > 0 && existFile(keyFilename) {
13+
if logrus.IsLevelEnabled(logrus.InfoLevel) {
14+
logrus.Info("key file exists")
15+
}
16+
return pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
17+
} else {
18+
if logrus.IsLevelEnabled(logrus.WarnLevel) && len(keyFilename) > 0{
19+
logrus.Warn("key file doesn't exists")
20+
}
21+
return pubsub.NewClient(ctx, projectId)
22+
}
23+
}
24+
25+
func existFile(filename string) bool {
26+
if _, err := os.Stat(filename); err == nil {
27+
return true
28+
} else if os.IsNotExist(err) {
29+
return false
30+
} else {
31+
logrus.Error(err)
32+
}
33+
return false
34+
}

client_config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package pubsub
2+
3+
type ClientConfig struct {
4+
ProjectId string `mapstructure:"project_id"`
5+
KeyFilename string `mapstructure:"key_filename"`
6+
}

consumer.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package pubsub
2+
3+
import (
4+
"cloud.google.com/go/pubsub"
5+
"context"
6+
"github.com/common-go/mq"
7+
"github.com/sirupsen/logrus"
8+
)
9+
10+
type Consumer struct {
11+
Client *pubsub.Client
12+
Subscription *pubsub.Subscription
13+
AckOnConsume bool
14+
}
15+
16+
func NewConsumer(client *pubsub.Client, subscriptionId string, c SubscriptionConfig, ackOnConsume bool) *Consumer {
17+
subscription := client.Subscription(subscriptionId)
18+
return &Consumer{Client: client, Subscription: ConfigureSubscription(subscription, c), AckOnConsume: ackOnConsume}
19+
}
20+
21+
func NewConsumerByConfig(ctx context.Context, c ConsumerConfig, ackOnConsume bool) (*Consumer, error) {
22+
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
23+
if err != nil {
24+
return nil, err
25+
}
26+
return NewConsumer(client, c.SubscriptionId, c.SubscriptionConfig, ackOnConsume), nil
27+
}
28+
29+
func ConfigureSubscription(subscription *pubsub.Subscription, c SubscriptionConfig) *pubsub.Subscription {
30+
if c.MaxOutstandingMessages > 0 {
31+
subscription.ReceiveSettings.MaxOutstandingMessages = c.MaxOutstandingMessages
32+
}
33+
if c.NumGoroutines > 0 {
34+
subscription.ReceiveSettings.NumGoroutines = c.NumGoroutines
35+
}
36+
return subscription
37+
}
38+
39+
func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
40+
er1 := c.Subscription.Receive(ctx, func(ctx2 context.Context, m *pubsub.Message) {
41+
if logrus.IsLevelEnabled(logrus.DebugLevel) {
42+
logrus.Debugf("Received message: %s", m.Data)
43+
}
44+
message := mq.Message{
45+
Id: m.ID,
46+
Data: m.Data,
47+
Attributes: m.Attributes,
48+
Raw: m,
49+
}
50+
if c.AckOnConsume {
51+
m.Ack()
52+
}
53+
caller.Call(ctx2, &message, nil)
54+
})
55+
if er1 != nil {
56+
caller.Call(ctx, nil, er1)
57+
}
58+
}

consumer_config.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package pubsub
2+
3+
type ConsumerConfig struct {
4+
SubscriptionId string `mapstructure:"subscription_id"`
5+
Client ClientConfig `mapstructure:"client"`
6+
SubscriptionConfig SubscriptionConfig `mapstructure:"subscription"`
7+
}
8+
9+
type SubscriptionConfig struct {
10+
MaxOutstandingMessages int `mapstructure:"max_outstanding_messages"`
11+
NumGoroutines int `mapstructure:"num_goroutines"`
12+
}

producer.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"cloud.google.com/go/iam"
8+
"cloud.google.com/go/pubsub"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
var CheckTopicPermission = CheckPermission
13+
14+
type Producer struct {
15+
Client *pubsub.Client
16+
Topic *pubsub.Topic
17+
}
18+
19+
func NewProducer(ctx context.Context, client *pubsub.Client, topicId string, c TopicConfig) *Producer {
20+
topic := client.Topic(topicId)
21+
CheckTopicPermission(ctx, topic.IAM(), "pubsub.topics.publish")
22+
return &Producer{Client: client, Topic: ConfigureTopic(topic, c)}
23+
}
24+
25+
func NewProducerByConfig(ctx context.Context, c ProducerConfig) (*Producer, error) {
26+
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
27+
if err != nil {
28+
return nil, err
29+
}
30+
return NewProducer(ctx, client, c.TopicId, c.Topic), nil
31+
}
32+
33+
func ConfigureTopic(topic *pubsub.Topic, c TopicConfig) *pubsub.Topic {
34+
if c.CountThreshold > 0 {
35+
topic.PublishSettings.DelayThreshold = time.Duration(c.CountThreshold) * time.Millisecond
36+
}
37+
if c.DelayThreshold > 0 {
38+
topic.PublishSettings.CountThreshold = c.DelayThreshold
39+
}
40+
if c.ByteThreshold > 0 {
41+
topic.PublishSettings.ByteThreshold = c.ByteThreshold
42+
}
43+
if c.NumGoroutines > 0 {
44+
topic.PublishSettings.NumGoroutines = c.NumGoroutines
45+
}
46+
return topic
47+
}
48+
49+
func (c *Producer) Produce(ctx context.Context, data []byte, messageAttributes *map[string]string) (string, error) {
50+
msg := &pubsub.Message{
51+
Data: data,
52+
}
53+
54+
if messageAttributes != nil {
55+
msg.Attributes = *messageAttributes
56+
}
57+
58+
publishResult := c.Topic.Publish(ctx, msg)
59+
return publishResult.Get(ctx)
60+
}
61+
62+
func CheckPermission(ctx0 context.Context, iam *iam.Handle, permission string) {
63+
ctx, _ := context.WithTimeout(ctx0, 30*time.Second)
64+
65+
if logrus.IsLevelEnabled(logrus.InfoLevel) {
66+
logrus.Infof("Checking permission: %s", permission)
67+
}
68+
if permissions, err := iam.TestPermissions(ctx, []string{permission}); err != nil {
69+
logrus.Fatalf("Can't check permission %s: %v", permission, err)
70+
} else if len(permissions) > 0 && permissions[0] == permission {
71+
logrus.Warn("Permission %s valid", permission)
72+
} else {
73+
logrus.Fatalf("Permission %s invalid", permission)
74+
}
75+
}

producer_config.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package pubsub
2+
3+
type ProducerConfig struct {
4+
TopicId string `mapstructure:"topic_id"`
5+
Client ClientConfig `mapstructure:"client"`
6+
Topic TopicConfig `mapstructure:"topic"`
7+
}
8+
9+
type TopicConfig struct {
10+
DelayThreshold int `mapstructure:"delay_threshold"` // MaxMessages
11+
CountThreshold int `mapstructure:"count_threshold"` // MaxMilliseconds
12+
ByteThreshold int `mapstructure:"byte_threshold"` // MaxBytes
13+
NumGoroutines int `mapstructure:"num_goroutines"`
14+
}

pubsub_health_service.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package pubsub
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"cloud.google.com/go/pubsub"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
type PermissionType int
13+
14+
const (
15+
PermissionPublish PermissionType = 0
16+
PermissionSubscribe PermissionType = 1
17+
)
18+
19+
type PubSubHealthService struct {
20+
name string
21+
client *pubsub.Client
22+
timeout time.Duration
23+
permissionType PermissionType
24+
resourceId string
25+
}
26+
27+
func NewPubSubHealthService(name string, client *pubsub.Client, timeout time.Duration, permissionType PermissionType, resourceId string) *PubSubHealthService {
28+
return &PubSubHealthService{name, client, timeout, permissionType, resourceId}
29+
}
30+
31+
func (h *PubSubHealthService) Name() string {
32+
return h.name
33+
}
34+
35+
func (h *PubSubHealthService) Check(ctx context.Context) (map[string]interface{}, error) {
36+
res := make(map[string]interface{})
37+
var permissions []string
38+
var err error
39+
40+
timeoutCtx, _ := context.WithTimeout(ctx, h.timeout)
41+
if h.permissionType == PermissionPublish {
42+
permissions, err = h.client.Topic(h.resourceId).IAM().TestPermissions(timeoutCtx, []string{"pubsub.topics.publish"})
43+
} else if h.permissionType == PermissionSubscribe {
44+
permissions, err = h.client.Subscription(h.resourceId).IAM().TestPermissions(timeoutCtx, []string{"pubsub.subscriptions.consume"})
45+
}
46+
47+
if err != nil {
48+
logrus.Errorf("Can't TestPermissions %h: %h", h.resourceId, err.Error())
49+
return res, err
50+
} else if len(permissions) != 1 {
51+
return res, fmt.Errorf("invalid permissions: %v", permissions)
52+
} else {
53+
return res, nil
54+
}
55+
}
56+
57+
func (h *PubSubHealthService) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} {
58+
if err == nil {
59+
return data
60+
}
61+
data["error"] = err.Error()
62+
return data
63+
}

0 commit comments

Comments
 (0)