diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 5ff49ea..1c045b6 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -18,7 +18,7 @@ jobs:
       - uses: actions/checkout@v3
       - uses: actions/setup-go@v4
         with:
-          go-version: '1.22'
+          go-version-file: 'go.mod'
           cache: false
       - name: golangci-lint
         uses: golangci/golangci-lint-action@v3
@@ -32,7 +32,7 @@
       - name: govulncheck
         uses: golang/govulncheck-action@v1
         with:
-          go-version-input: '~1.22.0'
+          go-version-file: 'go.mod'
           check-latest: true
 
   commitlint:
@@ -57,4 +57,4 @@
 
       - name: Validate PR commits with commitlint
         if: github.event_name == 'pull_request'
-        run: npx commitlint --from ${{ github.event.pull_request.head.sha }}~${{ github.event.pull_request.commits }} --to ${{ github.event.pull_request.head.sha }} --verbose
+        run: npx commitlint --from ${{ github.event.pull_request.base.sha }} --to ${{ github.event.pull_request.head.sha }} --verbose
diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
new file mode 100644
index 0000000..9aa5f48
--- /dev/null
+++ b/.github/workflows/release.yml
@@ -0,0 +1,34 @@
+name: Release
+on:
+  push:
+    branches:
+      - main
+
+permissions:
+  contents: read # for checkout
+
+jobs:
+  release:
+    name: Release
+    runs-on: ubuntu-latest
+    permissions:
+      contents: write # to be able to publish a GitHub release
+      issues: write # to be able to comment on released issues
+      pull-requests: write # to be able to comment on released pull requests
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+        with:
+          fetch-depth: 0
+      - name: Setup Node.js
+        uses: actions/setup-node@v3
+        with:
+          node-version: "lts/*"
+      - name: Install dependencies
+        run: npm install --save-dev @semantic-release/commit-analyzer @semantic-release/release-notes-generator @semantic-release/changelog @semantic-release/git @semantic-release/github
+      - name: Verify the integrity of provenance attestations and registry signatures for installed dependencies
+        run: npm audit signatures
+      - name: Release
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+        run: npx semantic-release
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
new file mode 100644
index 0000000..21744eb
--- /dev/null
+++ b/.github/workflows/test.yml
@@ -0,0 +1,19 @@
+name: Tests
+
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+
+jobs:
+  tests:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-go@v4
+        with:
+          go-version-file: 'go.mod'
+          cache: false
+      - name: Run tests
+        run: go test ./...
diff --git a/.releaserc.yml b/.releaserc.yml
new file mode 100644
index 0000000..5a2bd93
--- /dev/null
+++ b/.releaserc.yml
@@ -0,0 +1,66 @@
+---
+branches:
+  - main
+preset: "angular"
+tagFormat: "v${version}"
+plugins:
+  - "@semantic-release/commit-analyzer"
+  - "@semantic-release/release-notes-generator"
+  - "@semantic-release/github"
+
+verifyConditions:
+  - "@semantic-release/github"
+analyzeCommits:
+  - path: "@semantic-release/commit-analyzer"
+    releaseRules:
+      - type: "feat"
+        release: "patch"
+      - type: "hotfix"
+        release: "patch"
+      - type: "patch"
+        release: "patch"
+      - type: "minor"
+        release: "minor"
+      - type: "breaking"
+        release: "major"
+generateNotes:
+  - path: "@semantic-release/release-notes-generator"
+    writerOpts:
+      groupBy: "type"
+      commitGroupsSort:
+        - "feat"
+        - "perf"
+        - "fix"
+      commitsSort: "header"
+    types:
+      - type: "feat"
+        section: "Features"
+      # Tracked bug fix with a hotfix branch
+      - type: "hotfix"
+        section: "Bug Fixes"
+      # Unimportant fix (CI testing, etc.)
+      - type: "fix"
+        hidden: true
+      - type: "chore"
+        hidden: true
+      - type: "docs"
+        hidden: true
+      - type: "doc"
+        hidden: true
+      - type: "style"
+        hidden: true
+      - type: "refactor"
+        hidden: true
+      - type: "perf"
+        hidden: true
+      - type: "test"
+        hidden: true
+    presetConfig: true
+publish:
+  - path: "@semantic-release/github"
+
+success:
+  - "@semantic-release/github"
+
+fail:
+  - "@semantic-release/github"
diff --git a/README.md b/README.md
index c595a32..eef1373 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ Formigo is a powerful and flexible Golang library designed to simplify the proce
 
 - **Efficient Throughput Management**: it offers optimal throughput management, allowing you to fine-tune the number of Go routines responsible for both polling messages from the queue and processing them. This dynamic control ensures maximum efficiency in various scenarios, making the library highly adaptable to your application's needs.
 
-- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Multiple Message Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.
+- **Configurable Batch Processing**: it offers powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Batch Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.
 
 - **Context Cancellation**: Effortlessly stop the QueueWorker by canceling its context. This feature guarantees smooth and controlled termination of the worker whenever required.
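The "Context Cancellation" bullet above maps directly onto the `Worker.Run` signature this patch exports. A minimal sketch of stopping the worker (the `runFor` helper and its use of `time.AfterFunc` are illustrative, not part of the library):

```go
package example

import (
	"context"
	"time"

	formigo "github.com/Pod-Point/go-queue-worker"
)

// runFor runs an already-configured worker and cancels it after d.
// Run blocks until its context is canceled or the error threshold is
// exceeded, so cancellation is the normal way to stop the worker.
func runFor(wkr formigo.Worker, d time.Duration) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// In real code this would typically be wired to a shutdown signal.
	time.AfterFunc(d, cancel)

	return wkr.Run(ctx)
}
```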
@@ -38,7 +38,7 @@ import (
 	"fmt"
 	"log"
 
-	"github.com/Pod-Point/go-queue-worker"
+	formigo "github.com/Pod-Point/go-queue-worker"
 	workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
@@ -72,12 +72,12 @@ func main() {
 	wkr := formigo.NewWorker(formigo.Configuration{
 		Client:      sqsClient,
 		Concurrency: 100,
-		Consumer: formigo.NewSingleMessageConsumer(formigo.SingleMessageConsumerConfiguration{
-			Handler: func(ctx context.Context, msg interface{}) error {
-				log.Println("Got Message", msgs)
+		Consumer: formigo.NewMessageConsumer(formigo.MessageConsumerConfiguration{
+			Handler: func(ctx context.Context, msg formigo.Message) error {
+				log.Println("Got Message", msg.Content())
 
 				// Assert the type of message to get the body or any other attributes
-				log.Println("Message body", *msg.(types.Message).Body)
+				log.Println("Message body", *msg.Content().(types.Message).Body)
 
 				return nil
 			},
@@ -108,7 +108,7 @@ import (
 	"fmt"
 	"log"
 
-	"github.com/Pod-Point/go-queue-worker"
+	formigo "github.com/Pod-Point/go-queue-worker"
 	workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs"
 
 	"github.com/aws/aws-sdk-go-v2/aws"
@@ -142,18 +142,18 @@ func main() {
 	wkr := formigo.NewWorker(formigo.Configuration{
 		Client:      sqsClient,
 		Concurrency: 100,
-		Consumer: formigo.NewMultiMessageConsumer(formigo.MultiMessageConsumerConfiguration{
-			BufferConfig: formigo.MultiMessageBufferConfiguration{
+		Consumer: formigo.NewBatchConsumer(formigo.BatchConsumerConfiguration{
+			BufferConfig: formigo.BatchConsumerBufferConfiguration{
 				Size:    100,
 				Timeout: time.Second * 5,
 			},
-			Handler: func(ctx context.Context, msgs []interface{}) error {
+			Handler: func(ctx context.Context, msgs []formigo.Message) (formigo.BatchResponse, error) {
 				log.Printf("Got %d messages to process\n", len(msgs))
 
 				// Assert the type of message to get the body or any other attributes
 				for i, msg := range msgs {
-					log.Printf("Message %d body: %s", i, *msg.(types.Message).Body)
+					log.Printf("Message %d body: %s", i, *msg.Content().(types.Message).Body)
 				}
 
-				return nil
+				return formigo.BatchResponse{}, nil
 			},
@@ -176,13 +176,13 @@ By processing messages in batches, the worker can significantly enhance throughp
 
 ## Configuration
 
-| Configuration | Explanation | Default Value |
-|-------------- | ----------- | ------------- |
-| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None |
-| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 |
-| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 |
-| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None |
-| Consumer | The message consumer, either SingleMessageConsumer or MultipleMessageConsumer. | None |
+| Configuration | Explanation                                                                                                                                      | Default Value |
+|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
+| Client        | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration.   | None          |
+| Concurrency   | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler.       | 100           |
+| Retrievers    | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker.           | 1             |
+| ErrorConfig   | Defines the error threshold and interval for worker termination, and the error reporting function.                                                | None          |
+| Consumer      | The message consumer, either MessageConsumer or BatchConsumer.                                                                                     | None          |
 
 ## License
 
diff --git a/config.go b/config.go
index 3b14f7f..0fe18e7 100644
--- a/config.go
+++ b/config.go
@@ -33,13 +33,13 @@ type ErrorConfiguration struct {
 	// Default: 120s.
 	Period time.Duration
 
-	// The error report function
-	ReportFunc func(err error)
+	// The error report function. It returns a boolean value that decides whether the error counts towards the threshold.
+	ReportFunc func(err error) bool
 }
 
-// The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either
+// The BatchConsumerBufferConfiguration defines a buffer which is consumed by the worker when either
 // the buffer is full or the timeout has passed since the first message got added.
-type MultiMessageBufferConfiguration struct {
+type BatchConsumerBufferConfiguration struct {
 	// Max number of messages that the buffer can contain.
 	// Default: 10.
 	Size int
@@ -52,13 +52,13 @@
 	Timeout time.Duration
 }
 
-type SingleMessageConsumerConfiguration struct {
-	Handler singleMessageHandler
+type MessageConsumerConfiguration struct {
+	Handler MessageHandler
 }
 
-type MultiMessageConsumerConfiguration struct {
-	Handler      multiMessageHandler
-	BufferConfig MultiMessageBufferConfiguration
+type BatchConsumerConfiguration struct {
+	Handler      BatchHandler
+	BufferConfig BatchConsumerBufferConfiguration
 }
 
 type Configuration struct {
@@ -83,7 +83,7 @@ type Configuration struct {
 	ErrorConfig ErrorConfiguration
 
 	// The messages Consumer.
-	Consumer consumer
+	Consumer Consumer
 
 	// Configuration for the deleter
 	DeleterConfig DeleterConfiguration
@@ -107,8 +107,10 @@ func setWorkerConfigValues(config Configuration) Configuration {
 	}
 
 	if config.ErrorConfig.ReportFunc == nil {
-		config.ErrorConfig.ReportFunc = func(err error) {
+		config.ErrorConfig.ReportFunc = func(err error) bool {
 			log.Println("ERROR", err)
+
+			return true
 		}
 	}
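The default `ReportFunc` above always returns `true`, so every reported error counts toward the termination threshold. A sketch of a custom policy using the fields shown in this hunk (`Period`, `ReportFunc`); treating `context.DeadlineExceeded` as transient is an illustrative assumption, not library behaviour:

```go
package example

import (
	"context"
	"errors"
	"log"
	"time"

	formigo "github.com/Pod-Point/go-queue-worker"
)

// errorConfig logs every error but only counts some toward the worker's
// termination threshold: returning false skips the error counter.
func errorConfig() formigo.ErrorConfiguration {
	return formigo.ErrorConfiguration{
		// Each counted error is decremented again after this period.
		Period: time.Minute,
		ReportFunc: func(err error) bool {
			log.Println("worker error:", err)

			// Treat context timeouts as transient: report, don't count.
			return !errors.Is(err, context.DeadlineExceeded)
		},
	}
}
```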
diff --git a/consumers.go b/consumers.go
index a3eafa5..fdd90e1 100644
--- a/consumers.go
+++ b/consumers.go
@@ -10,8 +10,12 @@ import (
 	"github.com/Pod-Point/go-queue-worker/internal/messages"
 )
 
-type singleMessageHandler = func(ctx context.Context, msg interface{}) error
-type multiMessageHandler = func(ctx context.Context, msgs []interface{}) error
+type BatchResponse struct {
+	FailedMessagesId []interface{}
+}
+
+type MessageHandler = func(ctx context.Context, msg Message) error
+type BatchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error)
 
 // This means that the buffered messages didn't get passed to the handler within
 // the first message's timeout.
@@ -21,7 +25,7 @@ type multiMessageHandler = func(ctx context.Context, msgs []interface{}) error
 // - Consumer too slow
 var errBufferCtxExpired = errors.New("buffer context expired, buffer will Reset")
 
-type consumer interface {
+type Consumer interface {
 	consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message)
 }
 
@@ -34,7 +38,7 @@ func makeAvailableConsumers(concurrency int) chan struct{} {
 	return consumers
 }
 
-// wrapHandler catches any panic error, logs it and returns the error that generated it.
+// wrapHandler catches any panic error and returns the error that generated it.
 // It prevents the worker from crashing in case of an unexpected error.
 func wrapHandler(handler func() error) (err error) {
 	defer func() {
@@ -49,31 +53,31 @@ func wrapHandler(handler func() error) (err error) {
 	return err
 }
 
-// singleMessageConsumer defines a message handler that consumes only one message at a
+// messageConsumer defines a message handler that consumes only one message at a
 // time.
 // It can be useful when the workload is specific per message, for example for sending
 // an email.
-type singleMessageConsumer struct {
-	handler singleMessageHandler
+type messageConsumer struct {
+	handler MessageHandler
 }
 
-func (c *singleMessageConsumer) processMessage(msg messages.Message) error {
+func (c *messageConsumer) processMessage(msg messages.Message) error {
 	defer msg.CancelCtx() // This must be called to release resources associated with the context.
 
 	// Process Message
 	return wrapHandler(func() error {
-		return c.handler(msg.Ctx, msg.Msg)
+		return c.handler(msg.Ctx, msg)
 	})
 }
 
 // Consumes and deletes a single message, it stops only when the `messageCh` gets closed
 // and doesn't have any messages in it.
-func (c *singleMessageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
+func (c *messageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
 	consumers := makeAvailableConsumers(concurrency)
 
 	var wg sync.WaitGroup
 	for msg := range messageCh {
-		<-consumers // Use an available comsumer
+		<-consumers // Use an available consumer
 
 		wg.Add(1)
 		go func(message messages.Message) {
@@ -96,35 +100,42 @@ func (c *singleMessageConsumer) consume(concurrency int, ctrl *controller, messa
 	wg.Wait()
 }
 
-func NewSingleMessageConsumer(config SingleMessageConsumerConfiguration) *singleMessageConsumer {
-	return &singleMessageConsumer{
+func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer {
+	return &messageConsumer{
 		handler: config.Handler,
 	}
 }
 
-// multiMessageConsumer allows to process multiple messages at a time. This can be useful
+// batchConsumer allows to process multiple messages at a time. This can be useful
 // for batch updates or use cases with high throughput.
-type multiMessageConsumer struct {
-	handler      multiMessageHandler
-	bufferConfig MultiMessageBufferConfiguration
+type batchConsumer struct {
+	handler      BatchHandler
+	bufferConfig BatchConsumerBufferConfiguration
 }
 
-// It processes the messages and push them downstream for deletion.
-func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, messages []messages.Message) {
-	msgs := make([]interface{}, 0, len(messages))
-	for _, msg := range messages {
-		msgs = append(msgs, msg.Msg)
+// It processes the messages and pushes them downstream for deletion.
+func (c *batchConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) {
+	defer func() {
+		if r := recover(); r != nil {
+			ctrl.reportError(fmt.Errorf("panic error: %s", r))
+		}
+	}()
+
+	// Convert slice to the abstraction
+	converted := make([]Message, 0, len(msgs))
+	for _, msg := range msgs {
+		converted = append(converted, msg)
 	}
-	err := wrapHandler(func() error {
-		return c.handler(ctx, msgs)
-	})
+
+	resp, err := c.handler(ctx, converted)
 	if err != nil {
-		ctrl.reportError(fmt.Errorf("failed to process messages: %w", err))
-		return
+		ctrl.reportError(fmt.Errorf("failed to process batch: %w", err))
+		return
 	}
+
+	toDelete := c.buildMessagesToDeleteFromBatchResponse(msgs, resp)
 
 	// Push messages for deletion
-	for _, msg := range messages {
+	for _, msg := range toDelete {
 		deleteCh <- msg
 	}
 }
@@ -132,7 +142,7 @@ func (c *multiMessageConsumer) processMessages(ctrl *controller, deleteCh chan<-
 // Consumes and deletes a number of messages in the interval [1, N] based on configuration
 // provided in the BufferConfiguration.
 // It stops only when the messageCh gets closed and doesn't have any messages in it.
-func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
+func (c *batchConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
 	consumers := makeAvailableConsumers(concurrency)
 
 	// Create buffer
@@ -208,7 +218,28 @@ func (c *multiMessageConsumer) consume(concurrency int, ctrl *controller, messag
 	wg.Wait()
 }
 
-func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMessageConsumer {
+func (c *batchConsumer) buildMessagesToDeleteFromBatchResponse(msgs []messages.Message, resp BatchResponse) []messages.Message {
+	if len(resp.FailedMessagesId) == 0 {
+		return msgs
+	}
+
+	toDelete := make([]messages.Message, 0, len(msgs))
+
+	failedMessagesIdIndexed := make(map[interface{}]struct{}, len(resp.FailedMessagesId))
+	for _, id := range resp.FailedMessagesId {
+		failedMessagesIdIndexed[id] = struct{}{}
+	}
+
+	for _, msg := range msgs {
+		if _, ok := failedMessagesIdIndexed[msg.Id()]; !ok {
+			toDelete = append(toDelete, msg)
+		}
+	}
+
+	return toDelete
+}
+
+func NewBatchConsumer(config BatchConsumerConfiguration) *batchConsumer {
 	if config.BufferConfig.Size == 0 {
 		config.BufferConfig.Size = 10
 	}
@@ -217,7 +248,7 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes
 		config.BufferConfig.Timeout = time.Second
 	}
 
-	return &multiMessageConsumer{
+	return &batchConsumer{
 		handler:      config.Handler,
 		bufferConfig: config.BufferConfig,
 	}
@@ -225,6 +256,6 @@ func NewMultiMessageConsumer(config MultiMessageConsumerConfiguration) *multiMes
 
 // Interface guards
 var (
-	_ consumer = (*singleMessageConsumer)(nil)
-	_ consumer = (*multiMessageConsumer)(nil)
+	_ Consumer = (*messageConsumer)(nil)
+	_ Consumer = (*batchConsumer)(nil)
 )
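The new `BatchResponse` above changes the deletion contract: `buildMessagesToDeleteFromBatchResponse` deletes only messages whose `Id()` is absent from `FailedMessagesId`, so failed messages are redelivered after their visibility timeout. A sketch of a handler using it; `process` is a hypothetical helper, not part of the library:

```go
package example

import (
	"context"
	"log"

	formigo "github.com/Pod-Point/go-queue-worker"
)

// batchHandler processes a batch and reports per-message failures.
// Messages listed in FailedMessagesId are not deleted, so the broker
// redelivers them once their visibility timeout expires.
func batchHandler(ctx context.Context, msgs []formigo.Message) (formigo.BatchResponse, error) {
	var failed []interface{}

	for _, msg := range msgs {
		if err := process(ctx, msg); err != nil {
			log.Printf("message %v failed: %v", msg.Id(), err)
			failed = append(failed, msg.Id())
		}
	}

	return formigo.BatchResponse{FailedMessagesId: failed}, nil
}

// process is a hypothetical per-message helper, not part of the library.
func process(ctx context.Context, msg formigo.Message) error { return nil }
```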
diff --git a/controller.go b/controller.go
index 858b881..9d05dd2 100644
--- a/controller.go
+++ b/controller.go
@@ -19,8 +19,10 @@ type controller struct {
 // the controller.
 func (c *controller) decreaseCounterAfterTimeout() {
 	time.Sleep(c.errorConfig.Period)
+
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
+
 	c.errorCounter--
 }
 
@@ -29,6 +31,7 @@ func (c *controller) increaseCounter() {
 	// Increase counter
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
+
 	c.errorCounter++
 }
 
@@ -40,7 +43,9 @@ func (c *controller) shouldStop() bool {
 }
 
 func (c *controller) reportError(err error) {
-	c.errorConfig.ReportFunc(err)
+	if shouldIncreaseCounter := c.errorConfig.ReportFunc(err); !shouldIncreaseCounter {
+		return
+	}
 
 	c.increaseCounter()
 	go c.decreaseCounterAfterTimeout()
diff --git a/go.mod b/go.mod
index 8d3fdf5..b9949ea 100644
--- a/go.mod
+++ b/go.mod
@@ -1,12 +1,18 @@
 module github.com/Pod-Point/go-queue-worker
 
-go 1.20
+go 1.21
 
-require github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2
+require (
+	github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5
+	github.com/stretchr/testify v1.9.0
+)
 
 require (
-	github.com/aws/aws-sdk-go-v2 v1.18.1 // indirect
-	github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 // indirect
-	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 // indirect
-	github.com/aws/smithy-go v1.13.5 // indirect
+	github.com/aws/aws-sdk-go-v2 v1.30.4 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect
+	github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect
+	github.com/aws/smithy-go v1.20.4 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/go.sum b/go.sum
index 644a8f6..5c94707 100644
--- a/go.sum
+++ b/go.sum
@@ -1,19 +1,20 @@
-github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
-github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34 h1:A5UqQEmPaCFpedKouS4v+dHCTUo2sKqhoKO9U5kxyWo=
-github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.34/go.mod h1:wZpTEecJe0Btj3IYnDx/VlUzor9wm3fJHyvLpQF0VwY=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28 h1:srIVS45eQuewqz6fKKu6ZGXaq6FuFg5NzgQBAM6g8Y4=
-github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.28/go.mod h1:7VRpKQQedkfIEXb4k52I7swUnZP0wohVajJMRn3vsUw=
-github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2 h1:Y2vfLiY3HmaMisuwx6fS2kMRYbajRXXB+9vesGVPseY=
-github.com/aws/aws-sdk-go-v2/service/sqs v1.23.2/go.mod h1:TaV67b6JMD1988x/uMDop/JnMFK6v5d4Ru+sDmFg+ww=
-github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
-github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
-github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
-github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
-github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
-github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
-github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
+github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8=
+github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY=
+github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I=
+github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5 h1:HYyVDOC2/PIg+3oBX1q0wtDU5kONki6lrgIG0afrBkY=
+github.com/aws/aws-sdk-go-v2/service/sqs v1.34.5/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU=
+github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4=
+github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
-github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
-gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/client/client.go b/internal/client/client.go
index f3305ff..9db03d5 100644
--- a/internal/client/client.go
+++ b/internal/client/client.go
@@ -1,9 +1,13 @@
 package client
 
-import "github.com/Pod-Point/go-queue-worker/internal/messages"
+import (
+	"context"
+
+	"github.com/Pod-Point/go-queue-worker/internal/messages"
+)
 
 type MessageReceiver interface {
-	ReceiveMessages() ([]messages.Message, error)
+	ReceiveMessages(ctx context.Context) ([]messages.Message, error)
 }
 
 type MessageDeleter interface {
diff --git a/internal/messages/messages.go b/internal/messages/messages.go
index 13c9ba4..ba91502 100644
--- a/internal/messages/messages.go
+++ b/internal/messages/messages.go
@@ -6,9 +6,23 @@ import (
 )
 
 type Message struct {
-	Ctx       context.Context
-	CancelCtx context.CancelFunc
-	Msg       interface{}
+	Ctx          context.Context    `json:"-"` // Exclude from JSON
+	CancelCtx    context.CancelFunc `json:"-"` // Exclude from JSON
+	MsgId        interface{}        `json:"id"`
+	Msg          interface{}        `json:"content"`
+	ReceivedTime time.Time          `json:"receivedAt"`
+}
+
+func (m Message) Id() interface{} {
+	return m.MsgId
+}
+
+func (m Message) Content() interface{} {
+	return m.Msg
+}
+
+func (m Message) ReceivedAt() time.Time {
+	return m.ReceivedTime
 }
 
 type BufferConfiguration struct {
@@ -106,7 +120,7 @@ type BufferWithContextTimeoutConfiguration struct {
 	Size int
 }
 
-// bufferWithContextTimeout is used to construct a buffer that has a context timeout
+// BufferWithContextTimeout is used to construct a buffer that has a context timeout
 // along with the standard buffer timeout. This is used because the messages have to
 // be processed within a certain period and if this doesn't happen, the buffer should
 // delete the messages in it and reset.
@@ -136,7 +150,7 @@ func (b *BufferWithContextTimeout) Add(msg Message) {
 
 // Reset resets its internal buffer, cancel the current context created and
 // reset any timeout.
-// It's important to call this function avoid memory leaks. In fact, the
+// It's important to call this function to avoid memory leaks. In fact, the
 // GC won't collect any timer or resources allocated within the context.
 // NOTE: this function should always be called to clean up any buffer
 // created. Using it in a defer guarantees that it always runs.
diff --git a/message.go b/message.go
new file mode 100644
index 0000000..d1b118a
--- /dev/null
+++ b/message.go
@@ -0,0 +1,9 @@
+package formigo
+
+import "time"
+
+type Message interface {
+	ReceivedAt() time.Time
+	Content() interface{}
+	Id() interface{}
+}
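The exported `Message` interface above replaces the bare `interface{}` payloads of the old handler signatures. A sketch of a handler using its three accessors; the 30-second staleness threshold is illustrative:

```go
package example

import (
	"context"
	"log"
	"time"

	formigo "github.com/Pod-Point/go-queue-worker"
)

// handler uses the accessors: Id for correlation, ReceivedAt to measure
// queue latency, and Content for the broker-specific payload.
func handler(ctx context.Context, msg formigo.Message) error {
	if wait := time.Since(msg.ReceivedAt()); wait > 30*time.Second {
		log.Printf("message %v waited %s before being processed", msg.Id(), wait)
	}

	log.Println("content:", msg.Content())

	return nil
}
```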
diff --git a/retriever.go b/retriever.go
index 737475a..bde055c 100644
--- a/retriever.go
+++ b/retriever.go
@@ -17,8 +17,14 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
 		case <-ctx.Done():
 			return
 		default:
-			messages, err := receiver.ReceiveMessages()
+			msgs, err := receiver.ReceiveMessages(ctx)
 			if err != nil {
+				if errors.Is(err, context.Canceled) && errors.Is(ctx.Err(), context.Canceled) {
+					// The worker's context was canceled. We can exit.
+					return
+				}
+
+				// Report the error to the controller and continue.
 				ctrl.reportError(fmt.Errorf("unable to receive message: %w", err))
 				continue
 			}
@@ -27,9 +33,9 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
 		// This means that the retriever won't listen for context cancellation
 		// at this stage.
 		func() {
-			for _, message := range messages {
+			for _, msg := range msgs {
 				select {
-				case <-message.Ctx.Done():
+				case <-msg.Ctx.Done():
 					// If consumers don't pick up the messages within the messages' timeout we raise
 					// an error.
 					// This could be due to one or more of the following reasons:
@@ -42,7 +48,7 @@ func retriever(ctx context.Context, receiver client.MessageReceiver, ctrl *contr
 					ctrl.reportError(errors.New("message didn't get picked up by any consumer within its timeout"))
 					return // Avoid publishing all the messages downstream
-				case messageCh <- message:
+				case messageCh <- msg:
 					// Message pushed to the channel
 				}
 			}
 		}()
diff --git a/sqs.go b/sqs.go
index 84582ef..96026c1 100644
--- a/sqs.go
+++ b/sqs.go
@@ -35,8 +35,8 @@ type sqsClient struct {
 	messageCtxTimeout time.Duration
 }
 
-func (c sqsClient) ReceiveMessages() ([]messages.Message, error) {
-	out, err := c.svc.ReceiveMessage(context.Background(), c.receiveMessageInput)
+func (c sqsClient) ReceiveMessages(ctx context.Context) ([]messages.Message, error) {
+	out, err := c.svc.ReceiveMessage(ctx, c.receiveMessageInput)
 	if err != nil {
 		return nil, fmt.Errorf("unable to receive messages: %w", err)
 	}
@@ -82,7 +82,9 @@ func (c sqsClient) prepareMessagesForDeletion(messages []messages.Message) []typ
 
 func (c sqsClient) createMessage(sqsMessage types.Message) messages.Message {
 	msg := messages.Message{
-		Msg: sqsMessage,
+		MsgId:        *sqsMessage.MessageId,
+		Msg:          sqsMessage,
+		ReceivedTime: time.Now(),
 	}
 
 	// Set a context with timeout
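As `createMessage` shows, each message carries a context bounded by `messageCtxTimeout`, and the single-message consumer passes that context to the handler. A sketch of a handler that respects the deadline; the simulated work duration is an assumption for illustration:

```go
package example

import (
	"context"
	"time"

	formigo "github.com/Pod-Point/go-queue-worker"
)

// slowHandler gives up when the per-message context expires, so the
// message becomes visible again on the queue instead of being deleted
// after its deadline has already passed.
func slowHandler(ctx context.Context, msg formigo.Message) error {
	work := time.NewTimer(100 * time.Millisecond) // simulated I/O
	defer work.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err() // deadline hit; let the broker redeliver
	case <-work.C:
		return nil
	}
}
```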
diff --git a/worker.go b/worker.go
index ea18a48..e6d5666 100644
--- a/worker.go
+++ b/worker.go
@@ -9,16 +9,16 @@ import (
 	"github.com/Pod-Point/go-queue-worker/internal/messages"
 )
 
-type worker struct {
+type Worker struct {
 	client        client.Client
 	concurrency   int
 	retrievers    int
 	errorConfig   ErrorConfiguration
-	consumer      consumer
+	consumer      Consumer
 	deleterConfig DeleterConfiguration
 }
 
-func (w worker) Run(ctx context.Context) error {
+func (w Worker) Run(ctx context.Context) error {
 	// Create a new context with a cancel function used to stop the worker from the
 	// controller in case too many errors occur.
 	ctx, cancel := context.WithCancelCause(ctx)
@@ -60,7 +60,7 @@ func (w worker) Run(ctx context.Context) error {
 // It returns a channel where the messages will be published and, only when all the
 // retrievers have stopped, it will close it to broadcast the signal to stop to the
 // consumers.
-func (w worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan messages.Message {
+func (w Worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan messages.Message {
 	messageCh := make(chan messages.Message)
 
 	var wg sync.WaitGroup
@@ -84,7 +84,7 @@ func (w worker) runRetrievers(ctx context.Context, ctrl *controller) <-chan mess
 // It returns a channel where the messages will be published for deletion and,
 // only when the consumer has stopped, it will close it to broadcast the
 // signal to stop to the deleter.
-func (w worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) <-chan messages.Message {
+func (w Worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message) <-chan messages.Message {
 	deleteCh := make(chan messages.Message)
 
 	go func() {
@@ -96,10 +96,10 @@ func (w worker) runConsumer(ctrl *controller, messageCh <-chan messages.Message)
 	return deleteCh
 }
 
-func NewWorker(config Configuration) worker {
+func NewWorker(config Configuration) Worker {
 	config = setWorkerConfigValues(config)
 
-	return worker{
+	return Worker{
 		client:        config.Client,
 		concurrency:   config.Concurrency,
 		retrievers:    config.Retrievers,
diff --git a/worker_test.go b/worker_test.go
new file mode 100644
index 0000000..3ccf1f5
--- /dev/null
+++ b/worker_test.go
@@ -0,0 +1,251 @@
+package formigo
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/Pod-Point/go-queue-worker/internal/messages"
+)
+
+type SimpleInMemoryBrokerMessage struct {
+	messageId   string
+	body        string
+	deleteReqCh chan struct{}
+	deleteAckCh chan struct{}
+	timer       *time.Timer
+}
+
+type SimpleInMemoryBroker struct {
+	visibilityTimeout time.Duration
+	queue             chan *SimpleInMemoryBrokerMessage
+	inFlights         chan *SimpleInMemoryBrokerMessage
+	expired           chan *SimpleInMemoryBrokerMessage
+
+	statics struct {
+		rwMutex          sync.RWMutex
+		enqueuedMessages int
+		inFlightMessages int
+	}
+}
+
+func NewSimpleInMemoryBroker(visibilityTimeout time.Duration) *SimpleInMemoryBroker {
+	return &SimpleInMemoryBroker{
+		visibilityTimeout: visibilityTimeout,
+		queue:             make(chan *SimpleInMemoryBrokerMessage, 1000),
+		inFlights:         make(chan *SimpleInMemoryBrokerMessage),
+		expired:           make(chan *SimpleInMemoryBrokerMessage, 1000),
+	}
+}
+
+func (b *SimpleInMemoryBroker) run(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case msg := <-b.inFlights:
+			go func(ctx context.Context) {
+				select {
+				case <-ctx.Done():
+					return
+				case <-msg.deleteReqCh:
+					msg.deleteAckCh <- struct{}{}
+				case <-msg.timer.C:
+					b.expired <- msg
+				}
+			}(ctx)
+		}
+	}
+}
+
+func (b *SimpleInMemoryBroker) AddMessages(msgs []*SimpleInMemoryBrokerMessage) {
+	for _, msg := range msgs {
+		b.queue <- msg
+		b.statics.rwMutex.Lock()
+		b.statics.enqueuedMessages++
+		b.statics.rwMutex.Unlock()
+	}
+}
+
+func (b *SimpleInMemoryBroker) DeleteMessages(msgs []messages.Message) error {
+	requestTimer := time.NewTimer(time.Second * 5)
+	defer requestTimer.Stop()
+
+	for _, msg := range msgs {
+		brokerMsg := msg.Content().(*SimpleInMemoryBrokerMessage)
+
+		select {
+		case <-requestTimer.C:
+			return fmt.Errorf("failed to delete message %s: request timeout", brokerMsg.messageId)
+		case brokerMsg.deleteReqCh <- struct{}{}:
+		}
+
+		if !brokerMsg.timer.Stop() {
+			return fmt.Errorf("failed to delete message %s: visibility timeout expired", brokerMsg.messageId)
+		}
+
+		<-brokerMsg.deleteAckCh
+
+		b.statics.rwMutex.Lock()
+		b.statics.inFlightMessages--
+		b.statics.rwMutex.Unlock()
+	}
+
+	return nil
+}
+
+func (b *SimpleInMemoryBroker) ReceiveMessages(ctx context.Context) ([]messages.Message, error) {
+	var polledMessage *SimpleInMemoryBrokerMessage
+	select {
+	case polledMessage = <-b.expired:
+	default:
+		timer := time.NewTimer(time.Millisecond * 500)
+		defer timer.Stop()
+
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-timer.C:
+			return nil, nil
+		case polledMessage = <-b.expired:
+		case polledMessage = <-b.queue:
+		}
+	}
+
+	polledMessage.timer = time.NewTimer(b.visibilityTimeout)
+	polledMessage.deleteReqCh = make(chan struct{})
+	polledMessage.deleteAckCh = make(chan struct{})
+
+	time.Sleep(time.Millisecond * 5)
+
+	msg := messages.Message{
+		MsgId:        polledMessage.messageId,
+		Msg:          polledMessage,
+		ReceivedTime: time.Now(),
+	}
+
+	// Set a context with timeout
+	msg.Ctx, msg.CancelCtx = context.WithTimeout(context.Background(), b.visibilityTimeout)
+
+	// Move the message to inflight
+	b.inFlights <- polledMessage
+	b.statics.rwMutex.Lock()
+	b.statics.enqueuedMessages--
+	b.statics.inFlightMessages++
+	b.statics.rwMutex.Unlock()
+
+	return []messages.Message{msg}, nil
+}
+
+func (b *SimpleInMemoryBroker) EnqueuedMessages() int {
+	b.statics.rwMutex.RLock()
+	defer b.statics.rwMutex.RUnlock()
+	return b.statics.enqueuedMessages
+}
+
+func (b *SimpleInMemoryBroker) InFlightMessages() int {
+	b.statics.rwMutex.RLock()
+	defer b.statics.rwMutex.RUnlock()
+	return b.statics.inFlightMessages
+}
+
+func TestWorker(t *testing.T) {
+	inMemoryBroker := NewSimpleInMemoryBroker(time.Second * 10)
+	go inMemoryBroker.run(context.Background())
+
+	t.Run("can receive a message", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		msgs := []*SimpleInMemoryBrokerMessage{
+			{
+				messageId: "1",
+				body:      "Hello, world!",
+			},
+		}
+
+		inMemoryBroker.AddMessages(msgs)
+
+		wkr := NewWorker(Configuration{
+			Client:      inMemoryBroker,
+			Concurrency: 1,
+			Retrievers:  1,
+			ErrorConfig: ErrorConfiguration{
+				ReportFunc: func(err error) bool {
+					t.Fatalf("unexpected error: %v", err)
+					return true
+				},
+			},
+			Consumer: NewMessageConsumer(MessageConsumerConfiguration{
+				Handler: func(ctx context.Context, msg Message) error {
+					defer cancel()
+
+					assert.Equal(t, "Hello, world!", msg.Content().(*SimpleInMemoryBrokerMessage).body)
+
+					return nil
+				},
+			}),
+		})
+
+		assert.NoError(t, wkr.Run(ctx))
+		assert.Equal(t, 0, inMemoryBroker.EnqueuedMessages())
+		assert.Equal(t, 0, inMemoryBroker.InFlightMessages())
+	})
+
+	t.Run("can receive a batch of messages", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		msgs := []*SimpleInMemoryBrokerMessage{
+			{
+				messageId: "1",
+				body:      "Hello, world 1!",
+			},
+			{
+				messageId: "2",
+				body:      "Hello, world 2!",
+			},
+			{
+				messageId: "3",
+				body:      "Hello, world 3!",
+			},
+		}
+
+		inMemoryBroker.AddMessages(msgs)
+
+		wkr := NewWorker(Configuration{
+			Client:      inMemoryBroker,
+			Concurrency: 1,
+			Retrievers:  1,
+			ErrorConfig: ErrorConfiguration{
+				ReportFunc: func(err error) bool {
+					t.Fatalf("unexpected error: %v", err)
+					return true
+				},
+			},
+			Consumer: NewBatchConsumer(BatchConsumerConfiguration{
+				BufferConfig: BatchConsumerBufferConfiguration{
+					Size:    3,
+					Timeout: time.Second,
+				},
+				Handler: func(ctx context.Context, msgs []Message) (BatchResponse, error) {
+					defer cancel()
+
+					if len(msgs) < 3 {
+						t.Fatalf("expected 3 messages, got %d", len(msgs))
+					}
+
+					assert.Equal(t, "Hello, world 1!", msgs[0].Content().(*SimpleInMemoryBrokerMessage).body)
+					assert.Equal(t, "Hello, world 2!", msgs[1].Content().(*SimpleInMemoryBrokerMessage).body)
+					assert.Equal(t, "Hello, world 3!", msgs[2].Content().(*SimpleInMemoryBrokerMessage).body)
+
+					return BatchResponse{}, nil
+				},
+			}),
+		})
+
+		assert.NoError(t, wkr.Run(ctx))
+		assert.Equal(t, 0, inMemoryBroker.EnqueuedMessages())
+		assert.Equal(t, 0, inMemoryBroker.InFlightMessages())
+	})
+}