Skip to content

Commit c9773f4

Browse files
committed
[FLINK-38216][checkpoint][refactor] Split EndOfChannelStateEvent into EndOfInputChannelStateEvent and EndOfOutputChannelStateEvent
1 parent 45c47ba commit c9773f4

File tree

11 files changed

+112
-30
lines changed

11 files changed

+112
-30
lines changed

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@
4343
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
4444
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
4545
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
46-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
46+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
47+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
4748
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
4849
import org.apache.flink.util.InstantiationUtil;
4950

@@ -70,7 +71,7 @@ public class EventSerializer {
7071

7172
private static final int CANCEL_CHECKPOINT_MARKER_EVENT = 4;
7273

73-
private static final int END_OF_CHANNEL_STATE_EVENT = 5;
74+
private static final int END_OF_OUTPUT_CHANNEL_STATE_EVENT = 5;
7475

7576
private static final int ANNOUNCEMENT_EVENT = 6;
7677

@@ -84,6 +85,8 @@ public class EventSerializer {
8485

8586
private static final int GENERALIZED_WATERMARK_EVENT = 11;
8687

88+
private static final int END_OF_INPUT_CHANNEL_STATE_EVENT = 12;
89+
8790
private static final byte CHECKPOINT_TYPE_CHECKPOINT = 0;
8891

8992
private static final byte CHECKPOINT_TYPE_SAVEPOINT = 1;
@@ -109,8 +112,10 @@ public static ByteBuffer toSerializedEvent(AbstractEvent event) throws IOExcepti
109112
return serializeCheckpointBarrier((CheckpointBarrier) event);
110113
} else if (eventClass == EndOfSuperstepEvent.class) {
111114
return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_SUPERSTEP_EVENT});
112-
} else if (eventClass == EndOfChannelStateEvent.class) {
113-
return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_CHANNEL_STATE_EVENT});
115+
} else if (eventClass == EndOfOutputChannelStateEvent.class) {
116+
return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_OUTPUT_CHANNEL_STATE_EVENT});
117+
} else if (eventClass == EndOfInputChannelStateEvent.class) {
118+
return ByteBuffer.wrap(new byte[] {0, 0, 0, END_OF_INPUT_CHANNEL_STATE_EVENT});
114119
} else if (eventClass == EndOfData.class) {
115120
return ByteBuffer.wrap(
116121
new byte[] {
@@ -197,8 +202,10 @@ public static AbstractEvent fromSerializedEvent(ByteBuffer buffer, ClassLoader c
197202
return deserializeCheckpointBarrier(buffer);
198203
} else if (type == END_OF_SUPERSTEP_EVENT) {
199204
return EndOfSuperstepEvent.INSTANCE;
200-
} else if (type == END_OF_CHANNEL_STATE_EVENT) {
201-
return EndOfChannelStateEvent.INSTANCE;
205+
} else if (type == END_OF_OUTPUT_CHANNEL_STATE_EVENT) {
206+
return EndOfOutputChannelStateEvent.INSTANCE;
207+
} else if (type == END_OF_INPUT_CHANNEL_STATE_EVENT) {
208+
return EndOfInputChannelStateEvent.INSTANCE;
202209
} else if (type == END_OF_USER_RECORDS_EVENT) {
203210
return new EndOfData(StopMode.values()[buffer.get()]);
204211
} else if (type == CANCEL_CHECKPOINT_MARKER_EVENT) {

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
2525
import org.apache.flink.runtime.io.network.api.EndOfData;
2626
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
27-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
27+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
2828

2929
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
3030
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
@@ -408,7 +408,7 @@ public boolean isPartialRecord() {
408408
public static DataType getDataType(AbstractEvent event, boolean hasPriority) {
409409
if (hasPriority) {
410410
return PRIORITIZED_EVENT_BUFFER;
411-
} else if (event instanceof EndOfChannelStateEvent) {
411+
} else if (event instanceof EndOfOutputChannelStateEvent) {
412412
return RECOVERY_COMPLETION;
413413
} else if (event instanceof EndOfData) {
414414
return END_OF_DATA;

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
2929
import org.apache.flink.runtime.io.network.buffer.BufferPool;
3030
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
31-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
31+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
3232
import org.apache.flink.util.function.SupplierWithException;
3333

3434
import javax.annotation.Nullable;
@@ -262,7 +262,7 @@ public void finishReadRecoveredState(boolean notifyAndBlockOnCompletion) throws
262262
return;
263263
}
264264
try (BufferConsumer eventBufferConsumer =
265-
EventSerializer.toBufferConsumer(EndOfChannelStateEvent.INSTANCE, false)) {
265+
EventSerializer.toBufferConsumer(EndOfOutputChannelStateEvent.INSTANCE, false)) {
266266
for (int i = 0; i < subpartitions.length; i++) {
267267
if (((PipelinedSubpartition) subpartitions[i]).isSupportChannelStateRecover()) {
268268
addToSubpartition(i, eventBufferConsumer.copy(), 0);
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.io.network.partition.consumer;
20+
21+
import org.apache.flink.core.memory.DataInputView;
22+
import org.apache.flink.core.memory.DataOutputView;
23+
import org.apache.flink.runtime.event.RuntimeEvent;
24+
25+
/** Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask. */
26+
public class EndOfInputChannelStateEvent extends RuntimeEvent {
27+
28+
/** The singleton instance of this event. */
29+
public static final EndOfInputChannelStateEvent INSTANCE = new EndOfInputChannelStateEvent();
30+
31+
// ------------------------------------------------------------------------
32+
33+
// not instantiable
34+
private EndOfInputChannelStateEvent() {}
35+
36+
// ------------------------------------------------------------------------
37+
38+
@Override
39+
public void read(DataInputView in) {
40+
// Nothing to do here
41+
}
42+
43+
@Override
44+
public void write(DataOutputView out) {
45+
// Nothing to do here
46+
}
47+
48+
// ------------------------------------------------------------------------
49+
50+
@Override
51+
public int hashCode() {
52+
return 20250813;
53+
}
54+
55+
@Override
56+
public boolean equals(Object obj) {
57+
return obj != null && obj.getClass() == EndOfInputChannelStateEvent.class;
58+
}
59+
60+
@Override
61+
public String toString() {
62+
return getClass().getSimpleName();
63+
}
64+
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,19 @@
2323
import org.apache.flink.runtime.event.RuntimeEvent;
2424

2525
/**
26-
* Marks the end of recovered state of {@link RecoveredInputChannel} of this subtask or {@link
26+
* Marks the end of recovered state of {@link
2727
* org.apache.flink.runtime.io.network.partition.ResultSubpartition ResultSubpartition} on the
2828
* upstream.
2929
*/
30-
public class EndOfChannelStateEvent extends RuntimeEvent {
30+
public class EndOfOutputChannelStateEvent extends RuntimeEvent {
3131

3232
/** The singleton instance of this event. */
33-
public static final EndOfChannelStateEvent INSTANCE = new EndOfChannelStateEvent();
33+
public static final EndOfOutputChannelStateEvent INSTANCE = new EndOfOutputChannelStateEvent();
3434

3535
// ------------------------------------------------------------------------
3636

3737
// not instantiable
38-
private EndOfChannelStateEvent() {}
38+
private EndOfOutputChannelStateEvent() {}
3939

4040
// ------------------------------------------------------------------------
4141

@@ -58,7 +58,7 @@ public int hashCode() {
5858

5959
@Override
6060
public boolean equals(Object obj) {
61-
return obj != null && obj.getClass() == EndOfChannelStateEvent.class;
61+
return obj != null && obj.getClass() == EndOfOutputChannelStateEvent.class;
6262
}
6363

6464
@Override

flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RecoveredInputChannel.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,8 @@ public void onRecoveredStateBuffer(Buffer buffer) {
154154
}
155155

156156
public void finishReadRecoveredState() throws IOException {
157-
onRecoveredStateBuffer(EventSerializer.toBuffer(EndOfChannelStateEvent.INSTANCE, false));
157+
onRecoveredStateBuffer(
158+
EventSerializer.toBuffer(EndOfInputChannelStateEvent.INSTANCE, false));
158159
bufferManager.releaseFloatingBuffers();
159160
LOG.debug("{}/{} finished recovering input.", inputGate.getOwningTaskName(), channelInfo);
160161
}
@@ -172,22 +173,22 @@ private BufferAndAvailability getNextRecoveredStateBuffer() throws IOException {
172173

173174
if (next == null) {
174175
return null;
175-
} else if (isEndOfChannelStateEvent(next)) {
176+
} else if (isEndOfInputChannelStateEvent(next)) {
176177
stateConsumedFuture.complete(null);
177178
return null;
178179
} else {
179180
return new BufferAndAvailability(next, nextDataType, 0, sequenceNumber++);
180181
}
181182
}
182183

183-
private boolean isEndOfChannelStateEvent(Buffer buffer) throws IOException {
184+
private boolean isEndOfInputChannelStateEvent(Buffer buffer) throws IOException {
184185
if (buffer.isBuffer()) {
185186
return false;
186187
}
187188

188189
AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
189190
buffer.setReaderIndex(0);
190-
return event.getClass() == EndOfChannelStateEvent.class;
191+
return event.getClass() == EndOfInputChannelStateEvent.class;
191192
}
192193

193194
@Override

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/AbstractStreamTaskNetworkInput.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
2727
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
2828
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
29-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
29+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
3030
import org.apache.flink.runtime.plugable.DeserializationDelegate;
3131
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
3232
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
@@ -270,7 +270,7 @@ protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent, DataOutput<T
270270
if (checkpointedInputGate.isFinished()) {
271271
return DataInputStatus.END_OF_INPUT;
272272
}
273-
} else if (event.getClass() == EndOfChannelStateEvent.class) {
273+
} else if (event.getClass() == EndOfOutputChannelStateEvent.class) {
274274
if (checkpointedInputGate.allChannelsRecovered()) {
275275
return DataInputStatus.END_OF_RECOVERY;
276276
}

flink-runtime/src/main/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGate.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
3030
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
3131
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
32-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
32+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
3333
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
3434
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
3535
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
@@ -200,7 +200,7 @@ private Optional<BufferOrEvent> handleEvent(BufferOrEvent bufferOrEvent) throws
200200
announcedBarrier,
201201
eventAnnouncement.getSequenceNumber(),
202202
bufferOrEvent.getChannelInfo());
203-
} else if (bufferOrEvent.getEvent().getClass() == EndOfChannelStateEvent.class) {
203+
} else if (bufferOrEvent.getEvent().getClass() == EndOfOutputChannelStateEvent.class) {
204204
upstreamRecoveryTracker.handleEndOfRecovery(bufferOrEvent.getChannelInfo());
205205
}
206206
return Optional.of(bufferOrEvent);

flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.flink.runtime.io.network.api.SubtaskConnectionDescriptor;
3939
import org.apache.flink.runtime.io.network.buffer.Buffer;
4040
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
41+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfInputChannelStateEvent;
42+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
4143
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
4244
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
4345

@@ -120,6 +122,9 @@ class EventSerializerTest {
120122
new RecoveryMetadata(3),
121123
new WatermarkEvent(new LongWatermark(42L, "test"), false),
122124
new WatermarkEvent(new BoolWatermark(true, "test"), true),
125+
new WatermarkEvent(new BoolWatermark(true, "test"), true),
126+
EndOfInputChannelStateEvent.INSTANCE,
127+
EndOfOutputChannelStateEvent.INSTANCE,
123128
};
124129

125130
@Test
@@ -161,6 +166,9 @@ void testToBufferConsumer() throws IOException {
161166
assertThat(bufferConsumer.build().getDataType())
162167
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
163168
}
169+
} else if (evt instanceof EndOfOutputChannelStateEvent) {
170+
assertThat(bufferConsumer.build().getDataType())
171+
.isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
164172
} else {
165173
assertThat(bufferConsumer.build().getDataType())
166174
.isEqualTo(Buffer.DataType.EVENT_BUFFER);
@@ -191,6 +199,8 @@ void testToBuffer() throws IOException {
191199
assertThat(buffer.getDataType())
192200
.isEqualTo(Buffer.DataType.UNALIGNED_WATERMARK_EVENT);
193201
}
202+
} else if (evt instanceof EndOfOutputChannelStateEvent) {
203+
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.RECOVERY_COMPLETION);
194204
} else {
195205
assertThat(buffer.getDataType()).isEqualTo(Buffer.DataType.EVENT_BUFFER);
196206
}

flink-runtime/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointedInputGateTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.flink.runtime.io.network.buffer.Buffer;
3838
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
3939
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
40-
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
40+
import org.apache.flink.runtime.io.network.partition.consumer.EndOfOutputChannelStateEvent;
4141
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
4242
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
4343
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -86,7 +86,7 @@ void testUpstreamResumedUponEndOfRecovery() throws Exception {
8686
enqueueEndOfState(gate, channelIndex);
8787
Optional<BufferOrEvent> bufferOrEvent = gate.pollNext();
8888
while (bufferOrEvent.isPresent()
89-
&& bufferOrEvent.get().getEvent() instanceof EndOfChannelStateEvent
89+
&& bufferOrEvent.get().getEvent() instanceof EndOfOutputChannelStateEvent
9090
&& !gate.allChannelsRecovered()) {
9191
bufferOrEvent = gate.pollNext();
9292
}
@@ -97,7 +97,7 @@ void testUpstreamResumedUponEndOfRecovery() throws Exception {
9797
Optional<BufferOrEvent> polled = gate.pollNext();
9898
assertThat(polled).isPresent();
9999
assertThat(polled.get().isEvent()).isTrue();
100-
assertThat(polled.get().getEvent()).isEqualTo(EndOfChannelStateEvent.INSTANCE);
100+
assertThat(polled.get().getEvent()).isEqualTo(EndOfOutputChannelStateEvent.INSTANCE);
101101
assertThat(resumeCounter.getNumResumed()).isEqualTo(numberOfChannels);
102102
assertThat(gate.pollNext())
103103
.as("should only be a single event no matter of what is the number of channels")
@@ -282,7 +282,7 @@ private void assertAddedInputSize(
282282

283283
private void enqueueEndOfState(CheckpointedInputGate checkpointedInputGate, int channelIndex)
284284
throws IOException {
285-
enqueue(checkpointedInputGate, channelIndex, EndOfChannelStateEvent.INSTANCE);
285+
enqueue(checkpointedInputGate, channelIndex, EndOfOutputChannelStateEvent.INSTANCE);
286286
}
287287

288288
private void enqueueEndOfPartition(

0 commit comments

Comments
 (0)