Skip to content

Commit 0dd13f6

Browse files
author
sammieliu
committed
init
0 parents  commit 0dd13f6

File tree

8 files changed

+284
-0
lines changed

8 files changed

+284
-0
lines changed

flink-connector-files/.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
target/
.idea/
*.iml

flink-connector-files/pom.xml

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for a minimal Flink table connector ("file") skeleton. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tencent.cloud.oceanus</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.0.0</version>

    <properties>
        <!-- Java 8 bytecode: the baseline supported by Flink 1.13. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.13.3</flink.version>
        <!-- Scala suffix for the Scala-dependent Flink artifacts below. -->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

    </dependencies>
</project>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package com.tencent.cloud.oceanus.connector.file.source;
2+
3+
import org.apache.flink.api.common.typeinfo.TypeInformation;
4+
import org.apache.flink.api.connector.source.Boundedness;
5+
import org.apache.flink.api.connector.source.Source;
6+
import org.apache.flink.api.connector.source.SourceReader;
7+
import org.apache.flink.api.connector.source.SourceReaderContext;
8+
import org.apache.flink.api.connector.source.SplitEnumerator;
9+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
10+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
11+
import org.apache.flink.core.io.SimpleVersionedSerializer;
12+
import org.apache.flink.table.data.RowData;
13+
14+
import com.tencent.cloud.oceanus.connector.file.split.FileSourceSplit;
15+
import com.tencent.cloud.oceanus.connector.file.split.PendingSplitsCheckpoint;
16+
17+
/** */
18+
public class FileSource
19+
implements Source<RowData, FileSourceSplit, PendingSplitsCheckpoint>,
20+
ResultTypeQueryable<RowData> {
21+
private final TypeInformation<RowData> producedTypeInfo;
22+
23+
public FileSource(TypeInformation<RowData> producedTypeInfo) {
24+
this.producedTypeInfo = producedTypeInfo;
25+
}
26+
27+
@Override
28+
public Boundedness getBoundedness() {
29+
return null;
30+
}
31+
32+
@Override
33+
public SourceReader<RowData, FileSourceSplit> createReader(SourceReaderContext readerContext)
34+
throws Exception {
35+
return null;
36+
}
37+
38+
@Override
39+
public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> createEnumerator(
40+
SplitEnumeratorContext<FileSourceSplit> enumContext) throws Exception {
41+
return null;
42+
}
43+
44+
@Override
45+
public SplitEnumerator<FileSourceSplit, PendingSplitsCheckpoint> restoreEnumerator(
46+
SplitEnumeratorContext<FileSourceSplit> enumContext, PendingSplitsCheckpoint checkpoint)
47+
throws Exception {
48+
return null;
49+
}
50+
51+
@Override
52+
public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
53+
return null;
54+
}
55+
56+
@Override
57+
public SimpleVersionedSerializer<PendingSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
58+
return null;
59+
}
60+
61+
@Override
62+
public TypeInformation<RowData> getProducedType() {
63+
return producedTypeInfo;
64+
}
65+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.tencent.cloud.oceanus.connector.file.split;
2+
3+
import org.apache.flink.api.connector.source.SourceSplit;
4+
5+
/** */
6+
public class FileSourceSplit implements SourceSplit {
7+
@Override
8+
public String splitId() {
9+
return null;
10+
}
11+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package com.tencent.cloud.oceanus.connector.file.split;
2+
3+
/**
 * Checkpoint state of the split enumerator: the splits that have been discovered but not
 * yet assigned to readers.
 *
 * <p>Intentionally empty in this initial commit; fields will be added when split discovery
 * and assignment are implemented.
 */
public class PendingSplitsCheckpoint {}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.tencent.cloud.oceanus.connector.file.table;
2+
3+
import org.apache.flink.api.common.typeinfo.TypeInformation;
4+
import org.apache.flink.table.catalog.ResolvedSchema;
5+
import org.apache.flink.table.connector.ChangelogMode;
6+
import org.apache.flink.table.connector.source.DynamicTableSource;
7+
import org.apache.flink.table.connector.source.ScanTableSource;
8+
import org.apache.flink.table.connector.source.SourceProvider;
9+
import org.apache.flink.table.data.RowData;
10+
import org.apache.flink.types.RowKind;
11+
12+
import com.tencent.cloud.oceanus.connector.file.source.FileSource;
13+
14+
import java.util.Objects;
15+
16+
import static org.apache.flink.util.Preconditions.checkNotNull;
17+
18+
/** */
19+
public class FileDynamicSource implements ScanTableSource {
20+
21+
private final String path;
22+
private final ResolvedSchema schema;
23+
24+
public FileDynamicSource(String path, ResolvedSchema schema) {
25+
this.path = checkNotNull(path);
26+
this.schema = checkNotNull(schema);
27+
}
28+
29+
@Override
30+
public ChangelogMode getChangelogMode() {
31+
return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();
32+
}
33+
34+
@Override
35+
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
36+
final TypeInformation<RowData> producedTypeInfo =
37+
context.createTypeInformation(schema.toPhysicalRowDataType());
38+
return SourceProvider.of(new FileSource(producedTypeInfo));
39+
}
40+
41+
@Override
42+
public DynamicTableSource copy() {
43+
return new FileDynamicSource(path, schema);
44+
}
45+
46+
@Override
47+
public String asSummaryString() {
48+
return "FileSource";
49+
}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (this == o) {
54+
return true;
55+
}
56+
if (o == null || getClass() != o.getClass()) {
57+
return false;
58+
}
59+
FileDynamicSource that = (FileDynamicSource) o;
60+
return Objects.equals(path, that.path) && Objects.equals(schema, that.schema);
61+
}
62+
63+
@Override
64+
public int hashCode() {
65+
return Objects.hash(path, schema);
66+
}
67+
68+
@Override
69+
public String toString() {
70+
return "FileDynamicSource{" + "path='" + path + '\'' + ", schema=" + schema + '}';
71+
}
72+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.tencent.cloud.oceanus.connector.file.table;
2+
3+
import org.apache.flink.configuration.ConfigOption;
4+
import org.apache.flink.configuration.ReadableConfig;
5+
import org.apache.flink.table.api.ValidationException;
6+
import org.apache.flink.table.catalog.Column;
7+
import org.apache.flink.table.catalog.ResolvedSchema;
8+
import org.apache.flink.table.connector.source.DynamicTableSource;
9+
import org.apache.flink.table.factories.DynamicTableSourceFactory;
10+
import org.apache.flink.table.factories.FactoryUtil;
11+
import org.apache.flink.table.types.logical.LogicalTypeRoot;
12+
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Set;
16+
import java.util.stream.Collectors;
17+
18+
import static org.apache.flink.configuration.ConfigOptions.key;
19+
20+
/** FileSource connector Factory. */
21+
public class FileDynamicTableFactory implements DynamicTableSourceFactory {
22+
23+
private static final String IDENTIFIER = "file";
24+
25+
private static final ConfigOption<String> PATH =
26+
key("path").stringType().noDefaultValue().withDescription("The path of a directory");
27+
28+
@Override
29+
public DynamicTableSource createDynamicTableSource(Context context) {
30+
final FactoryUtil.TableFactoryHelper helper =
31+
FactoryUtil.createTableFactoryHelper(this, context);
32+
helper.validate();
33+
34+
final ReadableConfig config = helper.getOptions();
35+
final ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
36+
final List<Column> physicalColumns =
37+
schema.getColumns().stream()
38+
.filter(Column::isPhysical)
39+
.collect(Collectors.toList());
40+
41+
if (physicalColumns.size() != 1
42+
|| !physicalColumns
43+
.get(0)
44+
.getDataType()
45+
.getLogicalType()
46+
.getTypeRoot()
47+
.equals(LogicalTypeRoot.VARCHAR)) {
48+
throw new ValidationException(
49+
String.format(
50+
"Currently, we can only read files line by line. "
51+
+ "That is, only one physical field of type STRING is supported, but got %d columns (%s).",
52+
physicalColumns.size(),
53+
physicalColumns.stream()
54+
.map(
55+
column ->
56+
column.getName()
57+
+ " "
58+
+ column.getDataType().toString())
59+
.collect(Collectors.joining(", "))));
60+
}
61+
62+
return new FileDynamicSource(config.get(PATH), schema);
63+
}
64+
65+
@Override
66+
public String factoryIdentifier() {
67+
return IDENTIFIER;
68+
}
69+
70+
@Override
71+
public Set<ConfigOption<?>> requiredOptions() {
72+
Set<ConfigOption<?>> options = new HashSet<>();
73+
options.add(PATH);
74+
return options;
75+
}
76+
77+
@Override
78+
public Set<ConfigOption<?>> optionalOptions() {
79+
return new HashSet<>();
80+
}
81+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
com.tencent.cloud.oceanus.connector.file.table.FileDynamicTableFactory

0 commit comments

Comments
 (0)