Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,12 @@ updates:
directory: "/" # Location of package manifests
schedule:
interval: daily
reviewers:
- "rustatian"
assignees:
- "rustatian"

- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: daily
reviewers:
- "rustatian"
assignees:
- "rustatian"
51 changes: 24 additions & 27 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
name: boltdb

on:
push:
branches:
Expand All @@ -9,96 +8,94 @@ on:
branches:
- master
- stable

jobs:
boltdb_test:
name: boltdb (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
runs-on: ${{ matrix.os }}
timeout-minutes: 60
strategy:
matrix:
php: [ "8.4" ]
go: [ stable ]
os: [ "ubuntu-latest" ]
php: ["8.5"]
go: [stable]
os: ["ubuntu-latest"]
steps:
- name: Set up Go ${{ matrix.go }}
uses: actions/setup-go@v6 # action page: <https://github.com/actions/setup-go>
with:
go-version: ${{ matrix.go }}

- name: Set up PHP ${{ matrix.php }}
uses: shivammathur/setup-php@v2 # action page: <https://github.com/shivammathur/setup-php>
with:
php-version: ${{ matrix.php }}
extensions: sockets

- name: Check out code
uses: actions/checkout@v6

- name: Get Composer Cache Directory
id: composer-cache
run: |
cd tests/php_test_files
echo "dir=$(composer config cache-files-dir)" >> $GITHUB_OUTPUT

- name: Init Composer Cache # Docs: <https://git.io/JfAKn#php---composer>
uses: actions/cache@v5
with:
path: ${{ steps.composer-cache.outputs.dir }}
key: ${{ runner.os }}-composer-${{ matrix.php }}-${{ hashFiles('**/composer.json') }}
restore-keys: ${{ runner.os }}-composer-

- name: Install Composer dependencies
run: cd tests/php_test_files && composer update --prefer-dist --no-progress --ansi

- name: Init Go modules Cache # Docs: <https://git.io/JfAKn#go---modules>
uses: actions/cache@v5
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
restore-keys: ${{ runner.os }}-go-

- name: Install Go dependencies
run: go mod download

- name: Create dirs
- name: Run unit tests with coverage
run: |
mkdir ./tests/coverage-ci

- name: Run unit test with coverage
run: |
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat ./tests/pkgs.txt) -coverprofile=./tests/coverage-ci/boltdb_u.out -covermode=atomic plugin.go plugin_test.go

go test -timeout 20m -v -race -cover -tags=debug -failfast -coverprofile=./tests/coverage-ci/boltdb_u.out -covermode=atomic plugin.go plugin_test.go
- name: Run golang tests with coverage
run: |
cd tests
docker compose -f env/docker-compose-boltdb.yaml up -d
sleep 30
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=$(cat pkgs.txt) -coverprofile=./coverage-ci/boltdb.out -covermode=atomic boltdb_test.go

mkdir -p ./coverage-ci
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverprofile=./coverage-ci/boltdb.out -covermode=atomic boltdb_test.go
- name: Archive code coverage results
uses: actions/upload-artifact@v7
with:
name: coverage
name: coverage-boltdb
path: ./tests/coverage-ci

codecov:
name: Upload codecov
runs-on: ubuntu-latest
needs:
- boltdb_test

timeout-minutes: 60
steps:
- name: Check out code
uses: actions/checkout@v6
- name: Download code coverage results
uses: actions/download-artifact@v8
with:
pattern: coverage-*
path: coverage
merge-multiple: true
- run: |
echo 'mode: atomic' > summary.txt
tail -q -n +2 *.out >> summary.txt
sed -i '2,${/roadrunner/!d}' summary.txt

tail -q -n +2 coverage/*.out >> summary.txt
awk '
NR == 1 { print; next }
/^github\.com\/roadrunner-server\/boltdb\/v6\// {
sub(/^github\.com\/roadrunner-server\/boltdb\/v6\//, "", $0)
print
}
' summary.txt > summary.filtered.txt
mv summary.filtered.txt summary.txt
Comment on lines 86 to +96
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The awk filter keeps only coverage lines starting with github.com/roadrunner-server/boltdb/v6/, but both test commands run go test with explicit file lists (e.g. go test ... plugin.go plugin_test.go / ... boltdb_test.go), which typically produces coverprofile entries prefixed with command-line-arguments/.... This filtering will likely drop all coverage data and upload an effectively empty report. Adjust the test invocation to use package paths (e.g. go test ./... or go test . / go test ./tests) or broaden the filter to include the actual coverprofile path prefix.

Copilot uses AI. Check for mistakes.
- name: upload to codecov
uses: codecov/codecov-action@v5 # Docs: <https://github.com/codecov/codecov-action>
uses: codecov/codecov-action@v5
with:
files: summary.txt
fail_ci_if_error: false
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
vendor/
**/vendor/
.idea
.DS_Store
.DS_Store
**/composer.lock
8 changes: 0 additions & 8 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ linters:
- bodyclose
- copyloopvar
- dogsled
- dupl
- errcheck
- errorlint
- exhaustive
Expand Down Expand Up @@ -60,13 +59,6 @@ linters:
- common-false-positives
- legacy
- std-error-handling
rules:
- linters:
- dupl
- funlen
- gocognit
- scopelint
path: _test\.go
paths:
- .github
- .git
Expand Down
63 changes: 16 additions & 47 deletions boltjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"sync/atomic"
"time"

"github.com/roadrunner-server/api/v4/plugins/v4/jobs"
"github.com/roadrunner-server/api-plugins/v6/jobs"
"github.com/roadrunner-server/errors"
bolt "go.etcd.io/bbolt"
jprop "go.opentelemetry.io/contrib/propagators/jaeger"
Expand Down Expand Up @@ -64,7 +64,7 @@ type Driver struct {
stopCh chan struct{}
}

func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
func FromConfig(_ context.Context, tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipe jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("init_boltdb_jobs")

var localCfg config
Expand All @@ -84,13 +84,10 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
db, err := bolt.Open(localCfg.File, os.FileMode(localCfg.Permissions), &bolt.Options{ //nolint:gosec
Timeout: time.Second * 20,
})

if err != nil {
return nil, errors.E(op, err)
}

// create a bucket if it does not exist
// tx.Commit invokes via the db.Update
err = create(db)
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -111,8 +108,8 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
},
cond: sync.NewCond(&sync.Mutex{}),

delayed: toPtr(uint64(0)),
active: toPtr(uint64(0)),
delayed: new(uint64),
active: new(uint64),

db: db,
log: log,
Expand All @@ -125,11 +122,11 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg
return dr, nil
}

func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
func FromPipeline(_ context.Context, tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *zap.Logger, cfg Configurer, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("init_boltdb_jobs")

var conf config
err := cfg.UnmarshalKey(pluginName, conf)
err := cfg.UnmarshalKey(pluginName, &conf)
if err != nil {
return nil, errors.E(op, err)
}
Expand All @@ -148,19 +145,15 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
perm = 493 // 0755
}

// add default values
conf.InitDefaults()

db, err := bolt.Open(pipeline.String(file, rrDB), os.FileMode(perm), &bolt.Options{ //nolint:gosec
Timeout: time.Second * 20,
})

if err != nil {
return nil, errors.E(op, err)
}

// create a bucket if it does not exist
// tx.Commit invokes via the db.Update
err = create(db)
if err != nil {
return nil, errors.E(op, err)
Expand All @@ -179,8 +172,8 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
}},
cond: sync.NewCond(&sync.Mutex{}),

delayed: toPtr(uint64(0)),
active: toPtr(uint64(0)),
delayed: new(uint64),
active: new(uint64),

db: db,
log: log,
Expand All @@ -202,9 +195,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
err := d.db.Update(func(tx *bolt.Tx) error {
item := fromJob(job)
d.prop.Inject(ctx, propagation.HeaderCarrier(item.headers))
// pool with buffers
buf := d.get()
// encode the job
enc := gob.NewEncoder(buf)
err := enc.Encode(item)
if err != nil {
Expand All @@ -216,7 +207,6 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
copy(value, buf.Bytes())
d.put(buf)

// handle delay
if item.Options.Delay > 0 {
b := tx.Bucket(strToBytes(DelayBucket))
tKey := time.Now().UTC().Add(time.Second * time.Duration(item.Options.Delay)).Format(time.RFC3339)
Expand All @@ -237,7 +227,6 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
return errors.E(op, err)
}

// increment active counter
atomic.AddUint64(d.active, 1)

return nil
Expand All @@ -262,13 +251,11 @@ func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
return errors.E(op, errors.Errorf("no such pipeline registered: %s", pipe.Name()))
}

// run listener
go d.listener()
go d.listener() //nolint:gosec
go d.delayedJobsListener()

// increase number of listeners
atomic.AddUint32(&d.listeners, 1)
d.log.Debug("pipeline was started", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Int64("elapsed", time.Since(start).Milliseconds()))
d.log.Debug("pipeline was started", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
return nil
}

Expand All @@ -285,10 +272,9 @@ func (d *Driver) Stop(ctx context.Context) error {

pipe := *d.pipeline.Load()

// remove all pending JOBS associated with the pipeline
_ = d.pq.Remove(pipe.Name())

d.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Int64("elapsed", time.Since(start).Milliseconds()))
d.log.Debug("pipeline was stopped", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))
return d.db.Close()
}

Expand All @@ -304,7 +290,6 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
}

l := atomic.LoadUint32(&d.listeners)
// no active listeners
if l == 0 {
return errors.Str("no active listeners, nothing to pause")
}
Expand All @@ -314,7 +299,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {

atomic.AddUint32(&d.listeners, ^uint32(0))

d.log.Debug("pipeline was paused", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Int64("elapsed", time.Since(start).Milliseconds()))
d.log.Debug("pipeline was paused", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))

return nil
}
Expand All @@ -331,19 +316,16 @@ func (d *Driver) Resume(ctx context.Context, p string) error {
}

l := atomic.LoadUint32(&d.listeners)
// no active listeners
if l == 1 {
return errors.Str("boltdb listener is already in the active state")
}

// run listener
go d.listener()
go d.listener() //nolint:gosec
go d.delayedJobsListener()

// increase number of listeners
atomic.AddUint32(&d.listeners, 1)

d.log.Debug("pipeline was resumed", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Int64("elapsed", time.Since(start).Milliseconds()))
d.log.Debug("pipeline was resumed", zap.String("driver", pipe.Driver()), zap.String("pipeline", pipe.Name()), zap.Time("start", start), zap.Duration("elapsed", time.Since(start)))

return nil
}
Expand All @@ -361,7 +343,7 @@ func (d *Driver) State(ctx context.Context) (*jobs.State, error) {
Priority: uint64(pipe.Priority()), //nolint:gosec
Active: int64(atomic.LoadUint64(d.active)), //nolint:gosec
Delayed: int64(atomic.LoadUint64(d.delayed)), //nolint:gosec
Ready: toBool(atomic.LoadUint32(&d.listeners)),
Ready: atomic.LoadUint32(&d.listeners) > 0,
}, nil
}

Expand Down Expand Up @@ -390,7 +372,6 @@ func create(db *bolt.DB) error {

pushB := tx.Bucket(strToBytes(PushBucket))

// get all items, which are in the InQueueBucket and put them into the PushBucket
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
err = pushB.Put(k, v)
if err != nil {
Expand All @@ -401,11 +382,7 @@ func create(db *bolt.DB) error {
return nil
})

if err != nil {
return err
}

return nil
return err
}

func (d *Driver) get() *bytes.Buffer {
Expand All @@ -416,11 +393,3 @@ func (d *Driver) put(b *bytes.Buffer) {
b.Reset()
d.bPool.Put(b)
}

// toBool reports whether the counter value is non-zero.
func toBool(r uint32) bool {
	if r == 0 {
		return false
	}
	return true
}

// toPtr returns a pointer to a freshly allocated copy of v.
func toPtr[T any](v T) *T {
	p := new(T)
	*p = v
	return p
}
Loading
Loading