Skip to content

Commit a65a51c

Browse files
committed
fix(failover): capture and restore request state
(cherry picked from commit f6146ed)
1 parent e645442 commit a65a51c

File tree

2 files changed

+76
-21
lines changed

2 files changed

+76
-21
lines changed

gravitee-apim-gateway/gravitee-apim-gateway-core/src/main/java/io/gravitee/gateway/reactive/core/failover/FailoverInvoker.java

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,15 @@
2525
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
2626
import io.github.resilience4j.rxjava3.circuitbreaker.operator.CircuitBreakerOperator;
2727
import io.gravitee.definition.model.v4.failover.Failover;
28+
import io.gravitee.gateway.api.buffer.Buffer;
29+
import io.gravitee.gateway.api.http.HttpHeaders;
2830
import io.gravitee.gateway.reactive.api.ExecutionFailure;
2931
import io.gravitee.gateway.reactive.api.context.ContextAttributes;
3032
import io.gravitee.gateway.reactive.api.context.ExecutionContext;
3133
import io.gravitee.gateway.reactive.api.context.http.HttpExecutionContext;
3234
import io.gravitee.gateway.reactive.api.invoker.HttpInvoker;
3335
import io.gravitee.gateway.reactive.api.invoker.Invoker;
36+
import io.gravitee.gateway.reactive.core.context.HttpRequestInternal;
3437
import io.gravitee.gateway.reactive.core.v4.endpoint.EndpointManager;
3538
import io.gravitee.gateway.reactive.core.v4.endpoint.ManagedEndpoint;
3639
import io.reactivex.rxjava3.core.Completable;
@@ -111,30 +114,33 @@ public Completable invoke(HttpExecutionContext ctx) {
111114
final AtomicInteger totalAttempts = new AtomicInteger(0);
112115
final AtomicReference<List<String>> endpointRotation = new AtomicReference<>();
113116
final AtomicReference<String> firstFailedEndpoint = new AtomicReference<>();
117+
final AtomicReference<RequestSnapshot> snapshotRef = new AtomicReference<>();
114118

115-
return Completable.defer(() -> {
116-
int attempt = totalAttempts.getAndIncrement();
119+
return captureRequestState(ctx, snapshotRef).andThen(
120+
Completable.defer(() -> {
121+
int attempt = totalAttempts.getAndIncrement();
117122

118-
// On retry, capture the endpoint that failed in the previous attempt
119-
if (attempt > 0) {
120-
firstFailedEndpoint.compareAndSet(null, resolveCurrentEndpointName(ctx));
121-
}
123+
// On retry, capture the endpoint that failed in the previous attempt and restore the request to its initial state
124+
if (attempt > 0) {
125+
firstFailedEndpoint.compareAndSet(null, resolveCurrentEndpointName(ctx));
126+
restoreRequestState(ctx, snapshotRef.get());
127+
}
128+
129+
// EndpointInvoker overrides the request endpoint. We need to set it back to original state to retry properly
130+
ctx.setAttribute(ATTR_REQUEST_ENDPOINT, originalEndpoint);
131+
// Entrypoint connectors skip response handling if there is an error. In the case of a retry, we need to reset the failure.
132+
ctx.removeInternalAttribute(ATTR_INTERNAL_EXECUTION_FAILURE);
133+
134+
forceNextEndpoint(ctx, attempt, endpointRotation);
122135

123-
// EndpointInvoker overrides the request endpoint. We need to set it back to original state to retry properly
124-
ctx.setAttribute(ATTR_REQUEST_ENDPOINT, originalEndpoint);
125-
// Entrypoint connectors skip response handling if there is an error. In the case of a retry, we need to reset the failure.
126-
ctx.removeInternalAttribute(ATTR_INTERNAL_EXECUTION_FAILURE);
127-
128-
forceNextEndpoint(ctx, attempt, endpointRotation);
129-
130-
// Consume body and ignore it. Consuming it with .body() method internally enables caching of chunks, which is mandatory to retry the request in case of failure.
131-
return ctx.request().body().ignoreElement().andThen(delegate.invoke(ctx)).andThen(evaluateFailureCondition(ctx));
132-
})
133-
.timeout(failoverConfiguration.getSlowCallDuration(), TimeUnit.MILLISECONDS)
134-
.retry(failoverConfiguration.getMaxRetries())
135-
.compose(CircuitBreakerOperator.of(circuitBreaker(ctx)))
136-
.onErrorResumeNext(t -> ctx.interruptWith(new ExecutionFailure(502).cause(t)))
137-
.doFinally(() -> recordFailoverMetrics(ctx, totalAttempts.get(), firstFailedEndpoint.get()));
136+
return delegate.invoke(ctx).andThen(evaluateFailureCondition(ctx));
137+
})
138+
.timeout(failoverConfiguration.getSlowCallDuration(), TimeUnit.MILLISECONDS)
139+
.retry(failoverConfiguration.getMaxRetries())
140+
.compose(CircuitBreakerOperator.of(circuitBreaker(ctx)))
141+
.onErrorResumeNext(t -> ctx.interruptWith(new ExecutionFailure(502).cause(t)))
142+
.doFinally(() -> recordFailoverMetrics(ctx, totalAttempts.get(), firstFailedEndpoint.get()))
143+
);
138144
}
139145

140146
/**
@@ -292,4 +298,51 @@ private CircuitBreaker circuitBreaker(HttpExecutionContext ctx) {
292298
return circuitBreaker;
293299
}
294300
}
301+
302+
/**
303+
* Captures the current request state (path, headers, body) so it can be restored on retry.
304+
* Calling {@code body()} also activates internal chunk caching, which is mandatory to replay the request.
305+
*/
306+
private Completable captureRequestState(HttpExecutionContext ctx, AtomicReference<RequestSnapshot> snapshotRef) {
307+
return ctx
308+
.request()
309+
.body()
310+
// Body present: capture path, headers, and body for full replay
311+
.doOnSuccess(body -> snapshotRef.compareAndSet(null, RequestSnapshot.withBody(ctx, body)))
312+
// No body (e.g. GET): capture path and headers only
313+
.doOnComplete(() -> snapshotRef.compareAndSet(null, RequestSnapshot.headersOnly(ctx)))
314+
.ignoreElement();
315+
}
316+
317+
/**
318+
* Restores the request to its initial state captured by {@link #captureRequestState}.
319+
*/
320+
private void restoreRequestState(HttpExecutionContext ctx, RequestSnapshot snapshot) {
321+
if (ctx.request() instanceof HttpRequestInternal httpRequestInternal) {
322+
httpRequestInternal.pathInfo(snapshot.pathInfo());
323+
}
324+
325+
HttpHeaders currentHeaders = ctx.request().headers();
326+
currentHeaders.clear();
327+
for (var entry : snapshot.headers()) {
328+
currentHeaders.add(entry.getKey(), entry.getValue());
329+
}
330+
331+
if (snapshot.body() != null) {
332+
ctx.request().body(snapshot.body());
333+
}
334+
}
335+
336+
@VisibleForTesting
337+
record RequestSnapshot(String pathInfo, HttpHeaders headers, Buffer body) {
338+
/** Captures request state including the body content for replay on retry. */
339+
static RequestSnapshot withBody(HttpExecutionContext ctx, Buffer body) {
340+
return new RequestSnapshot(ctx.request().pathInfo(), HttpHeaders.create(ctx.request().headers()), body);
341+
}
342+
343+
/** Captures request state when no body is present (e.g. GET requests). */
344+
static RequestSnapshot headersOnly(HttpExecutionContext ctx) {
345+
return new RequestSnapshot(ctx.request().pathInfo(), HttpHeaders.create(ctx.request().headers()), null);
346+
}
347+
}
295348
}

gravitee-apim-gateway/gravitee-apim-gateway-core/src/test/java/io/gravitee/gateway/reactive/core/failover/FailoverInvokerTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.gravitee.definition.model.v4.failover.Failover;
2626
import io.gravitee.el.TemplateEngine;
2727
import io.gravitee.gateway.api.buffer.Buffer;
28+
import io.gravitee.gateway.api.http.HttpHeaders;
2829
import io.gravitee.gateway.reactive.api.ExecutionFailure;
2930
import io.gravitee.gateway.reactive.api.context.ContextAttributes;
3031
import io.gravitee.gateway.reactive.api.context.InternalContextAttributes;
@@ -84,6 +85,7 @@ void setUp() {
8485
executionContext = new DefaultExecutionContext(request, response);
8586
((DefaultExecutionContext) executionContext).metrics(metrics);
8687
lenient().when(request.body()).thenReturn(Maybe.just(Buffer.buffer("body")));
88+
lenient().when(request.headers()).thenReturn(HttpHeaders.create());
8789
}
8890

8991
@Test

0 commit comments

Comments
 (0)