Skip to content

Commit c3b75e4

Browse files
authored
do not convert input item to string (#12)
1 parent 12b0307 commit c3b75e4

File tree

2 files changed

+51
-27
lines changed

2 files changed

+51
-27
lines changed

dsqueue.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ type DSQueue struct {
4141
closeOnce sync.Once
4242
dequeue chan []byte
4343
ds datastore.Batching
44-
enqueue chan string
44+
enqueue chan []byte
4545
clear chan chan<- int
4646
closeTimeout time.Duration
4747
getn chan getRequest
@@ -59,7 +59,7 @@ func New(ds datastore.Batching, name string, options ...Option) *DSQueue {
5959
closed: make(chan error, 1),
6060
dequeue: make(chan []byte),
6161
ds: namespace.Wrap(ds, datastore.NewKey("/dsq-"+name)),
62-
enqueue: make(chan string),
62+
enqueue: make(chan []byte),
6363
clear: make(chan chan<- int),
6464
closeTimeout: cfg.closeTimeout,
6565
getn: make(chan getRequest),
@@ -105,7 +105,7 @@ func (q *DSQueue) Put(item []byte) (err error) {
105105
}
106106
}()
107107

108-
q.enqueue <- string(item)
108+
q.enqueue <- item
109109
return
110110
}
111111

@@ -157,8 +157,8 @@ func (q *DSQueue) Name() string {
157157
return q.name
158158
}
159159

160-
func makeKey(item string, counter uint64) datastore.Key {
161-
b64Item := base64.RawURLEncoding.EncodeToString([]byte(item))
160+
func makeKey(item []byte, counter uint64) datastore.Key {
161+
b64Item := base64.RawURLEncoding.EncodeToString(item)
162162
return datastore.NewKey(fmt.Sprintf("%016x/%s", counter, b64Item))
163163
}
164164

@@ -167,9 +167,9 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
167167
defer close(q.closed)
168168

169169
var (
170-
item string
170+
item []byte
171171
counter uint64
172-
inBuf deque.Deque[string]
172+
inBuf deque.Deque[[]byte]
173173
)
174174

175175
const baseCap = 1024
@@ -181,7 +181,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
181181
}
182182

183183
defer func() {
184-
if item != "" {
184+
if item != nil {
185185
// Write the item directly, instead of pushing it to the front of
186186
// inbuf, in order to retain it's original kay, and therefore the
187187
// order in the datastore, which may not be empty.
@@ -226,7 +226,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
226226
}
227227

228228
for {
229-
if item == "" {
229+
if item == nil {
230230
if !dsEmpty {
231231
head, err := q.getQueueHead(ctx)
232232
if err != nil {
@@ -244,12 +244,11 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
244244
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
245245
continue
246246
}
247-
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
247+
item, err = base64.RawURLEncoding.DecodeString(parts[1])
248248
if err != nil {
249249
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", head.Key, "qname", q.name)
250250
continue
251251
}
252-
item = string(itemBin)
253252
} else {
254253
dsEmpty = true
255254
}
@@ -265,7 +264,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
265264

266265
// If c != cid.Undef set dequeue and attempt write.
267266
var dequeue chan []byte
268-
if item != "" {
267+
if item != nil {
269268
dequeue = q.dequeue
270269
}
271270

@@ -275,15 +274,16 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
275274
return
276275
}
277276
if dedupCache != nil {
278-
if found, _ := dedupCache.ContainsOrAdd(toQueue, struct{}{}); found {
277+
cacheItem := string(toQueue)
278+
if found, _ := dedupCache.ContainsOrAdd(cacheItem, struct{}{}); found {
279279
// update recentness in LRU cache
280-
dedupCache.Add(toQueue, struct{}{})
280+
dedupCache.Add(cacheItem, struct{}{})
281281
continue
282282
}
283283
}
284284
idle = false
285285

286-
if item == "" {
286+
if item == nil {
287287
// Use this CID as the next output since there was nothing in
288288
// the datastore or buffer previously.
289289
item = toQueue
@@ -301,8 +301,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
301301
rspChan := getRequest.rsp
302302
var outItems [][]byte
303303

304-
if item != "" {
305-
outItems = append(outItems, []byte(item))
304+
if item != nil {
305+
outItems = append(outItems, item)
306306

307307
if !dsEmpty {
308308
outItems, err = q.readDatastore(ctx, n-len(outItems), outItems)
@@ -314,12 +314,12 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
314314
}
315315
}
316316

317-
item = ""
317+
item = nil
318318
idle = false
319319
}
320320
if len(outItems) < n {
321321
for itm := range inBuf.IterPopFront() {
322-
outItems = append(outItems, []byte(itm))
322+
outItems = append(outItems, itm)
323323
if len(outItems) == n {
324324
break
325325
}
@@ -329,8 +329,8 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
329329
items: outItems,
330330
}
331331

332-
case dequeue <- []byte(item):
333-
item = ""
332+
case dequeue <- item:
333+
item = nil
334334
idle = false
335335

336336
case <-batchTimer.C:
@@ -339,7 +339,7 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
339339
commit = true
340340
} else {
341341
if inBuf.Cap() > baseCap {
342-
inBuf = deque.Deque[string]{}
342+
inBuf = deque.Deque[[]byte]{}
343343
inBuf.SetBaseCap(baseCap)
344344
}
345345
}
@@ -352,10 +352,10 @@ func (q *DSQueue) worker(ctx context.Context, bufferSize, dedupCacheSize int, id
352352

353353
case rsp := <-q.clear:
354354
var rmMemCount int
355-
if item != "" {
355+
if item != nil {
356356
rmMemCount = 1
357357
}
358-
item = ""
358+
item = nil
359359
k = datastore.Key{}
360360
idle = false
361361
rmMemCount += inBuf.Len()
@@ -452,7 +452,7 @@ func (q *DSQueue) getQueueHead(ctx context.Context) (*query.Entry, error) {
452452
return &r.Entry, r.Error
453453
}
454454

455-
func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.Deque[string]) error {
455+
func (q *DSQueue) commitInput(ctx context.Context, counter uint64, items *deque.Deque[[]byte]) error {
456456
if ctx.Err() != nil {
457457
return ctx.Err()
458458
}
@@ -527,12 +527,12 @@ func (q *DSQueue) readDatastore(ctx context.Context, n int, items [][]byte) ([][
527527
log.Errorw("malformed queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
528528
continue
529529
}
530-
itemBin, err := base64.RawURLEncoding.DecodeString(parts[1])
530+
item, err := base64.RawURLEncoding.DecodeString(parts[1])
531531
if err != nil {
532532
log.Errorw("error decoding queued item, removing it from queue", "err", err, "key", result.Key, "qname", q.name)
533533
continue
534534
}
535-
items = append(items, itemBin)
535+
items = append(items, item)
536536
}
537537

538538
if err = batch.Commit(ctx); err != nil {

dsqueue_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -403,3 +403,27 @@ func (sds *slowds) Put(ctx context.Context, key datastore.Key, value []byte) err
403403
}
404404
return sds.Batching.Put(ctx, key, value)
405405
}
406+
407+
func BenchmarkGetN(b *testing.B) {
408+
ds := sync.MutexWrap(datastore.NewMapDatastore())
409+
queue := dsqueue.New(ds, dsqName, dsqueue.WithDedupCacheSize(0))
410+
defer queue.Close()
411+
412+
cids := random.Cids(25)
413+
414+
b.ResetTimer()
415+
b.ReportAllocs()
416+
for i := 0; i < b.N; i++ {
417+
for _, c := range cids {
418+
queue.Put(c.Bytes())
419+
}
420+
421+
outItems, err := queue.GetN(50)
422+
if err != nil {
423+
b.Fatal(err)
424+
}
425+
if len(outItems) != len(cids) {
426+
b.Fatalf("dequeued wrond number of items, expected %d, got %d", len(cids), len(outItems))
427+
}
428+
}
429+
}

0 commit comments

Comments
 (0)