@@ -16,8 +16,8 @@ import (
1616 "time"
1717
1818 "golang.org/x/exp/slices"
19+ "golang.org/x/sync/singleflight"
1920
20- "github.com/hashicorp/go-multierror"
2121 "github.com/hashicorp/go-retryablehttp"
2222 "github.com/kelseyhightower/envconfig"
2323 "github.com/rs/zerolog"
@@ -105,23 +105,28 @@ func NewClient(opts ...Option) (Client, error) {
105105// Default values: 26 seconds
106106// Changed values: 67 seconds
107107type aivenClient struct {
108- Host string `envconfig:"AIVEN_WEB_URL" default:"https://api.aiven.io"`
109- UserAgent string `envconfig:"AIVEN_USER_AGENT" default:"aiven-go-client/v3"`
110- Token string `envconfig:"AIVEN_TOKEN"`
111- Debug bool `envconfig:"AIVEN_DEBUG"`
112- RetryMax int `envconfig:"AIVEN_CLIENT_RETRY_MAX" default:"6"`
113- RetryWaitMin time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MIN" default:"2s"`
114- RetryWaitMax time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MAX" default:"15s"`
115- logger zerolog.Logger
116- doer Doer
108+ Host string `envconfig:"AIVEN_WEB_URL" default:"https://api.aiven.io"`
109+ UserAgent string `envconfig:"AIVEN_USER_AGENT" default:"aiven-go-client/v3"`
110+ Token string `envconfig:"AIVEN_TOKEN"`
111+ Debug bool `envconfig:"AIVEN_DEBUG"`
112+ RetryMax int `envconfig:"AIVEN_CLIENT_RETRY_MAX" default:"6"`
113+ RetryWaitMin time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MIN" default:"2s"`
114+ RetryWaitMax time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MAX" default:"15s"`
115+ EnableSingleFlight bool `envconfig:"AIVEN_CLIENT_ENABLE_SINGLE_FLIGHT" default:"true"`
116+ logger zerolog.Logger
117+ doer Doer
118+ singleflight singleflight.Group
117119}
118120
119121// OperationIDKey is the key used to store the operation ID in the context.
120122type OperationIDKey struct {}
121123
122124func (d * aivenClient ) Do (ctx context.Context , operationID , method , path string , in any , query ... [2 ]string ) ([]byte , error ) {
123125 ctx = context .WithValue (ctx , OperationIDKey {}, operationID )
124- var rsp * http.Response
126+ queryString := fmtQuery (operationID , query ... )
127+
128+ var statusCode int
129+ var shared bool
125130 var err error
126131
127132 if d .Debug {
@@ -133,54 +138,72 @@ func (d *aivenClient) Do(ctx context.Context, operationID, method, path string,
133138 if err != nil {
134139 event = d .logger .Error ().Err (err )
135140 } else {
136- event = d .logger .Info (). Str ( "status" , rsp . Status )
141+ event = d .logger .Info ()
137142 }
138143
139144 event .Ctx (ctx ).
140145 Stringer ("duration" , end ).
141146 Str ("operationID" , operationID ).
142147 Str ("method" , method ).
143148 Str ("path" , path ).
144- Str ("query" , fmtQuery (operationID , query ... )).
149+ Str ("query" , queryString ).
150+ Int ("status_code" , statusCode ).
151+ Bool ("shared" , shared ).
145152 Send ()
146153 }()
147154 }
148155
149- rsp , err = d .do (ctx , operationID , method , path , in , query ... )
156+ var body []byte
157+ if d .EnableSingleFlight && (method == http .MethodGet || method == http .MethodHead || method == http .MethodOptions || method == http .MethodTrace ) {
158+ type result struct {
159+ statusCode int
160+ body []byte
161+ }
162+ key := strings .Join ([]string {method , d .Host , path , queryString }, "|" )
163+ v , serr , sh := d .singleflight .Do (key , func () (any , error ) {
164+ statusCode , body , err := d .do (ctx , method , path , in , queryString )
165+ return result {statusCode : statusCode , body : body }, err
166+ })
167+ res := v .(result )
168+ statusCode , body , err , shared = res .statusCode , res .body , serr , sh
169+ } else {
170+ statusCode , body , err = d .do (ctx , method , path , in , queryString )
171+ }
150172 if err != nil {
151173 return nil , err
152174 }
153175
154- defer func () {
155- err = multierror .Append (rsp .Body .Close ()).ErrorOrNil ()
156- }()
157-
158- return fromResponse (operationID , rsp )
176+ return fromBytes (operationID , statusCode , body )
159177}
160178
161- func (d * aivenClient ) do (ctx context.Context , operationID , method , path string , in any , query ... [ 2 ] string ) (* http. Response , error ) {
179+ func (d * aivenClient ) do (ctx context.Context , method , path string , in any , queryString string ) (int , [] byte , error ) {
162180 var body io.Reader
163181
164182 if ! (in == nil || isEmpty (in )) {
165183 b , err := json .Marshal (in )
166184 if err != nil {
167- return nil , err
185+ return 0 , nil , err
168186 }
169187
170188 body = bytes .NewBuffer (b )
171189 }
172190
173191 req , err := http .NewRequestWithContext (ctx , method , d .Host + path , body )
174192 if err != nil {
175- return nil , err
193+ return 0 , nil , err
176194 }
177195
178196 req .Header .Set ("Content-Type" , "application/json" )
179197 req .Header .Set ("User-Agent" , d .UserAgent )
180198 req .Header .Set ("Authorization" , "aivenv1 " + d .Token )
181- req .URL .RawQuery = fmtQuery ( operationID , query ... )
199+ req .URL .RawQuery = queryString
182200
183- return d .doer .Do (req )
201+ rsp , err := d .doer .Do (req )
202+ if err != nil {
203+ return 0 , nil , err
204+ }
205+ respBody , err := io .ReadAll (rsp .Body )
206+ return rsp .StatusCode , respBody , errors .Join (err , rsp .Body .Close ())
184207}
185208
186209func isEmpty (a any ) bool {
0 commit comments