77 "errors"
88 "fmt"
99 "log"
10+ "net/http"
1011 "regexp"
1112 "strings"
1213 "time"
@@ -501,3 +502,55 @@ func (p *ClientProvider) UpdateDocumentByQuery(index, query, fields string) ([]b
501502
502503 return resBytes , nil
503504}
505+
506+ // ReadWithScroll scrolls through the pages of size given in the query and adds up the scrollID in the result
507+ // Which is expected in the subsequent function call to get the next page, empty result indicates the end of the page
508+ func (p * ClientProvider ) ReadWithScroll (index string , query map [string ]interface {}, result interface {}, scrollID string ) (err error ) {
509+ var res * esapi.Response
510+ defer func () {
511+ if err := res .Body .Close (); err != nil {
512+ log .Printf ("Err: %s" , err .Error ())
513+ }
514+ }()
515+
516+ if scrollID == "" {
517+ var buf bytes.Buffer
518+ err = json .NewEncoder (& buf ).Encode (query )
519+ if err != nil {
520+ return err
521+ }
522+
523+ res , err = p .client .Search (
524+ p .client .Search .WithIndex (index ),
525+ p .client .Search .WithBody (& buf ),
526+ p .client .Search .WithScroll (time .Minute ),
527+ )
528+ } else {
529+ res , err = p .client .Scroll (p .client .Scroll .WithScrollID (scrollID ), p .client .Scroll .WithScroll (time .Minute ))
530+ }
531+ if err != nil {
532+ return err
533+ }
534+ if res .StatusCode == http .StatusOK {
535+ if err = json .NewDecoder (res .Body ).Decode (result ); err != nil {
536+ return err
537+ }
538+
539+ return nil
540+ }
541+ if res .IsError () {
542+ if res .StatusCode == http .StatusNotFound {
543+ // index doesn't exist
544+ return errors .New ("index doesn't exist" )
545+ }
546+
547+ var e map [string ]interface {}
548+ if err = json .NewDecoder (res .Body ).Decode (& e ); err != nil {
549+ return err
550+ }
551+
552+ err = fmt .Errorf ("[%s] %s: %s" , res .Status (), e ["error" ].(map [string ]interface {})["type" ], e ["error" ].(map [string ]interface {})["reason" ])
553+ return err
554+ }
555+ return nil
556+ }
0 commit comments