Skip to content

Commit 924382a

Browse files
committed
update pubsub
1 parent 41c0feb commit 924382a

File tree

4 files changed

+15
-10
lines changed

4 files changed

+15
-10
lines changed

consumer.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"cloud.google.com/go/pubsub"
55
"context"
66
"github.com/common-go/mq"
7-
"github.com/sirupsen/logrus"
87
)
98

109
type Consumer struct {
@@ -19,11 +18,20 @@ func NewConsumer(client *pubsub.Client, subscriptionId string, c SubscriptionCon
1918
}
2019

2120
func NewConsumerByConfig(ctx context.Context, c ConsumerConfig, ackOnConsume bool) (*Consumer, error) {
22-
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
23-
if err != nil {
24-
return nil, err
21+
if c.Retry.Retry1 <= 0 {
22+
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
23+
if err != nil {
24+
return nil, err
25+
}
26+
return NewConsumer(client, c.SubscriptionId, c.SubscriptionConfig, ackOnConsume), nil
27+
} else {
28+
durations := DurationsFromValue(c.Retry, "Retry", 9)
29+
client, err := NewPubSubClientWithRetries(ctx, c.Client.ProjectId, c.Client.KeyFilename, durations)
30+
if err != nil {
31+
return nil, err
32+
}
33+
return NewConsumer(client, c.SubscriptionId, c.SubscriptionConfig, ackOnConsume), nil
2534
}
26-
return NewConsumer(client, c.SubscriptionId, c.SubscriptionConfig, ackOnConsume), nil
2735
}
2836

2937
func ConfigureSubscription(subscription *pubsub.Subscription, c SubscriptionConfig) *pubsub.Subscription {
@@ -38,9 +46,6 @@ func ConfigureSubscription(subscription *pubsub.Subscription, c SubscriptionConf
3846

3947
func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
4048
er1 := c.Subscription.Receive(ctx, func(ctx2 context.Context, m *pubsub.Message) {
41-
if logrus.IsLevelEnabled(logrus.DebugLevel) {
42-
logrus.Debugf("Received message: %s", m.Data)
43-
}
4449
message := mq.Message{
4550
Id: m.ID,
4651
Data: m.Data,

consumer_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ type ConsumerConfig struct {
44
SubscriptionId string `mapstructure:"subscription_id"`
55
Client ClientConfig `mapstructure:"client"`
66
SubscriptionConfig SubscriptionConfig `mapstructure:"subscription"`
7+
Retry RetryConfig `mapstructure:"retry"`
78
}
89

910
type SubscriptionConfig struct {

producer_config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ type ProducerConfig struct {
44
TopicId string `mapstructure:"topic_id"`
55
Client ClientConfig `mapstructure:"client"`
66
Topic TopicConfig `mapstructure:"topic"`
7+
Retry RetryConfig `mapstructure:"retry"`
78
}
89

910
type TopicConfig struct {

pubsub_health_checker.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"time"
77

88
"cloud.google.com/go/pubsub"
9-
"github.com/sirupsen/logrus"
109
)
1110

1211
type PermissionType int
@@ -45,7 +44,6 @@ func (h *PubSubHealthChecker) Check(ctx context.Context) (map[string]interface{}
4544
}
4645

4746
if err != nil {
48-
logrus.Errorf("Can't TestPermissions %s: %s", h.resourceId, err.Error())
4947
return res, err
5048
} else if len(permissions) != 1 {
5149
return res, fmt.Errorf("invalid permissions: %v", permissions)

0 commit comments

Comments
 (0)