Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d35646d
chore: add automated release
francescopepe Feb 15, 2024
cf0f503
Merge pull request #12 from francescopepe/FP/add-release-automation
francescopepe Feb 16, 2024
24d1260
chore: fix releaserc.yml
francescopepe Feb 16, 2024
ab739c8
feat: `reportFunc` returns a boolean value to decide whether the erro…
francescopepe Jun 17, 2024
8181967
Merge pull request #13 from francescopepe/FP/update-report-func
francescopepe Jun 17, 2024
10cdba4
chore: update dependencies
francescopepe Aug 17, 2024
1967ed6
feat: pass formigo message to handlers to get more info
francescopepe Aug 17, 2024
0936e34
chore: rename single message handler and consumer to message handler …
francescopepe Aug 17, 2024
a9f90db
feat: add batch response to support partial failures
francescopepe Aug 17, 2024
ed8a2bc
ci: update go version
francescopepe Aug 17, 2024
32ae9a6
chore: update go version in go.mod
francescopepe Aug 17, 2024
9d72b46
feat: make formigo.Message JSON encodable
francescopepe Aug 18, 2024
cc77bc3
Merge pull request #14 from francescopepe/FP/improvements
francescopepe Aug 22, 2024
0ffeeae
feat: make the retriever stop immediately if the context is canceled
francescopepe Aug 27, 2024
0102111
ci: add some simple tests and workflow
francescopepe Aug 27, 2024
3ead0c6
Merge pull request #15 from francescopepe/FP/stop-retriever-immediately
francescopepe Aug 27, 2024
3e4b72c
feat: export Worker, Consumer and Handlers
francescopepe Sep 15, 2024
c9fd186
Merge pull request #16 from francescopepe/FP/export-worker-consumers
francescopepe Oct 22, 2024
a1a0e54
chore: retract v1.0.0
francescopepe Oct 22, 2024
e361d82
chore: retract v1.0.0 correctly
lukemorrigan-pp Feb 20, 2025
f7ec269
chore: rename forked module for internal use
RichToms Feb 15, 2024
84934ce
fix: update formigo reference and delete retract block in go.mod
leegm-pp Jul 1, 2025
e1dea93
fix: try running commitlint from base to head insead
leegm-pp Jul 1, 2025
dd877c2
Merge branch 'main' into PAR-update-fork
leegm-pp Jul 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version: '1.22'
go-version-file: 'go.mod'
cache: false
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
Expand All @@ -32,7 +32,7 @@ jobs:
- name: govulncheck
uses: golang/govulncheck-action@v1
with:
go-version-input: '~1.22.0'
go-version-file: 'go.mod'
check-latest: true

commitlint:
Expand All @@ -57,4 +57,4 @@ jobs:

- name: Validate PR commits with commitlint
if: github.event_name == 'pull_request'
run: npx commitlint --from ${{ github.event.pull_request.head.sha }}~${{ github.event.pull_request.commits }} --to ${{ github.event.pull_request.head.sha }} --verbose
run: npx commitlint --from ${{ github.event.pull_request.base.sha }} --to ${{ github.event.pull_request.head.sha }} --verbose
34 changes: 34 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Release
on:
push:
branches:
- main

permissions:
contents: read # for checkout

jobs:
release:
name: Release
runs-on: ubuntu-latest
permissions:
contents: write # to be able to publish a GitHub release
issues: write # to be able to comment on released issues
pull-requests: write # to be able to comment on released pull requests
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Setup Node.js
uses: actions/setup-node@v3
with:
node-version: "lts/*"
- name: Install dependencies
run: npm install --save-dev @semantic-release/commit-analyzer @semantic-release/release-notes-generator @semantic-release/changelog @semantic-release/git @semantic-release/github
- name: Verify the integrity of provenance attestations and registry signatures for installed dependencies
run: npm audit signatures
- name: Release
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: npx semantic-release
19 changes: 19 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
name: Tests

on:
push:
branches:
- main
pull_request:

jobs:
tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-go@v4
with:
go-version-file: 'go.mod'
cache: false
- name: Run tests
run: go test ./...
66 changes: 66 additions & 0 deletions .releaserc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
---
branches:
- main
preset: "angular"
tagFormat: "v${version}"
plugins:
- "@semantic-release/commit-analyzer"
- "@semantic-release/release-notes-generator"
- "@semantic-release/github"

verifyConditions:
- "@semantic-release/github"
analyzeCommits:
- path: "@semantic-release/commit-analyzer"
releaseRules:
- type: "feat"
release: "patch"
- type: "hotfix"
release: "patch"
- type: "patch"
release: "patch"
- type: "minor"
release: "minor"
- type: "breaking"
release: "major"
generateNotes:
- path: "@semantic-release/release-notes-generator"
writerOpts:
groupBy: "type"
commitGroupsSort:
- "feat"
- "perf"
- "fix"
commitsSort: "header"
types:
- type: "feat"
- section: "Features"
# Tracked bug fix with a hotfix branch
- type: "hotfix"
- section: "Bug Fixes"
# Uninmportent fix (CI testing, etc)
- type: "fix"
- hidden: true
- type: "chore"
- hidden: true
- type: "docs"
- hidden: true
- type: "doc"
- hidden: true
- type: "style"
- hidden: true
- type: "refactor"
- hidden: true
- type: "perf"
- hidden: true
- type: "test"
- hidden: true
presetConfig: true
publish:
- path: "@semantic-release/github"

success:
- "@semantic-release/github"

fail:
- "@semantic-release/github"
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Formigo is a powerful and flexible Golang library designed to simplify the proce

- **Efficient Throughput Management**: it offers optimal throughput management, allowing you to fine-tune the number of Go routines responsible for both polling messages from the queue and processing them. This dynamic control ensures maximum efficiency in various scenarios, making the library highly adaptable to your application's needs.

- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Multiple Message Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.
- **Configurable Batch Processing**: it uses powerful batch processing capabilities, enabling you to handle messages efficiently in customizable batches. With the Batch Handler, messages can be processed in batches of a size you define, granting you full control over the processing logic. Moreover, you can adjust the batch buffer size and timeout settings, providing a flexible and optimal solution to process messages under various workloads.

- **Context Cancellation**: Effortlessly stop the QueueWorker by canceling its context. This feature guarantees smooth and controlled termination of the worker whenever required.

Expand Down Expand Up @@ -38,7 +38,7 @@ import (
"fmt"
"log"

"github.com/Pod-Point/go-queue-worker"
formigo "github.com/Pod-Point/go-queue-worker"
workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -72,12 +72,12 @@ func main() {
wkr := formigo.NewWorker(formigo.Configuration{
Client: sqsClient,
Concurrency: 100,
Consumer: formigo.NewSingleMessageConsumer(formigo.SingleMessageConsumerConfiguration{
Handler: func(ctx context.Context, msg interface{}) error {
log.Println("Got Message", msgs)
Consumer: formigo.NewMessageConsumer(formigo.MessageConsumerConfiguration{
Handler: func(ctx context.Context, msg formigo.Message) error {
log.Println("Got Message", msg.Content())

// Assert the type of message to get the body or any other attributes
log.Println("Message body", *msg.(types.Message).Body)
log.Println("Message body", *msg.Content().(types.Message).Body)

return nil
},
Expand Down Expand Up @@ -108,7 +108,7 @@ import (
"fmt"
"log"

"github.com/Pod-Point/go-queue-worker"
formigo "github.com/Pod-Point/go-queue-worker"
workerSqs "github.com/Pod-Point/go-queue-worker/clients/sqs"

"github.com/aws/aws-sdk-go-v2/aws"
Expand Down Expand Up @@ -142,18 +142,18 @@ func main() {
wkr := formigo.NewWorker(formigo.Configuration{
Client: sqsClient,
Concurrency: 100,
Consumer: formigo.NewMultiMessageConsumer(formigo.MultiMessageConsumerConfiguration{
BufferConfig: formigo.MultiMessageBufferConfiguration{
Consumer: formigo.BatchConsumer(formigo.BatchConsumerConfiguration{
BufferConfig: formigo.BatchBufferConfiguration{
Size: 100,
Timeout: time.Second * 5,
},
Handler: func(ctx context.Context, msgs []interface{}) error {
Handler: func(ctx context.Context, msgs []formigo.Message) error {
log.Printf("Got %d messages to process\n", len(msgs)

// Assert the type of message to get the body or any other attributes

for i, msg := range msgs {
log.Printf("Message %d body: %s", i, *msg.(types.Message).Body)
log.Printf("Message %d body: %s", i, *msg.Content().(types.Message).Body)
}

return nil
Expand All @@ -176,13 +176,13 @@ By processing messages in batches, the worker can significantly enhance throughp

## Configuration

| Configuration | Explanation | Default Value |
|-------------- | ----------- | ------------- |
| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None |
| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 |
| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 |
| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None |
| Consumer | The message consumer, either SingleMessageConsumer or MultipleMessageConsumer. | None |
| Configuration | Explanation | Default Value |
|---------------|-------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| Client | The client is used for receiving messages from the queue and deleting them once they are processed correctly. This is a required configuration. | None |
| Concurrency | Number of Go routines that process the messages from the Queue. Higher values are useful for slow I/O operations in the consumer's handler. | 100 |
| Retrievers | Number of Go routines that retrieve messages from the Queue. Higher values are helpful for slow networks or when consumers are quicker. | 1 |
| ErrorConfig | Defines the error threshold and interval for worker termination and error reporting function. | None |
| Consumer | The message consumer, either MessageConsumer or BatchConsumer. | None |

## License

Expand Down
24 changes: 13 additions & 11 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ type ErrorConfiguration struct {
// Default: 120s.
Period time.Duration

// The error report function
ReportFunc func(err error)
// The error report function, returns a boolean value to decide whether the error counts towards to threshold
ReportFunc func(err error) bool
}

// The MultiMessageBufferConfiguration defines a buffer which is consumed by the worker when either
// The BatchConsumerBufferConfiguration defines a buffer which is consumed by the worker when either
// the buffer is full or the timeout has passed since the first message got added.
type MultiMessageBufferConfiguration struct {
type BatchConsumerBufferConfiguration struct {
// Max number of messages that the buffer can contain.
// Default: 10.
Size int
Expand All @@ -52,13 +52,13 @@ type MultiMessageBufferConfiguration struct {
Timeout time.Duration
}

type SingleMessageConsumerConfiguration struct {
Handler singleMessageHandler
type MessageConsumerConfiguration struct {
Handler MessageHandler
}

type MultiMessageConsumerConfiguration struct {
Handler multiMessageHandler
BufferConfig MultiMessageBufferConfiguration
type BatchConsumerConfiguration struct {
Handler BatchHandler
BufferConfig BatchConsumerBufferConfiguration
}

type Configuration struct {
Expand All @@ -83,7 +83,7 @@ type Configuration struct {
ErrorConfig ErrorConfiguration

// The messages Consumer.
Consumer consumer
Consumer Consumer

// Configuration for the deleter
DeleterConfig DeleterConfiguration
Expand All @@ -107,8 +107,10 @@ func setWorkerConfigValues(config Configuration) Configuration {
}

if config.ErrorConfig.ReportFunc == nil {
config.ErrorConfig.ReportFunc = func(err error) {
config.ErrorConfig.ReportFunc = func(err error) bool {
log.Println("ERROR", err)

return true
}
}

Expand Down
Loading
Loading