Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dsqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
idle = false
rmMemCount += inBuf.Len()
inBuf.Clear()
dedupCache.Purge()
if dedupCache != nil {
dedupCache.Purge()
}
rmDSCount, err := q.clearDatastore(ctx)
if err != nil {
log.Errorw("cannot clear datastore", "err", err, "qname", q.name)
Expand Down
2 changes: 1 addition & 1 deletion dsqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func TestPersistOneCid(t *testing.T) {

func TestDeduplicateCids(t *testing.T) {
ds := sync.MutexWrap(datastore.NewMapDatastore())
queue := dsqueue.New(ds, dsqName)
queue := dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(2*1024))
defer queue.Close()

cids := random.Cids(5)
Expand Down
18 changes: 9 additions & 9 deletions option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
)

const (
DefaultBufferSize = 16 * 1024
DefaultDedupCacheSize = 2 * 1024
DefaultIdleWriteTime = time.Minute
DefaultCloseTimeout = 10 * time.Second
DefaultBufferSize = 16 * 1024
DefaultIdleWriteTime = time.Minute
DefaultCloseTimeout = 10 * time.Second
)

// config contains all options for DSQueue.
Expand All @@ -25,10 +24,9 @@ type Option func(*config)
// getOpts creates a config and applies Options to it.
func getOpts(opts []Option) config {
cfg := config{
bufferSize: DefaultBufferSize,
dedupCacheSize: DefaultDedupCacheSize,
idleWriteTime: DefaultIdleWriteTime,
closeTimeout: DefaultCloseTimeout,
bufferSize: DefaultBufferSize,
idleWriteTime: DefaultIdleWriteTime,
closeTimeout: DefaultCloseTimeout,
}

for _, opt := range opts {
Expand All @@ -52,7 +50,9 @@ func WithBufferSize(n int) Option {
}

// WithDedupCacheSize sets the size of the LRU cache used to deduplicate items
// in the queue. A value of 0 disables the dedup cache.
// in the queue.
//
// By default, the deduplication cache is disabled (size = 0).
func WithDedupCacheSize(n int) Option {
return func(c *config) {
if n < 0 {
Expand Down
Loading