Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@

@if (showConfiguration && providerSchema?.config) {
<div class="endpoint-card__form__field">
@if (formGroup.errors?.providerMismatch; as mismatch) {
@if (formGroup.dirty && formGroup.errors?.providerMismatch; as mismatch) {
<gio-banner-error> All endpoints in this group must use the same provider ({{ mismatch.expected }}). </gio-banner-error>
}
<gio-form-json-schema formControlName="configuration" [jsonSchema]="providerSchema.config"></gio-form-json-schema>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ describe('ApiV4FailoverComponent', () => {
it('should enable and set failover config', async () => {
const api = fakeApiV4({
id: API_ID,
failover: undefined,
});
expectApiGetRequest(api);
const saveBar = await loader.getHarness(GioSaveBarHarness);
Expand Down Expand Up @@ -127,7 +126,7 @@ describe('ApiV4FailoverComponent', () => {
enabled: true,
forceNextEndpointOnFailure: false,
maxRetries: 2,
failureCondition: undefined,
failureCondition: '',
slowCallDuration: 200,
openStateDuration: 2000,
maxFailures: 2,
Expand Down Expand Up @@ -221,7 +220,7 @@ describe('ApiV4FailoverComponent', () => {
enabled: true,
forceNextEndpointOnFailure: false,
maxRetries: 3,
failureCondition: undefined,
failureCondition: '',
slowCallDuration: 300,
openStateDuration: 3000,
maxFailures: 3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class ApiFailoverV4Component implements OnInit, OnDestroy {
enabled,
forceNextEndpointOnFailure,
maxRetries,
failureCondition: failureCondition || undefined,
failureCondition,
slowCallDuration,
openStateDuration,
maxFailures,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
import io.github.resilience4j.rxjava3.circuitbreaker.operator.CircuitBreakerOperator;
import io.gravitee.definition.model.v4.failover.Failover;
import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.http.HttpHeaders;
import io.gravitee.gateway.reactive.api.ExecutionFailure;
import io.gravitee.gateway.reactive.api.context.ContextAttributes;
import io.gravitee.gateway.reactive.api.context.ExecutionContext;
import io.gravitee.gateway.reactive.api.context.http.HttpExecutionContext;
import io.gravitee.gateway.reactive.api.invoker.HttpInvoker;
import io.gravitee.gateway.reactive.api.invoker.Invoker;
import io.gravitee.gateway.reactive.core.context.HttpRequestInternal;
import io.gravitee.gateway.reactive.core.v4.endpoint.EndpointManager;
import io.gravitee.gateway.reactive.core.v4.endpoint.ManagedEndpoint;
import io.reactivex.rxjava3.core.Completable;
Expand Down Expand Up @@ -111,30 +114,33 @@ public Completable invoke(HttpExecutionContext ctx) {
final AtomicInteger totalAttempts = new AtomicInteger(0);
final AtomicReference<List<String>> endpointRotation = new AtomicReference<>();
final AtomicReference<String> firstFailedEndpoint = new AtomicReference<>();
final AtomicReference<RequestSnapshot> snapshotRef = new AtomicReference<>();

return Completable.defer(() -> {
int attempt = totalAttempts.getAndIncrement();
return captureRequestState(ctx, snapshotRef).andThen(
Completable.defer(() -> {
int attempt = totalAttempts.getAndIncrement();

// On retry, capture the endpoint that failed in the previous attempt
if (attempt > 0) {
firstFailedEndpoint.compareAndSet(null, resolveCurrentEndpointName(ctx));
}
// On retry, capture the endpoint that failed in the previous attempt and restore the request to its initial state
if (attempt > 0) {
firstFailedEndpoint.compareAndSet(null, resolveCurrentEndpointName(ctx));
restoreRequestState(ctx, snapshotRef.get());
}

// EndpointInvoker overrides the request endpoint. We need to set it back to original state to retry properly
ctx.setAttribute(ATTR_REQUEST_ENDPOINT, originalEndpoint);
// Entrypoint connectors skip response handling if there is an error. In the case of a retry, we need to reset the failure.
ctx.removeInternalAttribute(ATTR_INTERNAL_EXECUTION_FAILURE);

forceNextEndpoint(ctx, attempt, endpointRotation);

// EndpointInvoker overrides the request endpoint. We need to set it back to original state to retry properly
ctx.setAttribute(ATTR_REQUEST_ENDPOINT, originalEndpoint);
// Entrypoint connectors skip response handling if there is an error. In the case of a retry, we need to reset the failure.
ctx.removeInternalAttribute(ATTR_INTERNAL_EXECUTION_FAILURE);

forceNextEndpoint(ctx, attempt, endpointRotation);

// Consume body and ignore it. Consuming it with .body() method internally enables caching of chunks, which is mandatory to retry the request in case of failure.
return ctx.request().body().ignoreElement().andThen(delegate.invoke(ctx)).andThen(evaluateFailureCondition(ctx));
})
.timeout(failoverConfiguration.getSlowCallDuration(), TimeUnit.MILLISECONDS)
.retry(failoverConfiguration.getMaxRetries())
.compose(CircuitBreakerOperator.of(circuitBreaker(ctx)))
.onErrorResumeNext(t -> ctx.interruptWith(new ExecutionFailure(502).cause(t)))
.doFinally(() -> recordFailoverMetrics(ctx, totalAttempts.get(), firstFailedEndpoint.get()));
return delegate.invoke(ctx).andThen(evaluateFailureCondition(ctx));
})
.timeout(failoverConfiguration.getSlowCallDuration(), TimeUnit.MILLISECONDS)
.retry(failoverConfiguration.getMaxRetries())
.compose(CircuitBreakerOperator.of(circuitBreaker(ctx)))
.onErrorResumeNext(t -> ctx.interruptWith(new ExecutionFailure(502).cause(t)))
.doFinally(() -> recordFailoverMetrics(ctx, totalAttempts.get(), firstFailedEndpoint.get()))
);
}

/**
Expand Down Expand Up @@ -292,4 +298,51 @@ private CircuitBreaker circuitBreaker(HttpExecutionContext ctx) {
return circuitBreaker;
}
}

/**
* Captures the current request state (path, headers, body) so it can be restored on retry.
* Calling {@code body()} also activates internal chunk caching, which is mandatory to replay the request.
*/
private Completable captureRequestState(HttpExecutionContext ctx, AtomicReference<RequestSnapshot> snapshotRef) {
return ctx
.request()
.body()
// Body present: capture path, headers, and body for full replay
.doOnSuccess(body -> snapshotRef.compareAndSet(null, RequestSnapshot.withBody(ctx, body)))
// No body (e.g. GET): capture path and headers only
.doOnComplete(() -> snapshotRef.compareAndSet(null, RequestSnapshot.headersOnly(ctx)))
.ignoreElement();
}

/**
* Restores the request to its initial state captured by {@link #captureRequestState}.
*/
private void restoreRequestState(HttpExecutionContext ctx, RequestSnapshot snapshot) {
if (ctx.request() instanceof HttpRequestInternal httpRequestInternal) {
httpRequestInternal.pathInfo(snapshot.pathInfo());
}

HttpHeaders currentHeaders = ctx.request().headers();
currentHeaders.clear();
for (var entry : snapshot.headers()) {
currentHeaders.add(entry.getKey(), entry.getValue());
}

if (snapshot.body() != null) {
ctx.request().body(snapshot.body());
}
}

@VisibleForTesting
record RequestSnapshot(String pathInfo, HttpHeaders headers, Buffer body) {
/** Captures request state including the body content for replay on retry. */
static RequestSnapshot withBody(HttpExecutionContext ctx, Buffer body) {
return new RequestSnapshot(ctx.request().pathInfo(), HttpHeaders.create(ctx.request().headers()), body);
}

/** Captures request state when no body is present (e.g. GET requests). */
static RequestSnapshot headersOnly(HttpExecutionContext ctx) {
return new RequestSnapshot(ctx.request().pathInfo(), HttpHeaders.create(ctx.request().headers()), null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.gravitee.definition.model.v4.failover.Failover;
import io.gravitee.el.TemplateEngine;
import io.gravitee.gateway.api.buffer.Buffer;
import io.gravitee.gateway.api.http.HttpHeaders;
import io.gravitee.gateway.reactive.api.ExecutionFailure;
import io.gravitee.gateway.reactive.api.context.ContextAttributes;
import io.gravitee.gateway.reactive.api.context.InternalContextAttributes;
Expand Down Expand Up @@ -84,6 +85,7 @@ void setUp() {
executionContext = new DefaultExecutionContext(request, response);
((DefaultExecutionContext) executionContext).metrics(metrics);
lenient().when(request.body()).thenReturn(Maybe.just(Buffer.buffer("body")));
lenient().when(request.headers()).thenReturn(HttpHeaders.create());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2163,7 +2163,6 @@ components:
default: true
failureCondition:
type: string
nullable: true
description: An EL expression evaluated on the response to determine if it should be considered a failure (e.g. "{#response.status >= 500}"). If null, response content is not evaluated.
forceNextEndpointOnFailure:
type: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7599,7 +7599,6 @@ components:
default: true
failureCondition:
type: string
nullable: true
description: An EL expression evaluated on the response to determine if it should be considered a failure (e.g. "{#response.status >= 500}"). If null, response content is not evaluated.
forceNextEndpointOnFailure:
type: boolean
Expand Down