Skip to content

Commit 4148161

Browse files
author
sammieliu
committed
flink-connector-files
1 parent 0dd13f6 commit 4148161

19 files changed

+1112
-25
lines changed

flink-connector-files/pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,22 @@
4444
</dependency>
4545

4646
</dependencies>
47+
48+
<build>
49+
<plugins>
50+
<plugin>
51+
<groupId>org.apache.maven.plugins</groupId>
52+
<artifactId>maven-source-plugin</artifactId>
53+
<version>2.2.1</version><!--$NO-MVN-MAN-VER$-->
54+
<executions>
55+
<execution>
56+
<id>attach-sources</id>
57+
<goals>
58+
<goal>jar</goal>
59+
</goals>
60+
</execution>
61+
</executions>
62+
</plugin>
63+
</plugins>
64+
</build>
4765
</project>
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package com.tencent.cloud.oceanus.connector.file.enumerator;
2+
3+
import org.apache.flink.api.connector.source.SplitEnumerator;
4+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
5+
import org.apache.flink.core.fs.Path;
6+
7+
import com.tencent.cloud.oceanus.connector.file.enumerator.assigner.FileSplitAssigner;
8+
import com.tencent.cloud.oceanus.connector.file.enumerator.assigner.SimpleSplitAssigner;
9+
import com.tencent.cloud.oceanus.connector.file.enumerator.enumerate.FileEnumerator;
10+
import com.tencent.cloud.oceanus.connector.file.enumerator.enumerate.NonSplittingRecursiveEnumerator;
11+
import com.tencent.cloud.oceanus.connector.file.split.FileSourceSplit;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
15+
import javax.annotation.Nullable;
16+
17+
import java.io.IOException;
18+
import java.util.Collection;
19+
import java.util.HashSet;
20+
import java.util.Iterator;
21+
import java.util.LinkedHashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.Optional;
25+
import java.util.stream.Collectors;
26+
27+
import static org.apache.flink.util.Preconditions.checkNotNull;
28+
29+
/** */
30+
public class FileSourceEnumerator
31+
implements SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> {
32+
private static final Logger LOG = LoggerFactory.getLogger(FileSourceEnumerator.class);
33+
34+
private final FileEnumerator enumerator;
35+
private final FileSplitAssigner splitAssigner;
36+
37+
private final SplitEnumeratorContext<FileSourceSplit> context;
38+
private final Path[] paths;
39+
private final HashSet<Path> alreadyDiscoveredPaths;
40+
41+
private final LinkedHashMap<Integer, String> readersAwaitingSplit;
42+
// ------------------------------------------------------------------------
43+
44+
public FileSourceEnumerator(
45+
SplitEnumeratorContext<FileSourceSplit> context,
46+
Path[] paths,
47+
Collection<FileSourceSplit> splits,
48+
Collection<Path> alreadyDiscoveredPaths) {
49+
this.enumerator = new NonSplittingRecursiveEnumerator();
50+
this.splitAssigner = new SimpleSplitAssigner(checkNotNull(splits));
51+
this.context = checkNotNull(context);
52+
this.paths = checkNotNull(paths);
53+
this.alreadyDiscoveredPaths = new HashSet<>(checkNotNull(alreadyDiscoveredPaths));
54+
this.readersAwaitingSplit = new LinkedHashMap<>();
55+
}
56+
57+
@Override
58+
public void start() {
59+
context.callAsync(
60+
() -> enumerator.enumerateSplits(paths, 1),
61+
this::processDiscoveredSplits,
62+
2000,
63+
1000);
64+
}
65+
66+
@Override
67+
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
68+
readersAwaitingSplit.put(subtaskId, requesterHostname);
69+
assignSplits();
70+
}
71+
72+
@Override
73+
public void addSplitsBack(List<FileSourceSplit> splits, int subtaskId) {
74+
LOG.info("File Source Enumerator adds splits back: {}", splits);
75+
splitAssigner.addSplits(splits);
76+
}
77+
78+
@Override
79+
public void addReader(int subtaskId) {
80+
// this source is purely lazy-pull-based, nothing to do upon registration
81+
}
82+
83+
@Override
84+
public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
85+
final PendingSplitsCheckpoint checkpoint =
86+
PendingSplitsCheckpoint.fromCollectionSnapshot(
87+
splitAssigner.remainingSplits(), alreadyDiscoveredPaths);
88+
89+
LOG.debug("Source Checkpoint is {}", checkpoint);
90+
return checkpoint;
91+
}
92+
93+
@Override
94+
public void close() throws IOException {
95+
// no resources to close
96+
}
97+
98+
// ------------------------------------------------------------------------
99+
100+
private void processDiscoveredSplits(Collection<FileSourceSplit> splits, Throwable error) {
101+
if (error != null) {
102+
LOG.error("Failed to enumerate files", error);
103+
return;
104+
}
105+
106+
final Collection<FileSourceSplit> newSplits =
107+
splits.stream()
108+
.filter((split) -> alreadyDiscoveredPaths.add(split.path()))
109+
.collect(Collectors.toList());
110+
splitAssigner.addSplits(newSplits);
111+
112+
assignSplits();
113+
}
114+
115+
private void assignSplits() {
116+
final Iterator<Map.Entry<Integer, String>> awaitingReader =
117+
readersAwaitingSplit.entrySet().iterator();
118+
119+
while (awaitingReader.hasNext()) {
120+
final Map.Entry<Integer, String> nextAwaiting = awaitingReader.next();
121+
122+
// if the reader that requested another split has failed in the meantime, remove
123+
// it from the list of waiting readers
124+
if (!context.registeredReaders().containsKey(nextAwaiting.getKey())) {
125+
awaitingReader.remove();
126+
continue;
127+
}
128+
129+
final String hostname = nextAwaiting.getValue();
130+
final int awaitingSubtask = nextAwaiting.getKey();
131+
final Optional<FileSourceSplit> nextSplit = splitAssigner.getNext();
132+
if (nextSplit.isPresent()) {
133+
context.assignSplit(nextSplit.get(), awaitingSubtask);
134+
awaitingReader.remove();
135+
} else {
136+
break;
137+
}
138+
}
139+
}
140+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.tencent.cloud.oceanus.connector.file.enumerator;
2+
3+
import org.apache.flink.connector.file.src.PendingSplitsCheckpointSerializer;
4+
import org.apache.flink.core.fs.Path;
5+
6+
import com.tencent.cloud.oceanus.connector.file.split.FileSourceSplit;
7+
8+
import javax.annotation.Nullable;
9+
10+
import java.util.ArrayList;
11+
import java.util.Collection;
12+
import java.util.Collections;
13+
14+
import static org.apache.flink.util.Preconditions.checkNotNull;
15+
16+
/** */
17+
public class PendingSplitsCheckpoint {
18+
/** The splits in the checkpoint. */
19+
private final Collection<FileSourceSplit> splits;
20+
21+
/**
22+
* The paths that are no longer in the enumerator checkpoint, but have been processed before and
23+
* should this be ignored. Relevant only for sources in continuous monitoring mode.
24+
*/
25+
private final Collection<Path> alreadyProcessedPaths;
26+
27+
/**
28+
* The cached byte representation from the last serialization step. This helps to avoid paying
29+
* repeated serialization cost for the same checkpoint object. This field is used by {@link
30+
* PendingSplitsCheckpointSerializer}.
31+
*/
32+
@Nullable byte[] serializedFormCache;
33+
34+
protected PendingSplitsCheckpoint(
35+
Collection<FileSourceSplit> splits, Collection<Path> alreadyProcessedPaths) {
36+
this.splits = Collections.unmodifiableCollection(splits);
37+
this.alreadyProcessedPaths = Collections.unmodifiableCollection(alreadyProcessedPaths);
38+
}
39+
40+
// ------------------------------------------------------------------------
41+
public Collection<FileSourceSplit> getSplits() {
42+
return splits;
43+
}
44+
45+
public Collection<Path> getAlreadyProcessedPaths() {
46+
return alreadyProcessedPaths;
47+
}
48+
49+
// ------------------------------------------------------------------------
50+
51+
@Override
52+
public String toString() {
53+
return "PendingSplitsCheckpoint{"
54+
+ "splits="
55+
+ splits
56+
+ ", alreadyProcessedPaths="
57+
+ alreadyProcessedPaths
58+
+ '}';
59+
}
60+
61+
// ------------------------------------------------------------------------
62+
// factories
63+
// ------------------------------------------------------------------------
64+
65+
public static PendingSplitsCheckpoint fromCollectionSnapshot(
66+
final Collection<FileSourceSplit> splits) {
67+
checkNotNull(splits);
68+
69+
// create a copy of the collection to make sure this checkpoint is immutable
70+
final Collection<FileSourceSplit> copy = new ArrayList<>(splits);
71+
return new PendingSplitsCheckpoint(copy, Collections.emptySet());
72+
}
73+
74+
public static PendingSplitsCheckpoint fromCollectionSnapshot(
75+
final Collection<FileSourceSplit> splits,
76+
final Collection<Path> alreadyProcessedPaths) {
77+
checkNotNull(splits);
78+
79+
// create a copy of the collection to make sure this checkpoint is immutable
80+
final Collection<FileSourceSplit> splitsCopy = new ArrayList<>(splits);
81+
final Collection<Path> pathsCopy = new ArrayList<>(alreadyProcessedPaths);
82+
83+
return new PendingSplitsCheckpoint(splitsCopy, pathsCopy);
84+
}
85+
86+
static PendingSplitsCheckpoint reusingCollection(
87+
final Collection<FileSourceSplit> splits,
88+
final Collection<Path> alreadyProcessedPaths) {
89+
return new PendingSplitsCheckpoint(splits, alreadyProcessedPaths);
90+
}
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package com.tencent.cloud.oceanus.connector.file.enumerator;
2+
3+
import org.apache.flink.core.fs.Path;
4+
import org.apache.flink.core.io.SimpleVersionedSerializer;
5+
6+
import com.tencent.cloud.oceanus.connector.file.split.FileSourceSplit;
7+
8+
import java.io.IOException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.ByteOrder;
11+
import java.nio.charset.StandardCharsets;
12+
import java.util.ArrayList;
13+
import java.util.Collection;
14+
15+
import static org.apache.flink.util.Preconditions.checkArgument;
16+
import static org.apache.flink.util.Preconditions.checkNotNull;
17+
18+
/** */
19+
public class PendingSplitsCheckpointSerializer
20+
implements SimpleVersionedSerializer<PendingSplitsCheckpoint> {
21+
private static final int VERSION = 1;
22+
23+
private static final int VERSION_1_MAGIC_NUMBER = 0xDEADBEEF;
24+
25+
private final SimpleVersionedSerializer<FileSourceSplit> splitSerializer;
26+
27+
public PendingSplitsCheckpointSerializer(
28+
SimpleVersionedSerializer<FileSourceSplit> splitSerializer) {
29+
this.splitSerializer = checkNotNull(splitSerializer);
30+
}
31+
32+
// ------------------------------------------------------------------------
33+
@Override
34+
public int getVersion() {
35+
return VERSION;
36+
}
37+
38+
@Override
39+
public byte[] serialize(PendingSplitsCheckpoint checkpoint) throws IOException {
40+
checkArgument(
41+
checkpoint.getClass() == PendingSplitsCheckpoint.class,
42+
"Cannot serialize subclasses of PendingSplitsCheckpoint");
43+
44+
// optimization: the splits lazily cache their own serialized form
45+
if (checkpoint.serializedFormCache != null) {
46+
return checkpoint.serializedFormCache;
47+
}
48+
49+
final SimpleVersionedSerializer<FileSourceSplit> splitSerializer =
50+
this.splitSerializer; // stack cache
51+
final Collection<FileSourceSplit> splits = checkpoint.getSplits();
52+
final Collection<Path> processedPaths = checkpoint.getAlreadyProcessedPaths();
53+
54+
final ArrayList<byte[]> serializedSplits = new ArrayList<>(splits.size());
55+
final ArrayList<byte[]> serializedPaths = new ArrayList<>(processedPaths.size());
56+
57+
int totalLen =
58+
16; // four ints: magic, version of split serializer, count splits, count paths
59+
60+
for (FileSourceSplit split : splits) {
61+
final byte[] serSplit = splitSerializer.serialize(split);
62+
serializedSplits.add(serSplit);
63+
totalLen += serSplit.length + 4; // 4 bytes for the length field
64+
}
65+
66+
for (Path path : processedPaths) {
67+
final byte[] serPath = path.toString().getBytes(StandardCharsets.UTF_8);
68+
serializedPaths.add(serPath);
69+
totalLen += serPath.length + 4; // 4 bytes for the length field
70+
}
71+
72+
final byte[] result = new byte[totalLen];
73+
final ByteBuffer byteBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
74+
byteBuffer.putInt(VERSION_1_MAGIC_NUMBER);
75+
byteBuffer.putInt(splitSerializer.getVersion());
76+
byteBuffer.putInt(serializedSplits.size());
77+
byteBuffer.putInt(serializedPaths.size());
78+
79+
for (byte[] splitBytes : serializedSplits) {
80+
byteBuffer.putInt(splitBytes.length);
81+
byteBuffer.put(splitBytes);
82+
}
83+
84+
for (byte[] pathBytes : serializedPaths) {
85+
byteBuffer.putInt(pathBytes.length);
86+
byteBuffer.put(pathBytes);
87+
}
88+
89+
assert byteBuffer.remaining() == 0;
90+
91+
// optimization: cache the serialized from, so we avoid the byte work during repeated
92+
// serialization
93+
checkpoint.serializedFormCache = result;
94+
95+
return result;
96+
}
97+
98+
@Override
99+
public PendingSplitsCheckpoint deserialize(int version, byte[] serialized) throws IOException {
100+
if (version != VERSION) {
101+
throw new IOException("Unknown version: " + version);
102+
}
103+
final ByteBuffer bb = ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN);
104+
105+
final int magic = bb.getInt();
106+
if (magic != VERSION_1_MAGIC_NUMBER) {
107+
throw new IOException(
108+
String.format(
109+
"Invalid magic number for PendingSplitsCheckpoint. "
110+
+ "Expected: %X , found %X",
111+
VERSION_1_MAGIC_NUMBER, magic));
112+
}
113+
114+
final int splitSerializerVersion = bb.getInt();
115+
final int numSplits = bb.getInt();
116+
final int numPaths = bb.getInt();
117+
118+
final SimpleVersionedSerializer<FileSourceSplit> splitSerializer =
119+
this.splitSerializer; // stack cache
120+
final ArrayList<FileSourceSplit> splits = new ArrayList<>(numSplits);
121+
final ArrayList<Path> paths = new ArrayList<>(numPaths);
122+
123+
for (int remaining = numSplits; remaining > 0; remaining--) {
124+
final byte[] bytes = new byte[bb.getInt()];
125+
bb.get(bytes);
126+
final FileSourceSplit split =
127+
splitSerializer.deserialize(splitSerializerVersion, bytes);
128+
splits.add(split);
129+
}
130+
131+
for (int remaining = numPaths; remaining > 0; remaining--) {
132+
final byte[] bytes = new byte[bb.getInt()];
133+
bb.get(bytes);
134+
final Path path = new Path(new String(bytes, StandardCharsets.UTF_8));
135+
paths.add(path);
136+
}
137+
138+
return PendingSplitsCheckpoint.reusingCollection(splits, paths);
139+
}
140+
}

0 commit comments

Comments
 (0)