Skip to content

Commit ef90d56

Browse files
committed
Merge branch 'master' into composite-get
2 parents 2e86894 + 6eb9ed4 commit ef90d56

File tree

3 files changed

+73
-10
lines changed

3 files changed

+73
-10
lines changed

query.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Query struct {
4545

4646
err error
4747
cc *ConsumedCapacity
48+
sm *ScanMetrics
4849
}
4950

5051
var (
@@ -245,6 +246,12 @@ func (q *Query) ConsumedCapacity(cc *ConsumedCapacity) *Query {
245246
return q
246247
}
247248

249+
// ScanMetrics will measure the number of items scanned and returned by this operation and add it to sm.
250+
func (q *Query) ScanMetrics(sm *ScanMetrics) *Query {
251+
q.sm = sm
252+
return q
253+
}
254+
248255
// One executes this query and retrieves a single result,
249256
// unmarshaling the result to out.
250257
// This uses the DynamoDB GetItem API when possible, otherwise Query.
@@ -433,6 +440,7 @@ func (itr *queryIter) Next(ctx context.Context, out interface{}) bool {
433440
return false
434441
}
435442
itr.query.cc.add(itr.output.ConsumedCapacity)
443+
itr.query.sm.add(itr.output.ScannedCount, itr.output.Count)
436444
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
437445
itr.exLEK = itr.output.LastEvaluatedKey
438446
}

scan.go

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Scan struct {
3434

3535
err error
3636
cc *ConsumedCapacity
37+
sm *ScanMetrics
3738
}
3839

3940
// Scan creates a new request to scan this table.
@@ -73,13 +74,17 @@ func (s *Scan) Segment(segment int, totalSegments int) *Scan {
7374
func (s *Scan) newSegments(segments int, leks []PagingKey) []*scanIter {
7475
iters := make([]*scanIter, segments)
7576
lekLen := len(leks)
76-
for i := int(0); i < segments; i++ {
77+
for i := 0; i < segments; i++ {
7778
seg := *s
7879
var cc *ConsumedCapacity
7980
if s.cc != nil {
8081
cc = new(ConsumedCapacity)
8182
}
82-
seg.Segment(i, segments).ConsumedCapacity(cc)
83+
var sm *ScanMetrics
84+
if s.sm != nil {
85+
sm = new(ScanMetrics)
86+
}
87+
seg.Segment(i, segments).ConsumedCapacity(cc).ScanMetrics(sm)
8388
if i < lekLen {
8489
lek := leks[i]
8590
if lek == nil {
@@ -150,6 +155,12 @@ func (s *Scan) ConsumedCapacity(cc *ConsumedCapacity) *Scan {
150155
return s
151156
}
152157

158+
// ScanMetrics will measure the number of items scanned and returned by this operation and add it to sm.
159+
func (s *Scan) ScanMetrics(sm *ScanMetrics) *Scan {
160+
s.sm = sm
161+
return s
162+
}
163+
153164
// Iter returns a results iterator for this request.
154165
func (s *Scan) Iter() PagingIter {
155166
return &scanIter{
@@ -163,16 +174,16 @@ func (s *Scan) Iter() PagingIter {
163174
// Canceling the context given here will cancel the processing of all segments.
164175
func (s *Scan) IterParallel(ctx context.Context, segments int) ParallelIter {
165176
iters := s.newSegments(segments, nil)
166-
ps := newParallelScan(iters, s.cc, false, unmarshalItem)
177+
ps := newParallelScan(iters, s.cc, s.sm, false, unmarshalItem)
167178
go ps.run(ctx)
168179
return ps
169180
}
170181

171-
// IterParallelFrom returns a results iterator continued from a previous ParallelIter's LastEvaluatedKeys.
182+
// IterParallelStartFrom returns a results iterator continued from a previous ParallelIter's LastEvaluatedKeys.
172183
// Canceling the context given here will cancel the processing of all segments.
173184
func (s *Scan) IterParallelStartFrom(ctx context.Context, keys []PagingKey) ParallelIter {
174185
iters := s.newSegments(len(keys), keys)
175-
ps := newParallelScan(iters, s.cc, false, unmarshalItem)
186+
ps := newParallelScan(iters, s.cc, s.sm, false, unmarshalItem)
176187
go ps.run(ctx)
177188
return ps
178189
}
@@ -206,7 +217,7 @@ func (s *Scan) AllWithLastEvaluatedKey(ctx context.Context, out interface{}) (Pa
206217
// AllParallel executes this request by running the given number of segments in parallel, then unmarshaling all results to out, which must be a pointer to a slice.
207218
func (s *Scan) AllParallel(ctx context.Context, segments int, out interface{}) error {
208219
iters := s.newSegments(segments, nil)
209-
ps := newParallelScan(iters, s.cc, true, unmarshalAppendTo(out))
220+
ps := newParallelScan(iters, s.cc, s.sm, true, unmarshalAppendTo(out))
210221
go ps.run(ctx)
211222
for ps.Next(ctx, out) {
212223
}
@@ -217,7 +228,7 @@ func (s *Scan) AllParallel(ctx context.Context, segments int, out interface{}) e
217228
// Returns a slice of LastEvalutedKeys that can be used to continue the query later.
218229
func (s *Scan) AllParallelWithLastEvaluatedKeys(ctx context.Context, segments int, out interface{}) ([]PagingKey, error) {
219230
iters := s.newSegments(segments, nil)
220-
ps := newParallelScan(iters, s.cc, false, unmarshalAppendTo(out))
231+
ps := newParallelScan(iters, s.cc, s.sm, false, unmarshalAppendTo(out))
221232
go ps.run(ctx)
222233
for ps.Next(ctx, out) {
223234
}
@@ -229,7 +240,7 @@ func (s *Scan) AllParallelWithLastEvaluatedKeys(ctx context.Context, segments in
229240
// Returns a new slice of LastEvaluatedKeys after the scan finishes.
230241
func (s *Scan) AllParallelStartFrom(ctx context.Context, keys []PagingKey, out interface{}) ([]PagingKey, error) {
231242
iters := s.newSegments(len(keys), keys)
232-
ps := newParallelScan(iters, s.cc, false, unmarshalAppendTo(out))
243+
ps := newParallelScan(iters, s.cc, s.sm, false, unmarshalAppendTo(out))
233244
go ps.run(ctx)
234245
for ps.Next(ctx, out) {
235246
}
@@ -405,6 +416,7 @@ redo:
405416
return false
406417
}
407418
itr.scan.cc.add(itr.output.ConsumedCapacity)
419+
itr.scan.sm.add(itr.output.ScannedCount, itr.output.Count)
408420
if len(itr.output.LastEvaluatedKey) > len(itr.exLEK) {
409421
itr.exLEK = itr.output.LastEvaluatedKey
410422
}
@@ -472,17 +484,19 @@ type parallelScan struct {
472484
lekErr error
473485

474486
cc *ConsumedCapacity
487+
sm *ScanMetrics
475488
err error
476489
mu *sync.Mutex
477490

478491
unmarshal unmarshalFunc
479492
}
480493

481-
func newParallelScan(iters []*scanIter, cc *ConsumedCapacity, skipLEK bool, unmarshal unmarshalFunc) *parallelScan {
494+
func newParallelScan(iters []*scanIter, cc *ConsumedCapacity, sm *ScanMetrics, skipLEK bool, unmarshal unmarshalFunc) *parallelScan {
482495
ps := &parallelScan{
483496
iters: iters,
484497
items: make(chan Item),
485498
cc: cc,
499+
sm: sm,
486500
mu: new(sync.Mutex),
487501
unmarshal: unmarshal,
488502
}
@@ -521,9 +535,23 @@ func (ps *parallelScan) run(ctx context.Context) {
521535
}
522536
}
523537

524-
if ps.cc != nil && iter.scan.cc != nil {
538+
shouldUpdateCC := ps.cc != nil && iter.scan.cc != nil
539+
shouldUpdateSM := ps.sm != nil && iter.scan.sm != nil
540+
// If we need to do both, let's hold the mutex for both updates, then skip
541+
// Otherwise, grab the mutex and update whichever one is necessary
542+
if shouldUpdateCC && shouldUpdateSM {
525543
ps.mu.Lock()
526544
mergeConsumedCapacity(ps.cc, iter.scan.cc)
545+
mergeScanMetrics(ps.sm, iter.scan.sm)
546+
ps.mu.Unlock()
547+
} else if shouldUpdateSM || shouldUpdateCC {
548+
ps.mu.Lock()
549+
switch {
550+
case shouldUpdateCC:
551+
mergeConsumedCapacity(ps.cc, iter.scan.cc)
552+
case shouldUpdateSM:
553+
mergeScanMetrics(ps.sm, iter.scan.sm)
554+
}
527555
ps.mu.Unlock()
528556
}
529557

table.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,3 +378,30 @@ func mergeConsumedCapacity(dst, src *ConsumedCapacity) {
378378
}
379379
dst.Requests += src.Requests
380380
}
381+
382+
// ScanMetrics represents the metrics returned by a Scan or Query operation.
383+
// Use this to track the number of items scanned and returned across paginated requests.
384+
type ScanMetrics struct {
385+
// Scanned is the number of items examined before any filter expression was applied.
386+
// This corresponds to DynamoDB's ScannedCount.
387+
Scanned int64
388+
// Count is the number of items returned after filtering.
389+
// This corresponds to DynamoDB's Count.
390+
Count int64
391+
}
392+
393+
func (sm *ScanMetrics) add(scanned, count int32) {
394+
if sm == nil {
395+
return
396+
}
397+
sm.Scanned += int64(scanned)
398+
sm.Count += int64(count)
399+
}
400+
401+
func mergeScanMetrics(dst, src *ScanMetrics) {
402+
if dst == nil || src == nil {
403+
return
404+
}
405+
dst.Scanned += src.Scanned
406+
dst.Count += src.Count
407+
}

0 commit comments

Comments
 (0)