Skip to content

Commit f201bbf

Browse files
authored
[core] Should not close channelManager in BinaryExternalSortBuffer (#5466)
1 parent 1baa494 commit f201bbf

File tree

3 files changed

+12
-22
lines changed

3 files changed

+12
-22
lines changed

paimon-core/src/main/java/org/apache/paimon/sort/BinaryExternalSortBuffer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class BinaryExternalSortBuffer implements SortBuffer {
5050
private final BinaryRowSerializer serializer;
5151
private final BinaryInMemorySortBuffer inMemorySortBuffer;
5252
private final IOManager ioManager;
53-
private SpillChannelManager channelManager;
53+
private final SpillChannelManager channelManager;
5454
private final int maxNumFileHandles;
5555
private final BlockCompressionFactory compressionCodecFactory;
5656
private final int compressionBlockSize;
@@ -154,8 +154,7 @@ public void clear() {
154154
inMemorySortBuffer.clear();
155155
spillChannelIDs.clear();
156156
// delete files
157-
channelManager.close();
158-
channelManager = new SpillChannelManager();
157+
channelManager.reset();
159158
}
160159

161160
@Override

paimon-core/src/main/java/org/apache/paimon/sort/SpillChannelManager.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,56 +20,40 @@
2020

2121
import org.apache.paimon.disk.FileIOChannel;
2222

23-
import java.io.Closeable;
2423
import java.io.File;
2524
import java.util.HashSet;
2625
import java.util.Iterator;
2726
import java.util.List;
2827

29-
import static org.apache.paimon.utils.Preconditions.checkArgument;
30-
3128
/** Channel manager to manage the life cycle of spill channels. */
32-
public class SpillChannelManager implements Closeable {
29+
public class SpillChannelManager {
3330

3431
private final HashSet<FileIOChannel.ID> channels;
3532
private final HashSet<FileIOChannel> openChannels;
3633

37-
private volatile boolean closed;
38-
3934
public SpillChannelManager() {
4035
this.channels = new HashSet<>(64);
4136
this.openChannels = new HashSet<>(64);
4237
}
4338

4439
/** Add a new File channel. */
4540
public synchronized void addChannel(FileIOChannel.ID id) {
46-
checkArgument(!closed);
4741
channels.add(id);
4842
}
4943

5044
/** Open File channels. */
5145
public synchronized void addOpenChannels(List<FileIOChannel> toOpen) {
52-
checkArgument(!closed);
5346
for (FileIOChannel channel : toOpen) {
5447
openChannels.add(channel);
5548
channels.remove(channel.getChannelID());
5649
}
5750
}
5851

5952
public synchronized void removeChannel(FileIOChannel.ID id) {
60-
checkArgument(!closed);
6153
channels.remove(id);
6254
}
6355

64-
@Override
65-
public synchronized void close() {
66-
67-
if (this.closed) {
68-
return;
69-
}
70-
71-
this.closed = true;
72-
56+
public synchronized void reset() {
7357
for (Iterator<FileIOChannel> channels = this.openChannels.iterator();
7458
channels.hasNext(); ) {
7559
final FileIOChannel channel = channels.next();

paimon-core/src/test/java/org/apache/paimon/sort/BinaryExternalSortBufferTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,15 @@ public void testSpilling() throws Exception {
188188
innerTestSpilling(createBuffer());
189189
}
190190

191+
@Test
192+
public void testSpillingAndClearWithMaxFanIn() throws Exception {
193+
BinaryExternalSortBuffer buffer = createBuffer(2);
194+
innerTestSpilling(buffer);
195+
innerTestSpilling(buffer);
196+
}
197+
191198
private void innerTestSpilling(BinaryExternalSortBuffer sorter) throws Exception {
192-
int size = 1000_000;
199+
int size = 2000_000;
193200

194201
MockBinaryRowReader reader = new MockBinaryRowReader(size);
195202
sorter.write(reader);

0 commit comments

Comments
 (0)