Skip to content

Commit 45c62ca

Browse files
committed
[FLINK-39103][runtime] Tolerate deserializer release failure if the corresponding channel already had errors
On cleanup, deserializer recycles its buffers, potentially notifying the input channel. However, if the input channel has encountered an error (such as RemoteTransportException); then notification will fail which might cause the whol cleanup to fail and lead to TM shutdown.
1 parent b5fbf2b commit 45c62ca

File tree

5 files changed

+116
-2
lines changed

5 files changed

+116
-2
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,11 @@ public long unsynchronizedGetSizeOfQueuedBuffers() {
346346
*/
347347
public void notifyRequiredSegmentId(int subpartitionId, int segmentId) throws IOException {}
348348

349+
/** Whether this input channel has encountered error. */
350+
public boolean hasError() {
351+
return cause.get() != null;
352+
}
353+
349354
// ------------------------------------------------------------------------
350355

351356
/**

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.runtime.io.network.api.EndOfData;
2626
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
2727
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
28+
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
2829
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
2930
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
3031
import org.apache.flink.runtime.plugable.DeserializationDelegate;
@@ -39,6 +40,9 @@
3940
import org.apache.flink.streaming.util.watermark.WatermarkUtils;
4041
import org.apache.flink.util.ExceptionUtils;
4142

43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
4246
import java.io.IOException;
4347
import java.util.ArrayList;
4448
import java.util.Collections;
@@ -59,6 +63,9 @@
5963
public abstract class AbstractStreamTaskNetworkInput<
6064
T, R extends RecordDeserializer<DeserializationDelegate<StreamElement>>>
6165
implements StreamTaskInput<T> {
66+
67+
private static final Logger LOG = LoggerFactory.getLogger(AbstractStreamTaskNetworkInput.class);
68+
6269
protected final CheckpointedInputGate checkpointedInputGate;
6370
protected final DeserializationDelegate<StreamElement> deserializationDelegate;
6471
protected final TypeSerializer<T> inputSerializer;
@@ -315,13 +322,25 @@ public CompletableFuture<?> getAvailableFuture() {
315322

316323
@Override
317324
public void close() throws IOException {
318-
// release the deserializers . this part should not ever fail
325+
// WARNING: throwing an exception from this method might fail Task closing procedure and
326+
// terminate the TM
319327
Exception err = null;
320328
for (InputChannelInfo channelInfo : new ArrayList<>(recordDeserializers.keySet())) {
329+
final boolean hadError =
330+
checkpointedInputGate.getChannel(channelInfo.getInputChannelIdx()).hasError();
321331
try {
322332
releaseDeserializer(channelInfo);
323333
} catch (Exception e) {
324-
err = e;
334+
if (hadError
335+
&& ExceptionUtils.findThrowable(e, RemoteTransportException.class)
336+
.isPresent()) {
337+
LOG.warn(
338+
"Ignoring deserializer release failure - the channel {} has encountered a transport error before: {}",
339+
channelInfo,
340+
e.getMessage());
341+
} else {
342+
err = ExceptionUtils.firstOrSuppressed(e, err);
343+
}
325344
}
326345
}
327346
if (err != null) {

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,16 @@ void testExponentialNoBackoff() {
101101
assertThat(ch.getCurrentBackoff()).isZero();
102102
}
103103

104+
@Test
105+
void testHasError() {
106+
InputChannel ch = createInputChannel(0, 0);
107+
108+
assertThat(ch.hasError()).isFalse();
109+
110+
ch.setError(new RuntimeException("test error"));
111+
assertThat(ch.hasError()).isTrue();
112+
}
113+
104114
private InputChannel createInputChannel(int initialBackoff, int maxBackoff) {
105115
return new MockInputChannel(
106116
mock(SingleInputGate.class),

flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ public void endInput(boolean emitEndOfData) {
205205
}
206206
}
207207

208+
/** Sets an error on the specified channel. */
209+
public void setChannelError(int channel, Throwable error) {
210+
inputChannels[channel].setError(error);
211+
}
212+
208213
/** Returns true iff all input queues are empty. */
209214
public boolean allQueuesEmpty() {
210215
for (int i = 0; i < numInputChannels; i++) {

flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
3535
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
3636
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
37+
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
3738
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
3839
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
3940
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
@@ -64,7 +65,10 @@
6465
import org.junit.jupiter.api.AfterEach;
6566
import org.junit.jupiter.api.Test;
6667

68+
import javax.annotation.Nullable;
69+
6770
import java.io.IOException;
71+
import java.net.InetSocketAddress;
6872
import java.nio.ByteBuffer;
6973
import java.util.ArrayList;
7074
import java.util.Collections;
@@ -78,10 +82,17 @@
7882
import java.util.stream.Collectors;
7983

8084
import static org.assertj.core.api.Assertions.assertThat;
85+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
8186

8287
/** Tests for {@link StreamTaskNetworkInput}. */
8388
class StreamTaskNetworkInputTest {
8489

90+
private static final RuntimeException WRAPPED_TRANSPORT_EXCEPTION =
91+
new RuntimeException(
92+
new RemoteTransportException(
93+
"simulated release failure",
94+
InetSocketAddress.createUnresolved("localhost", 8080)));
95+
8596
private static final int PAGE_SIZE = 1000;
8697

8798
private final IOManager ioManager = new IOManagerAsync();
@@ -473,6 +484,55 @@ int getNumberOfEmittedRecords() {
473484
}
474485
}
475486

487+
@Test
488+
void testCloseIgnoresReleaseFailureFromChannelWithError() throws Exception {
489+
testErrorHandlingOnClose(WRAPPED_TRANSPORT_EXCEPTION, WRAPPED_TRANSPORT_EXCEPTION, null);
490+
}
491+
492+
@Test
493+
void testCloseThrowsReleaseFailureFromChannelWithoutError() throws Exception {
494+
testErrorHandlingOnClose(WRAPPED_TRANSPORT_EXCEPTION, null, RuntimeException.class);
495+
}
496+
497+
@Test
498+
void testCloseThrowsReleaseFailureFromChannelWithNonTransportError() throws Exception {
499+
RuntimeException error = new RuntimeException("something else broke");
500+
testErrorHandlingOnClose(error, error, RuntimeException.class);
501+
}
502+
503+
private void testErrorHandlingOnClose(
504+
RuntimeException error,
505+
@Nullable RuntimeException channelError,
506+
Class<? extends Exception> expectedException)
507+
throws Exception {
508+
int numInputChannels = 2;
509+
LongSerializer inSerializer = LongSerializer.INSTANCE;
510+
StreamTestSingleInputGate<Long> inputGate =
511+
new StreamTestSingleInputGate<>(numInputChannels, 0, inSerializer, 1024);
512+
513+
Map<InputChannelInfo, TestRecordDeserializer> deserializers =
514+
createDeserializers(inputGate.getInputGate());
515+
516+
// Replace the deserializer for channel 0 with one that throws on clear()
517+
InputChannelInfo channel0 = new InputChannelInfo(0, 0);
518+
deserializers.put(
519+
channel0,
520+
new ThrowingTestRecordDeserializer(ioManager.getSpillingDirectoriesPaths(), error));
521+
522+
StreamTaskInput<Long> input =
523+
new TestStreamTaskNetworkInput(
524+
inputGate, inSerializer, numInputChannels, deserializers);
525+
526+
if (channelError != null) {
527+
inputGate.setChannelError(0, error);
528+
}
529+
if (expectedException == null) {
530+
input.close();
531+
} else {
532+
assertThatThrownBy(input::close).isInstanceOf(RuntimeException.class);
533+
}
534+
}
535+
476536
private static class TestStreamTaskNetworkInput
477537
extends AbstractStreamTaskNetworkInput<Long, TestRecordDeserializer> {
478538
public TestStreamTaskNetworkInput(
@@ -496,4 +556,19 @@ public CompletableFuture<Void> prepareSnapshot(
496556
throw new UnsupportedOperationException();
497557
}
498558
}
559+
560+
private static class ThrowingTestRecordDeserializer extends TestRecordDeserializer {
561+
562+
private final RuntimeException error;
563+
564+
public ThrowingTestRecordDeserializer(String[] tmpDirectories, RuntimeException error) {
565+
super(tmpDirectories);
566+
this.error = error;
567+
}
568+
569+
@Override
570+
public void clear() {
571+
throw error;
572+
}
573+
}
499574
}

0 commit comments

Comments
 (0)