@@ -12,6 +12,7 @@ import (
1212
1313 "github.com/go-kit/kit/log/level"
1414 ot "github.com/opentracing/opentracing-go"
15+ otlog "github.com/opentracing/opentracing-go/log"
1516 "golang.org/x/time/rate"
1617
1718 "github.com/aws/aws-sdk-go/aws"
@@ -21,6 +22,7 @@ import (
2122 "github.com/aws/aws-sdk-go/aws/session"
2223 "github.com/aws/aws-sdk-go/service/dynamodb"
2324 "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
25+ "github.com/pkg/errors"
2426 "github.com/prometheus/client_golang/prometheus"
2527 awscommon "github.com/weaveworks/common/aws"
2628 "github.com/weaveworks/common/instrument"
@@ -144,7 +146,6 @@ type dynamoDBStorageClient struct {
144146
145147 // These functions exists for mocking, so we don't have to write a whole load
146148 // of boilerplate.
147- queryRequestFn func (ctx context.Context , input * dynamodb.QueryInput ) dynamoDBRequest
148149 batchGetItemRequestFn func (ctx context.Context , input * dynamodb.BatchGetItemInput ) dynamoDBRequest
149150 batchWriteItemRequestFn func (ctx context.Context , input * dynamodb.BatchWriteItemInput ) dynamoDBRequest
150151}
@@ -172,7 +173,6 @@ func newDynamoDBStorageClient(cfg DynamoDBConfig, schemaCfg chunk.SchemaConfig)
172173 DynamoDB : dynamoDB ,
173174 writeThrottle : rate .NewLimiter (rate .Limit (cfg .ThrottleLimit ), dynamoDBMaxWriteBatchSize ),
174175 }
175- client .queryRequestFn = client .queryRequest
176176 client .batchGetItemRequestFn = client .batchGetItemRequest
177177 client .batchWriteItemRequestFn = client .batchWriteItemRequest
178178 return client , nil
@@ -327,88 +327,44 @@ func (a dynamoDBStorageClient) query(ctx context.Context, query chunk.IndexQuery
327327 }
328328 }
329329
330- request := a .queryRequestFn (ctx , input )
331330 pageCount := 0
332331 defer func () {
333332 dynamoQueryPagesCount .Observe (float64 (pageCount ))
334333 }()
335334
336- for page := request ; page != nil ; page = page .NextPage () {
337- pageCount ++
338-
339- response , err := a .queryPage (ctx , input , page , query .HashValue , pageCount )
340- if err != nil {
341- return err
342- }
343-
344- if ! callback (query , response ) {
345- if err != nil {
346- return fmt .Errorf ("QueryPages error: table=%v, err=%v" , * input .TableName , page .Error ())
347- }
348- return nil
349- }
350- if ! page .HasNextPage () {
351- return nil
335+ retryer := newRetryer (ctx , a .cfg .backoffConfig )
336+ err := instrument .CollectedRequest (ctx , "DynamoDB.QueryPages" , dynamoRequestDuration , instrument .ErrorCode , func (innerCtx context.Context ) error {
337+ if sp := ot .SpanFromContext (innerCtx ); sp != nil {
338+ sp .SetTag ("tableName" , query .TableName )
339+ sp .SetTag ("hashValue" , query .HashValue )
352340 }
353- }
354- return nil
355- }
356-
357- func (a dynamoDBStorageClient ) queryPage (ctx context.Context , input * dynamodb.QueryInput , page dynamoDBRequest , hashValue string , pageCount int ) (* dynamoDBReadResponse , error ) {
358- backoff := util .NewBackoff (ctx , a .cfg .backoffConfig )
359-
360- var err error
361- for backoff .Ongoing () {
362- err = instrument .CollectedRequest (ctx , "DynamoDB.QueryPages" , dynamoRequestDuration , instrument .ErrorCode , func (innerCtx context.Context ) error {
341+ return a .DynamoDB .QueryPagesWithContext (innerCtx , input , func (output * dynamodb.QueryOutput , _ bool ) bool {
342+ pageCount ++
363343 if sp := ot .SpanFromContext (innerCtx ); sp != nil {
364- sp .SetTag ("tableName" , aws .StringValue (input .TableName ))
365- sp .SetTag ("hashValue" , hashValue )
366- sp .SetTag ("page" , pageCount )
367- sp .SetTag ("retry" , backoff .NumRetries ())
344+ sp .LogFields (otlog .Int ("page" , pageCount ))
368345 }
369- return page .Send ()
370- })
371-
372- if cc := page .Data ().(* dynamodb.QueryOutput ).ConsumedCapacity ; cc != nil {
373- dynamoConsumedCapacity .WithLabelValues ("DynamoDB.QueryPages" , * cc .TableName ).
374- Add (float64 (* cc .CapacityUnits ))
375- }
376346
377- if err != nil {
378- recordDynamoError (* input .TableName , err , "DynamoDB.QueryPages" )
379- if awsErr , ok := err .(awserr.Error ); ok && ((awsErr .Code () == dynamodb .ErrCodeProvisionedThroughputExceededException ) || page .Retryable ()) {
380- if awsErr .Code () != dynamodb .ErrCodeProvisionedThroughputExceededException {
381- level .Warn (util .Logger ).Log ("msg" , "DynamoDB error" , "retry" , backoff .NumRetries (), "table" , * input .TableName , "err" , err )
382- }
383- backoff .Wait ()
384- continue
347+ if cc := output .ConsumedCapacity ; cc != nil {
348+ dynamoConsumedCapacity .WithLabelValues ("DynamoDB.QueryPages" , * cc .TableName ).
349+ Add (float64 (* cc .CapacityUnits ))
385350 }
386- return nil , fmt .Errorf ("QueryPage error: table=%v, err=%v" , * input .TableName , err )
387- }
388351
389- queryOutput := page .Data ().(* dynamodb.QueryOutput )
390- return & dynamoDBReadResponse {
391- items : queryOutput .Items ,
392- }, nil
352+ return callback (query , & dynamoDBReadResponse {items : output .Items })
353+ }, retryer .withRetries , withErrorHandler (query .TableName , "DynamoDB.QueryPages" ))
354+ })
355+ if err != nil {
356+ return errors .Wrapf (err , "QueryPages error: table=%v" , query .TableName )
393357 }
394- return nil , fmt . Errorf ( "QueryPage error: %s for table %v, last error %v" , backoff . Err (), * input . TableName , err )
358+ return err
395359}
396360
397361type dynamoDBRequest interface {
398- NextPage () dynamoDBRequest
399362 Send () error
400363 Data () interface {}
401364 Error () error
402- HasNextPage () bool
403365 Retryable () bool
404366}
405367
406- func (a dynamoDBStorageClient ) queryRequest (ctx context.Context , input * dynamodb.QueryInput ) dynamoDBRequest {
407- req , _ := a .DynamoDB .QueryRequest (input )
408- req .SetContext (ctx )
409- return dynamoDBRequestAdapter {req }
410- }
411-
412368func (a dynamoDBStorageClient ) batchGetItemRequest (ctx context.Context , input * dynamodb.BatchGetItemInput ) dynamoDBRequest {
413369 req , _ := a .DynamoDB .BatchGetItemRequest (input )
414370 req .SetContext (ctx )
@@ -425,33 +381,18 @@ type dynamoDBRequestAdapter struct {
425381 request * request.Request
426382}
427383
428- func (a dynamoDBRequestAdapter ) NextPage () dynamoDBRequest {
429- next := a .request .NextPage ()
430- if next == nil {
431- return nil
432- }
433- return dynamoDBRequestAdapter {next }
434- }
435-
436384func (a dynamoDBRequestAdapter ) Data () interface {} {
437385 return a .request .Data
438386}
439387
440388func (a dynamoDBRequestAdapter ) Send () error {
441- // Clear error in case we are retrying the same operation - if we
442- // don't do this then the same error will come back again immediately
443- a .request .Error = nil
444389 return a .request .Send ()
445390}
446391
447392func (a dynamoDBRequestAdapter ) Error () error {
448393 return a .request .Error
449394}
450395
451- func (a dynamoDBRequestAdapter ) HasNextPage () bool {
452- return a .request .HasNextPage ()
453- }
454-
455396func (a dynamoDBRequestAdapter ) Retryable () bool {
456397 return aws .BoolValue (a .request .Retryable )
457398}
@@ -840,6 +781,16 @@ func (b dynamoDBReadRequest) TakeReqs(from dynamoDBReadRequest, max int) {
840781 }
841782}
842783
784+ func withErrorHandler (tableName , operation string ) func (req * request.Request ) {
785+ return func (req * request.Request ) {
786+ req .Handlers .CompleteAttempt .PushBack (func (req * request.Request ) {
787+ if req .Error != nil {
788+ recordDynamoError (tableName , req .Error , operation )
789+ }
790+ })
791+ }
792+ }
793+
843794func recordDynamoError (tableName string , err error , operation string ) {
844795 if awsErr , ok := err .(awserr.Error ); ok {
845796 dynamoFailures .WithLabelValues (tableName , awsErr .Code (), operation ).Add (float64 (1 ))
0 commit comments