Skip to content

Commit 049efa2

Browse files
committed
Refactor code
1 parent c559a37 commit 049efa2

File tree

9 files changed

+311
-95
lines changed

9 files changed

+311
-95
lines changed

README.md

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,86 @@
11
# pubsub
22

3+
A fully managed messaging service that allows for event-driven systems and real-time analytics on Google Cloud Platform. Key features include:
4+
- <b>Scalability</b>: Automatically scales to handle high-throughput workloads.
5+
- <b>Durability</b>: Ensures message delivery with at-least-once delivery guarantees.
6+
- <b>Flexibility</b>: Supports both push and pull delivery models.
7+
- <b>Integration</b>: Easily integrates with other Google Cloud services.
8+
9+
### Use Cases of Google Pub/Sub
10+
Common use cases include event-driven architectures, log collection, and streaming analytics.
11+
12+
![Microservice Architecture](https://cdn-images-1.medium.com/max/800/1*vKeePO_UC73i7tfymSmYNA.png)
13+
14+
#### Event-Driven Architectures
15+
- <b>Scenario</b>: Building applications where different components communicate via events (e.g., microservices)
16+
- <b>Benefit</b>: Decouples components, allowing independent scaling and development
17+
![A typical micro service](https://cdn-images-1.medium.com/max/800/1*d9kyekAbQYBxH-C6w38XZQ.png)
18+
#### Log Collection and Monitoring
19+
- <b>Scenario</b>: Aggregating logs from multiple applications and systems.
20+
- <b>Benefit</b>: Centralized logging and monitoring, improving visibility and debugging capabilities.
21+
22+
23+
#### Streaming analytics
24+
- <b>Scenario</b>: Collecting and analyzing data streams from various sources like IoT devices, social media, or user activity.
25+
- <b>Benefit</b>: Enables real-time data processing and analytics, providing timely insights and actions.
26+
27+
### Libraries for Google Pub/Sub
28+
- GO: [pubsub](https://github.com/core-go/pubsub). Example is at [go-subscription](https://github.com/project-samples/go-subscription)
29+
- nodejs: [pubsub](https://github.com/core-ts/pubsub). Example is at [pubsub-sample](https://github.com/typescript-tutorial/pubsub-sample)
30+
31+
#### A common flow to consume a message from a message queue
32+
![A common flow to consume a message from a message queue](https://cdn-images-1.medium.com/max/800/1*Y4QUN6QnfmJgaKigcNHbQA.png)
33+
- The libraries to implement this flow are:
34+
- [mq](https://github.com/core-go/mq) for GOLANG. Example is at [go-subscription](https://github.com/project-samples/go-subscription)
35+
- [mq-one](https://www.npmjs.com/package/mq-one) for nodejs. Example is at [pubsub-sample](https://github.com/typescript-tutorial/pubsub-sample)
36+
37+
### Comparison of Google Pub/Sub, Amazon SQS, and Apache Kafka
38+
#### Google Pub/Sub:
39+
- <b>Type</b>: Managed real-time messaging service.
40+
- <b>Use Case</b>: Event-driven architectures, real-time analytics.
41+
- <b>Scalability</b>: Automatically scales.
42+
- <b>Delivery Guarantees</b>: At-least-once delivery.
43+
- <b>Integration</b>: Tight with Google Cloud services.
44+
- <b>Delivery Models</b>: Push and pull.
45+
46+
#### Amazon SQS
47+
- <b>Type</b>: Managed message queuing service.
48+
- <b>Use Case</b>: Decoupling and scaling microservices, asynchronous tasks.
49+
- <b>Scalability</b>: Automatically scales.
50+
- <b>Delivery Guarantees</b>: At-least-once, FIFO (exactly-once).
51+
- <b>Integration</b>: Deep integration with AWS services.
52+
- <b>Delivery Models</b>: Primarily pull, with long polling.
53+
54+
#### Apache Kafka
55+
- <b>Type</b>: Open-source event streaming platform.
56+
- <b>Use Case</b>: High-throughput messaging, event sourcing, log aggregation.
57+
- <b>Scalability</b>: High with partitioned topics.
58+
- <b>Delivery Guarantees</b>: Configurable (at-least-once, exactly-once).
59+
- <b>Integration</b>: Broad ecosystem with various connectors.
60+
- <b>Delivery Models</b>: Pull-based consumer groups.
61+
62+
### Key Differences
63+
- <b>Management</b>: Pub/Sub and SQS are managed services, while Kafka is typically self-managed or via managed services like Confluent.
64+
- <b>Use Case Focus</b>: Pub/Sub and Kafka are ideal for real-time processing, whereas SQS is great for decoupling microservices and handling asynchronous tasks.
65+
- <b>Delivery Models</b>: Pub/Sub supports push and pull, SQS supports pull with long polling, and Kafka primarily uses pull with consumer groups.
66+
- <b>Scalability</b>: All three are highly scalable, but Kafka offers the most control over performance tuning.
67+
- <b>Integration</b>: Pub/Sub integrates well with Google Cloud, SQS with AWS, and Kafka has a broad integration ecosystem.
68+
69+
### When to Use
70+
- <b>Google Pub/Sub</b>: If you're using Google Cloud and need a managed, real-time messaging solution.
71+
- <b>Amazon SQS</b>: For reliable, scalable message queuing in AWS environments.
72+
- <b>Apache Kafka</b>: For complex event streaming and log aggregation, with a need for fine-tuned control and a broad integration ecosystem.
73+
374
## Installation
475

5-
Please make sure to initialize a Go module before installing common-go/pubsub:
76+
Please make sure to initialize a Go module before installing core-go/pubsub:
677

778
```shell
8-
go get -u github.com/common-go/pubsub
79+
go get -u github.com/core-go/pubsub
980
```
1081

1182
Import:
1283

1384
```go
14-
import "github.com/common-go/pubsub"
85+
import "github.com/core-go/pubsub"
1586
```

client.go

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,36 @@ import (
55
"context"
66
"fmt"
77
"google.golang.org/api/option"
8+
"google.golang.org/api/transport"
89
"log"
910
"os"
1011
"reflect"
1112
"strconv"
1213
"time"
1314
)
1415

15-
func NewPubSubClientWithRetries(ctx context.Context, projectId string, keyFilename string, retries []time.Duration) (*pubsub.Client, error) {
16-
if len(keyFilename) > 0 && existFile(keyFilename) {
17-
log.Println("key file exists")
18-
c, er1 := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
16+
func NewPubSubClientWithRetries(ctx context.Context, credentials []byte, retries []time.Duration, options ...string) (*pubsub.Client, error) {
17+
var projectId string
18+
if len(options) > 0 && len(options[0]) > 0 {
19+
projectId = options[0]
20+
}
21+
if credentials != nil && len(credentials) > 0 {
22+
opts := option.WithCredentialsJSON(credentials)
23+
creds, er0 := transport.Creds(ctx, opts)
24+
if er0 != nil {
25+
return nil, er0
26+
}
27+
if len(projectId) == 0 {
28+
projectId = creds.ProjectID
29+
}
30+
c, er1 := pubsub.NewClient(ctx, projectId, opts)
1931
if er1 == nil {
2032
return c, er1
2133
}
2234
i := 0
2335
err := Retry(retries, func() (err error) {
2436
i = i + 1
25-
c2, er2 := pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
37+
c2, er2 := pubsub.NewClient(ctx, projectId, opts)
2638
if er2 == nil {
2739
c = c2
2840
}
@@ -33,11 +45,11 @@ func NewPubSubClientWithRetries(ctx context.Context, projectId string, keyFilena
3345
}
3446
return c, err
3547
} else {
36-
log.Println("key file doesn't exists")
48+
log.Println("empty credentials")
3749
return pubsub.NewClient(ctx, projectId)
3850
}
3951
}
40-
func NewPubSubClient(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
52+
func NewPubSubClientWithFile(ctx context.Context, projectId string, keyFilename string) (*pubsub.Client, error) {
4153
if len(keyFilename) > 0 && existFile(keyFilename) {
4254
log.Println("key file exists")
4355
return pubsub.NewClient(ctx, projectId, option.WithCredentialsFile(keyFilename))
@@ -46,6 +58,28 @@ func NewPubSubClient(ctx context.Context, projectId string, keyFilename string)
4658
return pubsub.NewClient(ctx, projectId)
4759
}
4860
}
61+
func NewPubSubClient(ctx context.Context, credentials []byte, options ...string) (*pubsub.Client, error) {
62+
opts := option.WithCredentialsJSON(credentials)
63+
var projectId string
64+
if len(options) > 0 && len(options[0]) > 0 {
65+
projectId = options[0]
66+
} else {
67+
creds, err := transport.Creds(ctx, opts)
68+
projectId = creds.ProjectID
69+
if err != nil {
70+
panic("Credentials Error: " + err.Error())
71+
}
72+
if creds == nil {
73+
panic("Error: creds is nil")
74+
}
75+
}
76+
if credentials != nil && len(credentials) > 0 {
77+
return pubsub.NewClient(ctx, projectId, opts)
78+
} else {
79+
log.Println("empty credentials")
80+
return pubsub.NewClient(ctx, projectId)
81+
}
82+
}
4983

5084
func existFile(filename string) bool {
5185
if _, err := os.Stat(filename); err == nil {
@@ -84,17 +118,19 @@ func DurationsFromValue(v interface{}, prefix string, max int) []time.Duration {
84118
arr := MakeArray(v, prefix, max)
85119
return MakeDurations(arr)
86120
}
121+
87122
type RetryConfig struct {
88-
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
89-
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
90-
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
91-
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
92-
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
93-
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
94-
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
95-
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
96-
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
123+
Retry1 int64 `mapstructure:"1" json:"retry1,omitempty" gorm:"column:retry1" bson:"retry1,omitempty" dynamodbav:"retry1,omitempty" firestore:"retry1,omitempty"`
124+
Retry2 int64 `mapstructure:"2" json:"retry2,omitempty" gorm:"column:retry2" bson:"retry2,omitempty" dynamodbav:"retry2,omitempty" firestore:"retry2,omitempty"`
125+
Retry3 int64 `mapstructure:"3" json:"retry3,omitempty" gorm:"column:retry3" bson:"retry3,omitempty" dynamodbav:"retry3,omitempty" firestore:"retry3,omitempty"`
126+
Retry4 int64 `mapstructure:"4" json:"retry4,omitempty" gorm:"column:retry4" bson:"retry4,omitempty" dynamodbav:"retry4,omitempty" firestore:"retry4,omitempty"`
127+
Retry5 int64 `mapstructure:"5" json:"retry5,omitempty" gorm:"column:retry5" bson:"retry5,omitempty" dynamodbav:"retry5,omitempty" firestore:"retry5,omitempty"`
128+
Retry6 int64 `mapstructure:"6" json:"retry6,omitempty" gorm:"column:retry6" bson:"retry6,omitempty" dynamodbav:"retry6,omitempty" firestore:"retry6,omitempty"`
129+
Retry7 int64 `mapstructure:"7" json:"retry7,omitempty" gorm:"column:retry7" bson:"retry7,omitempty" dynamodbav:"retry7,omitempty" firestore:"retry7,omitempty"`
130+
Retry8 int64 `mapstructure:"8" json:"retry8,omitempty" gorm:"column:retry8" bson:"retry8,omitempty" dynamodbav:"retry8,omitempty" firestore:"retry8,omitempty"`
131+
Retry9 int64 `mapstructure:"9" json:"retry9,omitempty" gorm:"column:retry9" bson:"retry9,omitempty" dynamodbav:"retry9,omitempty" firestore:"retry9,omitempty"`
97132
}
133+
98134
func Retry(sleeps []time.Duration, f func() error) (err error) {
99135
attempts := len(sleeps)
100136
for i := 0; ; i++ {

client_config.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pubsub
22

33
type ClientConfig struct {
4-
ProjectId string `mapstructure:"project_id" json:"projectId,omitempty" gorm:"column:projectid" bson:"projectId,omitempty" dynamodbav:"projectId,omitempty" firestore:"projectId,omitempty"`
5-
KeyFilename string `mapstructure:"key_filename" json:"keyFilename,omitempty" gorm:"column:keyfilename" bson:"keyFilename,omitempty" dynamodbav:"keyFilename,omitempty" firestore:"keyFilename,omitempty"`
4+
ProjectId string `yaml:"project_id" mapstructure:"project_id" json:"projectId,omitempty" gorm:"column:projectid" bson:"projectId,omitempty" dynamodbav:"projectId,omitempty" firestore:"projectId,omitempty"`
5+
Credentials string `yaml:"credentials" mapstructure:"credentials" json:"credentials,omitempty" gorm:"column:credentials" bson:"credentials,omitempty" dynamodbav:"credentials,omitempty" firestore:"credentials,omitempty"`
6+
KeyFilename string `yaml:"key_filename" mapstructure:"key_filename" json:"keyFilename,omitempty" gorm:"column:keyfilename" bson:"keyFilename,omitempty" dynamodbav:"keyFilename,omitempty" firestore:"keyFilename,omitempty"`
67
}

health_checker.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
package pubsub
22

33
import (
4+
"cloud.google.com/go/pubsub"
45
"context"
56
"fmt"
67
"time"
7-
8-
"cloud.google.com/go/pubsub"
98
)
109

1110
type PermissionType int
@@ -70,6 +69,9 @@ func (h *HealthChecker) Build(ctx context.Context, data map[string]interface{},
7069
if err == nil {
7170
return data
7271
}
72+
if data == nil {
73+
data = make(map[string]interface{}, 0)
74+
}
7375
data["error"] = err.Error()
7476
return data
7577
}

publisher.go

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,86 @@
11
package pubsub
22

33
import (
4+
"cloud.google.com/go/iam"
5+
"cloud.google.com/go/pubsub"
46
"context"
57
"log"
68
"time"
7-
8-
"cloud.google.com/go/iam"
9-
"cloud.google.com/go/pubsub"
109
)
1110

1211
var CheckTopicPermission = CheckPermission
1312

1413
type Publisher struct {
15-
Client *pubsub.Client
16-
Topic *pubsub.Topic
14+
Client *pubsub.Client
15+
Topic *pubsub.Topic
16+
Convert func(context.Context, []byte) ([]byte, error)
1717
}
1818

19-
func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c TopicConfig) *Publisher {
19+
func NewPublisher(ctx context.Context, client *pubsub.Client, topicId string, c *TopicConfig, options ...func(context.Context, []byte) ([]byte, error)) *Publisher {
2020
topic := client.Topic(topicId)
2121
CheckTopicPermission(ctx, topic.IAM(), "pubsub.topics.publish")
22-
return &Publisher{Client: client, Topic: ConfigureTopic(topic, c)}
22+
var convert func(context.Context, []byte) ([]byte, error)
23+
if len(options) > 0 {
24+
convert = options[0]
25+
}
26+
return &Publisher{Client: client, Topic: ConfigureTopic(topic, c), Convert: convert}
2327
}
2428

25-
func NewPublisherByConfig(ctx context.Context, c PublisherConfig) (*Publisher, error) {
29+
func NewPublisherByConfig(ctx context.Context, c PublisherConfig, options ...func(context.Context, []byte) ([]byte, error)) (*Publisher, error) {
2630
if c.Retry.Retry1 <= 0 {
27-
client, err := NewPubSubClient(ctx, c.Client.ProjectId, c.Client.KeyFilename)
31+
client, err := NewPubSubClient(ctx, []byte(c.Client.Credentials), c.Client.ProjectId)
2832
if err != nil {
2933
return nil, err
3034
}
31-
return NewPublisher(ctx, client, c.TopicId, c.Topic), nil
35+
return NewPublisher(ctx, client, c.TopicId, c.Topic, options...), nil
3236
} else {
3337
durations := DurationsFromValue(c.Retry, "Retry", 9)
34-
client, err := NewPubSubClientWithRetries(ctx, c.Client.ProjectId, c.Client.KeyFilename, durations)
38+
client, err := NewPubSubClientWithRetries(ctx, []byte(c.Client.Credentials), durations, c.Client.ProjectId)
3539
if err != nil {
3640
return nil, err
3741
}
38-
return NewPublisher(ctx, client, c.TopicId, c.Topic), nil
42+
return NewPublisher(ctx, client, c.TopicId, c.Topic, options...), nil
3943
}
4044
}
4145

42-
func ConfigureTopic(topic *pubsub.Topic, c TopicConfig) *pubsub.Topic {
43-
if c.CountThreshold > 0 {
44-
topic.PublishSettings.DelayThreshold = time.Duration(c.CountThreshold) * time.Millisecond
45-
}
46-
if c.DelayThreshold > 0 {
47-
topic.PublishSettings.CountThreshold = c.DelayThreshold
48-
}
49-
if c.ByteThreshold > 0 {
50-
topic.PublishSettings.ByteThreshold = c.ByteThreshold
51-
}
52-
if c.NumGoroutines > 0 {
53-
topic.PublishSettings.NumGoroutines = c.NumGoroutines
46+
func ConfigureTopic(topic *pubsub.Topic, c *TopicConfig) *pubsub.Topic {
47+
if c != nil {
48+
if c.CountThreshold > 0 {
49+
topic.PublishSettings.DelayThreshold = time.Duration(c.CountThreshold) * time.Millisecond
50+
}
51+
if c.DelayThreshold > 0 {
52+
topic.PublishSettings.CountThreshold = c.DelayThreshold
53+
}
54+
if c.ByteThreshold > 0 {
55+
topic.PublishSettings.ByteThreshold = c.ByteThreshold
56+
}
57+
if c.NumGoroutines > 0 {
58+
topic.PublishSettings.NumGoroutines = c.NumGoroutines
59+
}
5460
}
5561
return topic
5662
}
57-
58-
func (c *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
59-
msg := &pubsub.Message{
60-
Data: data,
63+
func (p *Publisher) Publish(ctx context.Context, data []byte, attributes map[string]string) error {
64+
msg := &pubsub.Message{Data: data}
65+
if attributes != nil {
66+
msg.Attributes = attributes
6167
}
62-
68+
publishResult := p.Topic.Publish(ctx, msg)
69+
_, err := publishResult.Get(ctx)
70+
return err
71+
}
72+
func (p *Publisher) PublishData(ctx context.Context, data []byte) error {
73+
msg := &pubsub.Message{Data: data}
74+
publishResult := p.Topic.Publish(ctx, msg)
75+
_, err := publishResult.Get(ctx)
76+
return err
77+
}
78+
func (p *Publisher) PublishMessage(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
79+
msg := &pubsub.Message{Data: data}
6380
if attributes != nil {
6481
msg.Attributes = attributes
6582
}
66-
67-
publishResult := c.Topic.Publish(ctx, msg)
83+
publishResult := p.Topic.Publish(ctx, msg)
6884
return publishResult.Get(ctx)
6985
}
7086

publisher_config.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
package pubsub
22

33
type PublisherConfig struct {
4-
TopicId string `mapstructure:"topic_id" json:"topicId,omitempty" gorm:"column:topicid" bson:"topicId,omitempty" dynamodbav:"topicId,omitempty" firestore:"topicId,omitempty"`
5-
Client ClientConfig `mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"`
6-
Topic TopicConfig `mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"`
7-
Retry RetryConfig `mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
4+
TopicId string `yaml:"topic_id" mapstructure:"topic_id" json:"topicId,omitempty" gorm:"column:topicid" bson:"topicId,omitempty" dynamodbav:"topicId,omitempty" firestore:"topicId,omitempty"`
5+
Client ClientConfig `yaml:"client" mapstructure:"client" json:"client,omitempty" gorm:"column:client" bson:"client,omitempty" dynamodbav:"client,omitempty" firestore:"client,omitempty"`
6+
Topic *TopicConfig `yaml:"topic" mapstructure:"topic" json:"topic,omitempty" gorm:"column:topic" bson:"topic,omitempty" dynamodbav:"topic,omitempty" firestore:"topic,omitempty"`
7+
Retry RetryConfig `yaml:"retry" mapstructure:"retry" json:"retry,omitempty" gorm:"column:retry" bson:"retry,omitempty" dynamodbav:"retry,omitempty" firestore:"retry,omitempty"`
88
}
99

1010
type TopicConfig struct {
11-
DelayThreshold int `mapstructure:"delay_threshold" json:"delayThreshold,omitempty" gorm:"column:delaythreshold" bson:"delayThreshold,omitempty" dynamodbav:"delayThreshold,omitempty" firestore:"delayThreshold,omitempty"` // MaxMessages
12-
CountThreshold int `mapstructure:"count_threshold" json:"countThreshold,omitempty" gorm:"column:countthreshold" bson:"countThreshold,omitempty" dynamodbav:"countThreshold,omitempty" firestore:"countThreshold,omitempty"` // MaxMilliseconds
13-
ByteThreshold int `mapstructure:"byte_threshold" json:"byteThreshold,omitempty" gorm:"column:bytethreshold" bson:"byteThreshold,omitempty" dynamodbav:"byteThreshold,omitempty" firestore:"byteThreshold,omitempty"` // MaxBytes
14-
NumGoroutines int `mapstructure:"num_goroutines" json:"numGoroutines,omitempty" gorm:"column:numgoroutines" bson:"numGoroutines,omitempty" dynamodbav:"numGoroutines,omitempty" firestore:"numGoroutines,omitempty"`
11+
DelayThreshold int `yaml:"delay_threshold" mapstructure:"delay_threshold" json:"delayThreshold,omitempty" gorm:"column:delaythreshold" bson:"delayThreshold,omitempty" dynamodbav:"delayThreshold,omitempty" firestore:"delayThreshold,omitempty"` // MaxMessages
12+
CountThreshold int `yaml:"count_threshold" mapstructure:"" json:"countThreshold,omitempty" gorm:"column:countthreshold" bson:"countThreshold,omitempty" dynamodbav:"countThreshold,omitempty" firestore:"countThreshold,omitempty"` // MaxMilliseconds
13+
ByteThreshold int `yaml:"byte_threshold" mapstructure:"byte_threshold" json:"byteThreshold,omitempty" gorm:"column:bytethreshold" bson:"byteThreshold,omitempty" dynamodbav:"byteThreshold,omitempty" firestore:"byteThreshold,omitempty"` // MaxBytes
14+
NumGoroutines int `yaml:"num_goroutines" mapstructure:"num_goroutines" json:"numGoroutines,omitempty" gorm:"column:numgoroutines" bson:"numGoroutines,omitempty" dynamodbav:"numGoroutines,omitempty" firestore:"numGoroutines,omitempty"`
1515
}

0 commit comments

Comments
 (0)