Skip to content

Commit c32b990

Browse files
committed
update client and producer
1 parent 924382a commit c32b990

File tree

2 files changed

+105
-19
lines changed

2 files changed

+105
-19
lines changed

client.go

Lines changed: 87 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,46 @@ package pubsub
33
import (
44
"cloud.google.com/go/pubsub"
55
"context"
6-
"github.com/sirupsen/logrus"
6+
"fmt"
77
"google.golang.org/api/option"
8+
"log"
89
"os"
10+
"reflect"
11+
"strconv"
12+
"time"
913
)
1014

11-
func NewPubSubClient(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
15+
func NewPubSubClientWithRetries(ctx context.Context, projectId string, keyFilename string, retries []time.Duration) (*pubsub.Client, error) {
1216
if len(keyFilename) > 0 && existFile(keyFilename) {
13-
if logrus.IsLevelEnabled(logrus.InfoLevel) {
14-
logrus.Info("key file exists")
17+
log.Println("key file exists")
18+
c, er1 := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
19+
if er1 == nil {
20+
return c, er1
21+
}
22+
i := 0
23+
err := Retry(retries, func() (err error) {
24+
i = i + 1
25+
c2, er2 := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
26+
if er2 == nil {
27+
c = c2
28+
}
29+
return er2
30+
})
31+
if err != nil {
32+
log.Printf("Failed to new pubsub client: %s.", err.Error())
1533
}
34+
return c, err
35+
} else {
36+
log.Println("key file doesn't exists")
37+
return pubsub.NewClient(ctx, projectId)
38+
}
39+
}
40+
func NewPubSubClient(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
41+
if len(keyFilename) > 0 && existFile(keyFilename) {
42+
log.Println("key file exists")
1643
return pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
1744
} else {
18-
if logrus.IsLevelEnabled(logrus.WarnLevel) && len(keyFilename) > 0{
19-
logrus.Warn("key file doesn't exists")
20-
}
45+
log.Println("key file doesn't exists")
2146
return pubsub.NewClient(ctx, projectId)
2247
}
2348
}
@@ -28,7 +53,61 @@ func existFile(filename string) bool {
2853
} else if os.IsNotExist(err) {
2954
return false
3055
} else {
31-
logrus.Error(err)
56+
log.Println(err.Error())
3257
}
3358
return false
3459
}
60+
61+
func MakeDurations(vs []int64) []time.Duration {
62+
durations := make([]time.Duration, 0)
63+
for _, v := range vs {
64+
d := time.Duration(v) * time.Second
65+
durations = append(durations, d)
66+
}
67+
return durations
68+
}
69+
func MakeArray(v interface{}, prefix string, max int) []int64 {
70+
var ar []int64
71+
v2 := reflect.Indirect(reflect.ValueOf(v))
72+
for i := 1; i <= max; i++ {
73+
fn := prefix + strconv.Itoa(i)
74+
v3 := v2.FieldByName(fn).Interface().(int64)
75+
if v3 > 0 {
76+
ar = append(ar, v3)
77+
} else {
78+
return ar
79+
}
80+
}
81+
return ar
82+
}
83+
func DurationsFromValue(v interface{}, prefix string, max int) []time.Duration {
84+
arr := MakeArray(v, prefix, max)
85+
return MakeDurations(arr)
86+
}
87+
type RetryConfig struct {
88+
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
89+
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
90+
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
91+
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
92+
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
93+
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
94+
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
95+
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
96+
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
97+
}
98+
func Retry(sleeps []time.Duration, f func() error) (err error) {
99+
attempts := len(sleeps)
100+
for i := 0; ; i++ {
101+
log.Printf("Retrying %d of %d ", i+1, attempts)
102+
err = f()
103+
if err == nil {
104+
return
105+
}
106+
if i >= (attempts - 1) {
107+
break
108+
}
109+
time.Sleep(sleeps[i])
110+
log.Printf("Retrying %d of %d after error: %s", i+1, attempts, err.Error())
111+
}
112+
return fmt.Errorf("after %d attempts, last error: %s", attempts, err)
113+
}

producer.go

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package pubsub
22

33
import (
44
"context"
5+
"log"
56
"time"
67

78
"cloud.google.com/go/iam"
89
"cloud.google.com/go/pubsub"
9-
"github.com/sirupsen/logrus"
1010
)
1111

1212
var CheckTopicPermission = CheckPermission
@@ -23,11 +23,20 @@ func NewProducer(ctx context.Context, client *pubsub.Client, topicId string, c T
2323
}
2424

2525
func NewProducerByConfig(ctx context.Context, c ProducerConfig) (*Producer, error) {
26-
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
27-
if err != nil {
28-
return nil, err
26+
if c.Retry.Retry1 <= 0 {
27+
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
28+
if err != nil {
29+
return nil, err
30+
}
31+
return NewProducer(ctx, client, c.TopicId, c.Topic), nil
32+
} else {
33+
durations := DurationsFromValue(c.Retry, "Retry", 9)
34+
client, err := NewPubSubClientWithRetries(ctx, c.Client.ProjectId, c.Client.KeyFilename, durations)
35+
if err != nil {
36+
return nil, err
37+
}
38+
return NewProducer(ctx, client, c.TopicId, c.Topic), nil
2939
}
30-
return NewProducer(ctx, client, c.TopicId, c.Topic), nil
3140
}
3241

3342
func ConfigureTopic(topic *pubsub.Topic, c TopicConfig) *pubsub.Topic {
@@ -62,14 +71,12 @@ func (c *Producer) Produce(ctx context.Context, data []byte, messageAttributes *
6271
func CheckPermission(ctx0 context.Context, iam *iam.Handle, permission string) {
6372
ctx, _ := context.WithTimeout(ctx0, 30*time.Second)
6473

65-
if logrus.IsLevelEnabled(logrus.InfoLevel) {
66-
logrus.Infof("Checking permission: %s", permission)
67-
}
74+
log.Printf("Checking permission: %s", permission)
6875
if permissions, err := iam.TestPermissions(ctx, []string{permission}); err != nil {
69-
logrus.Fatalf("Can't check permission %v: %s", permission, err.Error())
76+
log.Printf("Can't check permission %v: %s", permission, err.Error())
7077
} else if len(permissions) > 0 && permissions[0] == permission {
71-
logrus.Warnf("Permission %v valid", permission)
78+
log.Printf("Permission %v valid", permission)
7279
} else {
73-
logrus.Fatalf("Permission %v invalid", permission)
80+
log.Printf("Permission %v invalid", permission)
7481
}
7582
}

0 commit comments

Comments
 (0)