@@ -8,10 +8,12 @@ package v3
88import (
99 "bytes"
1010 "crypto/sha256"
11+ "encoding/json"
1112 "errors"
1213 "fmt"
1314 "io"
1415 "net/http"
16+ "strings"
1517 "sync"
1618
1719 "github.com/go-logr/logr"
@@ -30,13 +32,34 @@ import (
3032// the original, which a GET then shows is complete.
3133type replayRoundTripper struct {
3234 inner http.RoundTripper
33- gets map [string ]* http. Response
34- puts map [string ]* http. Response
35+ gets map [string ]* replayResponse
36+ puts map [string ]* replayResponse
3537 log logr.Logger
3638 padlock sync.Mutex
3739 redactor * vcr.Redactor
3840}
3941
// replayResponse pairs a cached HTTP response with the number of times it may still be replayed.
type replayResponse struct {
	// response is the cached response handed back on each replay.
	response *http.Response

	// remainingReplays is decremented on every replay; once it reaches zero the entry is treated as expired.
	remainingReplays int
}
46+
const (
	// maxPutReplays is the maximum number of times to replay a PUT request.
	//
	// Timing variations during test replay can cause a resource to be reconciled one additional time after the
	// goal state has already been reached. That extra reconcile should not fail the test, so the last PUT for
	// each resource may be replayed one extra time.
	maxPutReplays = 1

	// maxGetReplays is the maximum number of times to replay a GET request (effectively unlimited for now).
	//
	// GET requests may be replayed repeatedly so that multiple reconciles can observe the same stable final
	// state. The limit accommodates timing variations during test replay while still avoiding unbounded replays,
	// which might leave a test stuck until the entire test suite times out.
	//
	// The limit is currently set comically high because some tests require a great many repetitions to pass,
	// owing to the changed timing used during test replay. Once the other causes of test instability have been
	// addressed, it should be possible to reduce this limit significantly.
	maxGetReplays = 1000
)
62+
4063var _ http.RoundTripper = & replayRoundTripper {}
4164
4265// NewReplayRoundTripper creates a new replayRoundTripper that will replay selected requests to improve test resilience.
@@ -47,8 +70,8 @@ func NewReplayRoundTripper(
4770) http.RoundTripper {
4871 return & replayRoundTripper {
4972 inner : inner ,
50- gets : make (map [string ]* http. Response ),
51- puts : make (map [string ]* http. Response ),
73+ gets : make (map [string ]* replayResponse ),
74+ puts : make (map [string ]* replayResponse ),
5275 log : log ,
5376 redactor : redactor ,
5477 }
@@ -69,6 +92,8 @@ func (replayer *replayRoundTripper) RoundTrip(request *http.Request) (*http.Resp
6992}
7093
7194func (replayer * replayRoundTripper ) roundTripGet (request * http.Request ) (* http.Response , error ) {
95+ requestURL := request .URL .RequestURI ()
96+
7297 // First use our inner round tripper to get the response.
7398 response , err := replayer .inner .RoundTrip (request )
7499 if err != nil {
@@ -81,20 +106,37 @@ func (replayer *replayRoundTripper) roundTripGet(request *http.Request) (*http.R
81106 defer replayer .padlock .Unlock ()
82107
83108 // We didn't find an interaction, see if we have a cached response to return
84- if cachedResponse , ok := replayer .gets [request .URL .String ()]; ok {
85- replayer .log .Info ("Replaying GET request" , "url" , request .URL .String ())
86- return cachedResponse , nil
109+ if cachedResponse , ok := replayer .gets [requestURL ]; ok {
110+ if cachedResponse .remainingReplays > 0 {
111+ cachedResponse .remainingReplays --
112+ replayer .log .Info ("Replaying GET request" , "url" , requestURL )
113+ return cachedResponse .response , nil
114+ }
115+
116+ // It's expired, remove it from the cache to ensure we don't replay it again
117+ delete (replayer .gets , requestURL )
87118 }
88119
89120 // No cached response, return the original response and error
90121 return response , err
91122 }
92123
93- replayer .padlock .Lock ()
94- defer replayer .padlock .Unlock ()
124+ // We have a response; if it has a status, cache only if that represents a terminal state
125+ cacheable := true
126+ if state , ok := replayer .resourceStateFromBody (response ); ok {
127+ cacheable = replayer .isTerminalProvisioningState (state )
128+ }
129+ if status , ok := replayer .operationStatusFromBody (response ); ok {
130+ cacheable = replayer .isTerminalOperationStatus (status )
131+ }
132+
133+ if cacheable {
134+ replayer .padlock .Lock ()
135+ defer replayer .padlock .Unlock ()
136+
137+ replayer .gets [requestURL ] = newReplayResponse (response , maxGetReplays )
138+ }
95139
96- // We have a response, cache it and return it
97- replayer .gets [request .URL .String ()] = response
98140 return response , nil
99141}
100142
@@ -115,10 +157,15 @@ func (replayer *replayRoundTripper) roundTripPut(request *http.Request) (*http.R
115157
116158 // We didn't find an interaction, see if we have a cached response to return
117159 if cachedResponse , ok := replayer .puts [hash ]; ok {
118- // Remove it from the cache to ensure we only replay it once
160+ if cachedResponse .remainingReplays > 0 {
161+ replayer .log .Info ("Replaying PUT request" , "url" , request .URL .String (), "hash" , hash )
162+ cachedResponse .remainingReplays --
163+ return cachedResponse .response , nil
164+ }
165+
166+ // It's expired, remove it from the cache to ensure we don't replay it again
119167 delete (replayer .puts , hash )
120- replayer .log .Info ("Replaying PUT request" , "url" , request .URL .String (), "hash" , hash )
121- return cachedResponse , nil
168+
122169 }
123170
124171 // No cached response, return the original response and error
@@ -129,7 +176,7 @@ func (replayer *replayRoundTripper) roundTripPut(request *http.Request) (*http.R
129176 defer replayer .padlock .Unlock ()
130177
131178 // We have a response, cache it and return it
132- replayer .puts [hash ] = response
179+ replayer .puts [hash ] = newReplayResponse ( response , maxPutReplays )
133180 return response , nil
134181}
135182
@@ -155,3 +202,77 @@ func (replayer *replayRoundTripper) hashOfBody(request *http.Request) string {
155202
156203 return fmt .Sprintf ("%x" , hash )
157204}
205+
// operation is the minimal JSON shape used to pull the top-level "status" field out of a response body.
type operation struct {
	// Status is the value of the top-level "status" property; empty when the property is absent.
	Status string `json:"status"`
}
209+
210+ // operationStatusFromBody extracts the operation status from the response body, if present.
211+ func (replayer * replayRoundTripper ) operationStatusFromBody (response * http.Response ) (string , bool ) {
212+ body := replayer .bodyOfResponse (response )
213+
214+ var op operation
215+ if err := json .Unmarshal ([]byte (body ), & op ); err == nil {
216+ if op .Status != "" {
217+ return op .Status , true
218+ }
219+ }
220+
221+ return "" , false
222+ }
223+
224+ // isTerminalOperationStatus returns true if the specified status represents a terminal state for a long-running operation
225+ func (replayer * replayRoundTripper ) isTerminalOperationStatus (status string ) bool {
226+ return ! strings .EqualFold (status , "InProgress" )
227+ }
228+
// resource is the minimal JSON shape used to pull "properties.provisioningState" out of a response body.
type resource struct {
	// Properties mirrors the top-level "properties" object of the body.
	Properties struct {
		// ProvisioningState is the value of "provisioningState"; empty when the property is absent.
		ProvisioningState string `json:"provisioningState"`
	} `json:"properties"`
}
234+
235+ // resourceStateFromBody extracts the provisioning state from the response body, if present.
236+ func (replayer * replayRoundTripper ) resourceStateFromBody (response * http.Response ) (string , bool ) {
237+ body := replayer .bodyOfResponse (response )
238+
239+ // Treat the body as an operation and deserialize it
240+
241+ // Treat the body as a resource and deserialize it
242+ var res resource
243+ if err := json .Unmarshal ([]byte (body ), & res ); err == nil {
244+ if res .Properties .ProvisioningState != "" {
245+ return res .Properties .ProvisioningState , true
246+ }
247+ }
248+
249+ return "" , false
250+ }
251+
252+ // isTerminalStatus returns true if the specified status represents a terminal state a resource
253+ func (* replayRoundTripper ) isTerminalProvisioningState (status string ) bool {
254+ return strings .EqualFold (status , "Succeeded" ) ||
255+ strings .EqualFold (status , "Failed" ) ||
256+ strings .EqualFold (status , "Canceled" )
257+ }
258+
259+ func (replayer * replayRoundTripper ) bodyOfResponse (response * http.Response ) string {
260+ // Read all the content of the response body
261+ var body bytes.Buffer
262+ _ , err := body .ReadFrom (response .Body )
263+ if err != nil {
264+ // Should never fail
265+ panic (fmt .Sprintf ("reading response.Body failed: %s" , err ))
266+ }
267+
268+ // Reset the body so it can be read again
269+ response .Body = io .NopCloser (& body )
270+ return body .String ()
271+ }
272+
273+ func newReplayResponse (resp * http.Response , maxReplays int ) * replayResponse {
274+ return & replayResponse {
275+ response : resp ,
276+ remainingReplays : maxReplays ,
277+ }
278+ }
0 commit comments