Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* Set `blockOwnerDeletion` to `true` in the owner references in Strimzi managed resources.
Deleting the Strimzi custom resources will now by default wait for the deletion of all the owned Kubernetes resources.
* Introduce `maxBackoffMinutes` property to configure the maximum backoff cap for automatic restarts of failed Kafka Connect connectors. Default remains 60 minutes if not set.

### Major changes, deprecations, and removals

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,15 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"enabled", "maxRestarts"})
@JsonPropertyOrder({"enabled", "maxRestarts", "maxBackoffMinutes"})
@EqualsAndHashCode
@ToString
public class AutoRestart implements UnknownPropertyPreserving {
public static final int DEFAULT_MAX_BACKOFF_MINUTES = 60;

private boolean enabled = true;
private Integer maxRestarts;
private Integer maxBackoffMinutes = DEFAULT_MAX_BACKOFF_MINUTES;

private Map<String, Object> additionalProperties;

Expand All @@ -54,6 +57,15 @@ public Integer getMaxRestarts() {
public void setMaxRestarts(Integer maxRestarts) {
this.maxRestarts = maxRestarts;
}

@Description("The maximum backoff time in minutes between restarts. Defaults 60 minutes.")
public Integer getMaxBackoffMinutes() {
return maxBackoffMinutes;
}

public void setMaxBackoffMinutes(Integer maxBackoffMinutes) {
this.maxBackoffMinutes = maxBackoffMinutes;
}

@Override
public Map<String, Object> getAdditionalProperties() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ spec:
tasksMax: 2
autoRestart:
enabled: true
maxBackoffMinutes: 60
config:
file: "/home/source/test.source.txt"
topic: "test.topic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.strimzi.api.kafka.model.connect.KafkaConnectResources;
import io.strimzi.api.kafka.model.connect.KafkaConnectStatus;
import io.strimzi.api.kafka.model.connector.AlterOffsets;
import io.strimzi.api.kafka.model.connector.AutoRestart;
import io.strimzi.api.kafka.model.connector.AutoRestartStatus;
import io.strimzi.api.kafka.model.connector.AutoRestartStatusBuilder;
import io.strimzi.api.kafka.model.connector.KafkaConnector;
Expand Down Expand Up @@ -628,10 +629,11 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
return previousAutoRestartStatus(reconciliation, connectorName, resource)
.compose(previousAutoRestartStatus -> {
boolean needsRestart = connectorHasFailed(statusResultJson) || !failedTaskIds(statusResultJson).isEmpty();
AutoRestart autoRestart = connectorSpec.getAutoRestart();

if (needsRestart) {
// Connector or task failed, and we should check it for auto-restart
if (shouldAutoRestart(previousAutoRestartStatus, connectorSpec.getAutoRestart().getMaxRestarts())) {
if (shouldAutoRestart(previousAutoRestartStatus, autoRestart.getMaxRestarts(), autoRestart.getMaxBackoffMinutes())) {
// There are failures, and it is a time to restart the connector now
metrics().connectorsAutoRestartsCounter(reconciliation.namespace()).increment();
return autoRestartConnector(reconciliation, host, apiClient, connectorName, status, previousAutoRestartStatus);
Expand All @@ -643,7 +645,7 @@ private Future<List<Condition>> updateState(Reconciliation reconciliation, Strin
} else {
// Connector and tasks are not failed
if (previousAutoRestartStatus != null) {
if (shouldResetAutoRestartStatus(previousAutoRestartStatus)) {
if (shouldResetAutoRestartStatus(previousAutoRestartStatus, autoRestart.getMaxBackoffMinutes())) {
// The connector is not failing now for some time => time to reset the auto-restart status
LOGGER.infoCr(reconciliation, "Resetting the auto-restart status of connector {} ", connectorName);
status.autoRestart = null;
Expand Down Expand Up @@ -700,10 +702,11 @@ private Future<ConnectorStatusAndConditions> autoRestartConnector(Reconciliation
*
* @param autoRestartStatus Status field with auto-restart status
* @param maxRestarts Maximum number of restarts (or null for unlimited restarts)
* @param maxBackoffMinutes Maximum backoff time in minutes between restarts
*
* @return True if the connector should be auto-restarted right now. False otherwise.
*/
/* test */ static boolean shouldAutoRestart(AutoRestartStatus autoRestartStatus, Integer maxRestarts) {
/* test */ static boolean shouldAutoRestart(AutoRestartStatus autoRestartStatus, Integer maxRestarts, Integer maxBackoffMinutes) {
if (autoRestartStatus == null
|| autoRestartStatus.getLastRestartTimestamp() == null) {
// If there is no previous auto.restart status or timestamp, we always restart it
Expand All @@ -714,21 +717,23 @@ private Future<ConnectorStatusAndConditions> autoRestartConnector(Reconciliation
var minutesSinceLastRestart = StatusUtils.minutesDifferenceUntilNow(StatusUtils.isoUtcDatetime(autoRestartStatus.getLastRestartTimestamp()));

return (maxRestarts == null || count < maxRestarts)
&& minutesSinceLastRestart >= nextAutoRestartBackOffIntervalInMinutes(count);
&& minutesSinceLastRestart >= nextAutoRestartBackOffIntervalInMinutes(count, maxBackoffMinutes);
}
}

/**
* Calculates the back-off interval for auto-restarting the connectors. It is calculated as (n^2 + n) where n is the
* number of previous restarts, but is capped at max 60 minutes. As a result, the restarts should be done after 0,
* 2, 6, 12, 20, 30, 42, and 56 minutes and then every 60 minutes.
* number of previous restarts, but is capped at max maxBackoffMinutes minutes. As a result, the restarts should be done after 0,
* 2, 6, 12, 20, 30, 42, 56 minutes or after maxBackoffMinutes is reached. Then,
* every maxBackoffMinutes minutes.
*
* @param restartCount Number of restarts already applied to the connector
* @param restartCount Number of restarts already applied to the connector
* @param maxBackoffMinutes Maximum backoff time in minutes between restarts
*
* @return Number of minutes after which the next restart should happen
*/
private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount) {
return Math.min(restartCount * restartCount + restartCount, 60);
private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount, int maxBackoffMinutes) {
return Math.min(restartCount * restartCount + restartCount, maxBackoffMinutes);
}

/**
Expand All @@ -740,14 +745,14 @@ private static int nextAutoRestartBackOffIntervalInMinutes(int restartCount)
*
* @return True if the previous auto-restart status of the connector should be reset to 0. False otherwise.
*/
/* test */ static boolean shouldResetAutoRestartStatus(AutoRestartStatus autoRestartStatus) {
/* test */ static boolean shouldResetAutoRestartStatus(AutoRestartStatus autoRestartStatus, int maxBackoffMinutes) {
if (autoRestartStatus != null
&& autoRestartStatus.getLastRestartTimestamp() != null
&& autoRestartStatus.getCount() > 0) {
// There are previous auto-restarts => we check if it is time to reset the status
long minutesSinceLastRestart = StatusUtils.minutesDifferenceUntilNow(StatusUtils.isoUtcDatetime(autoRestartStatus.getLastRestartTimestamp()));

return minutesSinceLastRestart > nextAutoRestartBackOffIntervalInMinutes(autoRestartStatus.getCount());
return minutesSinceLastRestart > nextAutoRestartBackOffIntervalInMinutes(autoRestartStatus.getCount(), maxBackoffMinutes);
} else {
// There are no previous restarts => nothing to reset
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.strimzi.api.kafka.model.connector.AutoRestart;
import io.strimzi.api.kafka.model.connector.AutoRestartStatusBuilder;
import org.junit.jupiter.api.Test;

Expand All @@ -26,56 +27,70 @@ public void testShouldAutoRestartConnector() {
.withCount(1)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(3).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(true));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not restart before minute 2 when auto restart count is 1
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(1)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(1).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(false));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should restart after minute 12 when auto restart count is 3
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(3)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(13).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(true));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not restart before minute 12 when auto restart count is 3
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(3)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(10).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(false));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should restart after minute 61 when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(61).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(true));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not restart after 59 minutes when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(59).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null), is(false));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, null,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should not restart after 6 attempts
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(7)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusDays(1).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, 7), is(false));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, 7, null), is(false));

// Should restart after 6 attempts when maxRestarts set to higher number
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(7)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusDays(1).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, 8), is(true));
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, 8,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should restart after 4 attempts when maxBackoffMinutes is set to 15 minutes
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(4)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(15).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldAutoRestart(autoRestartStatus, 8, 15), is(true));
}

@Test
Expand All @@ -85,41 +100,62 @@ public void testShouldResetAutoRestartStatus() {
.withCount(1)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(3).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(true));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not reset before minute 2 when auto restart count is 1
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(1)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(1).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(false));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should reset after minute 12 when auto restart count is 3
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(3)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(13).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(true));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not reset before minute 12 when auto restart count is 3
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(3)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(10).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(false));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should reset after 60 minutes when auto restart count is 25
// Should reset after AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES minutes when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(61).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(true));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(true));

// Should not reset after 59 minutes when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(59).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus), is(false));
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
AutoRestart.DEFAULT_MAX_BACKOFF_MINUTES), is(false));

// Should reset after configured maxBackoffMinutes when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(16).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus,
15), is(true));

// Should not reset after 14 minutes when auto restart count is 25
autoRestartStatus = new AutoRestartStatusBuilder()
.withCount(25)
.withLastRestartTimestamp(ZonedDateTime.now(ZoneOffset.UTC).minusMinutes(14).format(DateTimeFormatter.ISO_INSTANT))
.build();
assertThat(AbstractConnectOperator.shouldResetAutoRestartStatus(autoRestartStatus, 15), is(false));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ Configures automatic restarts for connectors and tasks that are in a `FAILED` st

When enabled, a back-off algorithm applies the automatic restart to each failed connector and its tasks.
An incremental back-off interval is calculated using the formula `n * n + n` where `n` represents the number of previous restarts.
This interval is capped at a maximum of 60 minutes.
Consequently, a restart occurs immediately, followed by restarts after 2, 6, 12, 20, 30, 42, 56 minutes, and then at 60-minute intervals.
The restart counter is automatically reset once the connector has been running continuously for longer than the next backoff interval.
This interval is capped at a default maximum backoff time of 60 minutes. You can configure it using the `maxBackoffMinutes` property.
Consequently, a restart occurs immediately, followed by restarts after 2, 6, 12, 20, 30, 42, 56 minutes, up to `maxBackoffMinutes`, and then at `maxBackoffMinutes` intervals.
The restart counter is automatically reset once the connector has been running continuously for longer than the next backoff interval.
For example, if the connector has restarted 4 times, it must run without errors for 20 minutes before the restart count resets to 0.
By default, Strimzi initiates restarts of the connector and its tasks indefinitely.
However, you can use the `maxRestarts` property to set a maximum on the number of restarts.
Expand Down Expand Up @@ -39,6 +39,21 @@ spec:
maxRestarts: 10
----

Additionally, you could set a maximum limit on the backoff time.

.Enabling automatic restarts of failed connectors with custom backoff limit
[source,yaml,subs="attributes+"]
----
apiVersion: {KafkaConnectorApiVersion}
kind: KafkaConnector
metadata:
name: my-source-connector
spec:
autoRestart:
enabled: true
maxBackoffMinutes: 30
----

For MirrorMaker 2, use the `autoRestart` property of connectors in the `KafkaMirrorMaker2` resource to enable automatic restarts of failed connectors and tasks.

.Enabling automatic restarts of failed connectors for MirrorMaker 2
Expand All @@ -62,4 +77,6 @@ spec:
autoRestart:
enabled: true
# ...
----
----

You could also configure limits on the number of restarts or the backoff time.
Loading
Loading