@@ -238,6 +238,7 @@ public FlinkOperatorConfiguration getOperatorConfiguration(
*
* @param baseConfMap The configuration map that should be searched for relevant Flink version
* prefixes.
* @param flinkVersion The FlinkVersion to be used
* @return A list of relevant Flink version prefixes in order of ascending Flink version.
*/
protected static List<String> getRelevantVersionPrefixes(
@@ -381,6 +382,7 @@ private void applyConfigsFromCurrentSpec(
* Get configuration for interacting with session jobs. Similar to the observe configuration for
* FlinkDeployments.
*
* @param name The name of the job
* @param deployment FlinkDeployment for the session cluster
* @param sessionJobSpec Session job spec
* @return Session job config

@@ -33,6 +33,7 @@
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -162,7 +163,7 @@ public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkApp, Contex
flinkApp,
EventRecorder.Type.Warning,
"ClusterDeploymentException",
e.getMessage(),
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.JobManagerDeployment,
josdkContext.getClient());
throw new ReconciliationException(e);

@@ -30,6 +30,7 @@
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.EventSourceUtils;
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
@@ -124,7 +125,7 @@ public UpdateControl<FlinkSessionJob> reconcile(
flinkSessionJob,
EventRecorder.Type.Warning,
"SessionJobException",
e.getMessage(),
ExceptionUtils.getExceptionMessage(e),
EventRecorder.Component.Job,
josdkContext.getClient());
throw new ReconciliationException(e);

@@ -31,6 +31,7 @@ public interface FlinkResourceMutator extends Plugin {
* Mutate deployment and return the mutated Object.
*
* @param deployment A Flink application or session cluster deployment.
* @return the mutated Flink application or session cluster deployment.
*/
FlinkDeployment mutateDeployment(FlinkDeployment deployment);

@@ -39,13 +40,15 @@ public interface FlinkResourceMutator extends Plugin {
*
* @param sessionJob the session job to be mutated.
* @param session the target session cluster of the session job to be mutated.
* @return the mutated session job.
*/
FlinkSessionJob mutateSessionJob(FlinkSessionJob sessionJob, Optional<FlinkDeployment> session);

/**
* Mutate snapshot and return the mutated Object.
*
* @param stateSnapshot the snapshot to be mutated.
* @return the mutated snapshot.
*/
FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot);
}
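
For illustration, a minimal no-op implementation of the contract documented above could look like the following sketch (a hypothetical class; it assumes FlinkResourceMutator declares only the three methods shown here and that Plugin adds no abstract methods):

    import java.util.Optional;
    // FlinkDeployment, FlinkSessionJob and FlinkStateSnapshot imports match the
    // ones used by the interface's own file and are omitted here.

    /** Hypothetical no-op mutator that returns every resource unchanged. */
    public class NoOpMutator implements FlinkResourceMutator {

        @Override
        public FlinkDeployment mutateDeployment(FlinkDeployment deployment) {
            // A real mutator would adjust spec defaults here before returning the object.
            return deployment;
        }

        @Override
        public FlinkSessionJob mutateSessionJob(
                FlinkSessionJob sessionJob, Optional<FlinkDeployment> session) {
            return sessionJob;
        }

        @Override
        public FlinkStateSnapshot mutateStateSnapshot(FlinkStateSnapshot stateSnapshot) {
            return stateSnapshot;
        }
    }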

@@ -107,6 +107,7 @@ protected boolean isResourceReadyToBeObserved(FlinkResourceContext<CR> ctx) {
* DEPLOYED state.
*
* @param ctx Context for resource.
* @return true if the resource was already upgraded, false otherwise.
*/
protected abstract boolean checkIfAlreadyUpgraded(FlinkResourceContext<CR> ctx);
}

@@ -53,6 +53,7 @@ public class SnapshotTriggerTimestampStore {
* updated with this value.
*
* @param resource Flink resource
* @param snapshotType the snapshot type
* @param snapshotsSupplier supplies related snapshot resources
* @return instant of last trigger
*/
@@ -103,6 +104,7 @@ public Instant getLastPeriodicTriggerInstant(
* Updates the time a periodic snapshot was last triggered for this resource.
*
* @param resource Kubernetes resource
* @param snapshotType the snapshot type
* @param instant new timestamp
*/
public void updateLastPeriodicTriggerTimestamp(

@@ -954,7 +954,15 @@ private void deleteJar(Configuration conf, String jarId) {
}
}

/** Wait until Deployment is removed, return remaining timeout. */
/**
* Wait until Deployment is removed, return remaining timeout.
*
* @param name name of the deployment
* @param deployment The deployment resource
* @param propagation DeletePropagation
* @param timeout Timeout to wait
* @return remaining timeout after deletion
*/
@VisibleForTesting
protected Duration deleteDeploymentBlocking(
String name,

@@ -122,10 +122,16 @@ public boolean triggerEvent(
}

/**
* @param resource The resource
* @param type The type
* @param reason the reason
* @param message the message
* @param component the component
* @param messageKey the message key
* @param client the client
* @param interval Interval for dedupe. Null means no dedupe.
* @return
*/
public boolean triggerEventWithInterval(
public void triggerEventWithInterval(
AbstractFlinkResource<?, ?> resource,
Type type,
String reason,
@@ -134,7 +140,7 @@ public boolean triggerEventWithInterval(
String messageKey,
KubernetesClient client,
@Nullable Duration interval) {
return EventUtils.createOrUpdateEventWithInterval(
EventUtils.createOrUpdateEventWithInterval(
client,
resource,
type,
@@ -166,12 +172,18 @@ public boolean triggerEventOnce(
}

/**
* @param resource The resource
* @param type The type
* @param reason the reason
* @param message the message
* @param component the component
* @param messageKey the message key
* @param client the client
* @param interval Interval for dedupe. Null means no dedupe.
* @param dedupePredicate Predicate for the dedupe algorithm.
* @param labels Labels to store in meta data for dedupe. Do nothing if null.
* @return
*/
public boolean triggerEventWithLabels(
public void triggerEventWithLabels(
AbstractFlinkResource<?, ?> resource,
Type type,
String reason,
@@ -182,7 +194,7 @@ public boolean triggerEventWithLabels(
@Nullable Duration interval,
@Nullable Predicate<Map<String, String>> dedupePredicate,
@Nullable Map<String, String> labels) {
return EventUtils.createOrUpdateEventWithLabels(
EventUtils.createOrUpdateEventWithLabels(
client,
resource,
type,

@@ -25,9 +25,17 @@
/** Exception utils. */
public class ExceptionUtils {

private static final int EXCEPTION_LIMIT_FOR_EVENT_MESSAGE = 3;

/**
* Based on the flink ExceptionUtils#findThrowableSerializedAware but fixes an infinite loop bug
* resulting from SerializedThrowable deserialization errors.
*
* @param throwable the throwable to be processed
* @param searchType the type of the exception to search for
* @param classLoader the classloader to use for deserialization
* @param <T> the exception type
* @return the found exception, or empty if it is not found.
*/
public static <T extends Throwable> Optional<T> findThrowableSerializedAware(
Throwable throwable, Class<T> searchType, ClassLoader classLoader) {
@@ -57,4 +65,56 @@ public static <T extends Throwable> Optional<T> findThrowableSerializedAware(

return Optional.empty();
}

/**
* Traverses the throwable and extracts useful information for up to the first 3 possible
* exceptions in the hierarchy.
*
* @param throwable the throwable to be processed
* @return the exception message, which will have a format similar to "cause1 &rarr; cause2
* &rarr; cause3"
*/
public static String getExceptionMessage(Throwable throwable) {
return getExceptionMessage(throwable, 0);
}

/**
* Recursive helper for `getExceptionMessage`.
*
* @param throwable the throwable to be processed
* @param level the current recursion depth. The caller passes 0, and it is incremented with
*     each recursive call
* @return the exception message, which will have a format similar to "cause1 -> cause2 ->
* cause3"
*/
private static String getExceptionMessage(Throwable throwable, int level) {
if (throwable == null) {
return null;
}

if (throwable instanceof SerializedThrowable) {
var deserialized =
((SerializedThrowable) throwable)
.deserializeError(Thread.currentThread().getContextClassLoader());
if (deserialized == throwable) {
return "Unknown Error (SerializedThrowable)";
} else {
return getExceptionMessage(deserialized, level);
}
}

var msg =
Optional.ofNullable(throwable.getMessage())
.orElse(throwable.getClass().getSimpleName());

if (level >= EXCEPTION_LIMIT_FOR_EVENT_MESSAGE) {
return msg;
}

if (throwable.getCause() == null) {
return msg;
} else {
return msg + " -> " + getExceptionMessage(throwable.getCause(), level + 1);
}
}
}
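
A short usage sketch of the new getExceptionMessage helper (a hypothetical call site; the nested chain mirrors the test cases further down, and SerializedThrowable is org.apache.flink.util.SerializedThrowable):

    // A cause whose message is null falls back to its class simple name, and a
    // SerializedThrowable is deserialized before its message is read.
    Throwable chain =
            new RuntimeException(
                    "Deployment Failure",
                    new IllegalStateException(
                            null,
                            new SerializedThrowable(new Exception("actual failure reason"))));

    // Produces "Deployment Failure -> IllegalStateException -> actual failure reason";
    // deeper causes are dropped once the EXCEPTION_LIMIT_FOR_EVENT_MESSAGE depth is reached.
    String message = ExceptionUtils.getExceptionMessage(chain);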

@@ -134,6 +134,7 @@ public static SnapshotStatus getLastSnapshotStatus(
* @param resource The resource to be snapshotted.
* @param conf The observe configuration of the resource.
* @param snapshotType The type of the snapshot.
* @param lastTrigger the last time the snapshot was triggered.
* @return An optional {@link SnapshotTriggerType}.
*/
@VisibleForTesting

@@ -85,6 +85,7 @@ public void notifyListeners(CR resource, STATUS prevStatus) {
* operator behavior.
*
* @param resource Resource for which status update should be performed
* @param client Kubernetes client to use for the update
*/
@SneakyThrows
public void patchAndCacheStatus(CR resource, KubernetesClient client) {

@@ -50,6 +50,7 @@ Optional<String> validateSessionJob(
* Validate and return optional error.
*
* @param savepoint the savepoint to be validated.
* @param target the target resource of the savepoint to be validated.
* @return Optional error string, should be present iff validation resulted in an error
*/
Optional<String> validateStateSnapshot(

@@ -130,6 +130,7 @@ public class TestingFlinkService extends AbstractFlinkService {
@Setter private boolean checkpointAvailable = true;
@Setter private boolean jobManagerReady = true;
@Setter private boolean deployFailure = false;
@Setter private Exception makeItFailWith;
@Setter private boolean triggerSavepointFailure = false;
@Setter private boolean disposeSavepointFailure = false;
@Setter private Runnable sessionJobSubmittedCallback;
@@ -212,6 +213,9 @@ public void submitApplicationCluster(
}

protected void deployApplicationCluster(JobSpec jobSpec, Configuration conf) throws Exception {
if (makeItFailWith != null) {
throw makeItFailWith;
}
if (deployFailure) {
throw new Exception("Deployment failure");
}
@@ -270,6 +274,10 @@ public JobID submitJobToSessionCluster(
@Nullable String savepoint)
throws Exception {

if (makeItFailWith != null) {
throw makeItFailWith;
}

if (deployFailure) {
throw new Exception("Deployment failure");
}

@@ -41,6 +41,7 @@
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.util.SerializedThrowable;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
@@ -984,6 +985,33 @@ public void testEventOfNonDeploymentFailedException() throws Exception {
assertEquals("Deployment failure", event.getMessage());
}

@Test
public void testEventOfNonDeploymentFailedChainedException() {
assertTrue(testController.flinkResourceEvents().isEmpty());
var flinkDeployment = TestUtils.buildApplicationCluster();

flinkService.setMakeItFailWith(
new RuntimeException(
"Deployment Failure",
new IllegalStateException(
null,
new SerializedThrowable(new Exception("actual failure reason")))));
try {
testController.reconcile(flinkDeployment, context);
fail();
} catch (Exception expected) {
}
assertEquals(2, testController.flinkResourceEvents().size());

var event = testController.flinkResourceEvents().remove();
assertEquals("Submit", event.getReason());
event = testController.flinkResourceEvents().remove();
assertEquals("ClusterDeploymentException", event.getReason());
assertEquals(
"Deployment Failure -> IllegalStateException -> actual failure reason",
event.getMessage());
}

@Test
public void cleanUpNewDeployment() {
FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
@@ -1064,6 +1092,32 @@ public void testInitialSavepointOnError() throws Exception {
assertEquals("msp", flinkService.listJobs().get(0).f0);
}

@Test
public void testErrorOnReconcileWithChainedExceptions() throws Exception {
FlinkDeployment flinkDeployment = TestUtils.buildApplicationCluster();
flinkDeployment.getSpec().getJob().setInitialSavepointPath("msp");
flinkService.setMakeItFailWith(
new RuntimeException(
"Deployment Failure",
new IllegalStateException(
null,
new SerializedThrowable(new Exception("actual failure reason")))));
try {
testController.reconcile(flinkDeployment, context);
fail();
} catch (Exception expected) {
}
assertEquals(2, testController.flinkResourceEvents().size());

var event = testController.flinkResourceEvents().remove();
assertEquals("Submit", event.getReason());
event = testController.flinkResourceEvents().remove();
assertEquals("ClusterDeploymentException", event.getReason());
assertEquals(
"Deployment Failure -> IllegalStateException -> actual failure reason",
event.getMessage());
}

@Test
public void testInitialHaError() throws Exception {
var appCluster = TestUtils.buildApplicationCluster(FlinkVersion.v1_20);