Commit 24f392d

kafka1.0 connector

1 parent 4f38b7f

File tree

16 files changed: +1592 −0 lines changed

kafka/kafka-sink/pom.xml

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>sql.kafka</artifactId>
        <groupId>com.dtstack.flink</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>sql.sink.kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka-sink</name>
    <packaging>jar</packaging>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>1.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.slf4j</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-antrun-plugin</artifactId>
                <version>1.2</version>
                <executions>
                    <execution>
                        <id>copy-resources</id>
                        <!-- bind to the package phase -->
                        <phase>package</phase>
                        <goals>
                            <goal>run</goal>
                        </goals>
                        <configuration>
                            <tasks>
                                <copy todir="${basedir}/../../plugins/kafkasink">
                                    <fileset dir="target/">
                                        <include name="${project.artifactId}-${project.version}.jar"/>
                                    </fileset>
                                </copy>

                                <move file="${basedir}/../../plugins/kafkasink/${project.artifactId}-${project.version}.jar"
                                      tofile="${basedir}/../../plugins/kafkasink/${project.name}-${git.branch}.jar"/>
                            </tasks>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
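
A note on the build section above: running mvn package first produces the shaded jar (excluding org.slf4j and stripping jar signature files), then the antrun execution copies that jar into the shared plugins directory and renames it after the module name and the current branch. Assuming the git.branch property is resolved by the parent pom, the output layout is roughly:

    plugins/
        kafkasink/
            kafka-sink-<git.branch>.jar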
CustomerFlinkKafkaProducer.java

Lines changed: 60 additions & 0 deletions
@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.util.Properties;

/**
 * @author: chuixue
 * @create: 2019-11-05 11:54
 * @description: A {@link FlinkKafkaProducer} that registers numRecordsOut
 * counter/meter metrics and hands the counter to the JSON serialization
 * schema so every serialized record is counted.
 **/
public class CustomerFlinkKafkaProducer<T> extends FlinkKafkaProducer<T> {

    CustomerJsonRowSerializationSchema schema;

    public CustomerFlinkKafkaProducer(String topicId, SerializationSchema<T> serializationSchema, Properties producerConfig) {
        super(topicId, serializationSchema, producerConfig);
        this.schema = (CustomerJsonRowSerializationSchema) serializationSchema;
    }

    @Override
    public void open(Configuration configuration) {
        RuntimeContext ctx = getRuntimeContext();
        Counter counter = ctx.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
        ctx.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(counter, 20));

        // inject the counter so the schema can track serialized records
        schema.setCounter(counter);

        try {
            super.open(configuration);
        } catch (Exception e) {
            throw new RuntimeException("Failed to open the Kafka producer", e);
        }
    }
}
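
For orientation, a minimal wiring sketch, assuming a job that already produces a DataStream<Row>; the broker address, topic name, class name, and field layout below are illustrative assumptions, not part of this commit:

    import java.util.Properties;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    public class KafkaSinkWiringSketch {

        public static void attachSink(DataStream<Row> rowStream) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address

            // Hypothetical field layout; the schema maps field names to JSON properties.
            RowTypeInfo rowType = new RowTypeInfo(
                    new TypeInformation<?>[]{Types.STRING, Types.INT},
                    new String[]{"name", "age"});
            CustomerJsonRowSerializationSchema schema = new CustomerJsonRowSerializationSchema(rowType);

            // open() registers the numRecordsOut counter/meter and injects the counter into the schema.
            CustomerFlinkKafkaProducer<Row> producer =
                    new CustomerFlinkKafkaProducer<>("result-topic", schema, props); // assumed topic
            rowStream.addSink(producer);
        }
    }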
CustomerJsonRowSerializationSchema.java

Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

/**
 * Serialization schema that serializes an object of Flink types into JSON bytes.
 *
 * <p>Serializes the input Flink object into a JSON string and
 * converts it into <code>byte[]</code>.
 *
 * <p>Result <code>byte[]</code> messages can be deserialized using {@link JsonRowDeserializationSchema}.
 */
@PublicEvolving
public class CustomerJsonRowSerializationSchema implements SerializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private final TypeInformation<Row> typeInfo;

    /** Object mapper that is used to create output JSON objects. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
    private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, with milliseconds). */
    private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");

    /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
    private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /** Reusable object node. */
    private transient ObjectNode node;

    /** Counter for serialized records; must be injected via {@link #setCounter(Counter)} before first use. */
    private Counter counter;

    /**
     * Creates a JSON serialization schema for the given type information.
     *
     * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
     */
    public CustomerJsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
        Preconditions.checkNotNull(typeInfo, "Type information");
        this.typeInfo = typeInfo;
    }

    /**
     * Creates a JSON serialization schema for the given JSON schema.
     *
     * @param jsonSchema JSON schema describing the result type
     *
     * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
     */
    public CustomerJsonRowSerializationSchema(String jsonSchema) {
        this(JsonRowSchemaConverter.convert(jsonSchema));
    }

    @Override
    public byte[] serialize(Row row) {
        if (node == null) {
            node = mapper.createObjectNode();
        }

        try {
            convertRow(node, (RowTypeInfo) typeInfo, row);
            counter.inc();
            return mapper.writeValueAsBytes(node);
        } catch (Throwable t) {
            throw new RuntimeException("Could not serialize row '" + row + "'. " +
                "Make sure that the schema matches the input.", t);
        }
    }

    // --------------------------------------------------------------------------------------------

    private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
        if (reuse == null) {
            reuse = mapper.createObjectNode();
        }
        final String[] fieldNames = info.getFieldNames();
        final TypeInformation<?>[] fieldTypes = info.getFieldTypes();

        // validate the row
        if (row.getArity() != fieldNames.length) {
            throw new IllegalStateException(String.format(
                "Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length));
        }

        for (int i = 0; i < fieldNames.length; i++) {
            final String name = fieldNames[i];

            final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i));
            reuse.set(name, fieldConverted);
        }

        return reuse;
    }

    private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
        if (info == Types.VOID || object == null) {
            return container.nullNode();
        } else if (info == Types.BOOLEAN) {
            return container.booleanNode((Boolean) object);
        } else if (info == Types.STRING) {
            return container.textNode((String) object);
        } else if (info == Types.BIG_DEC) {
            // convert decimal if necessary
            if (object instanceof BigDecimal) {
                return container.numberNode((BigDecimal) object);
            }
            return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
        } else if (info == Types.BIG_INT) {
            // convert integer if necessary
            if (object instanceof BigInteger) {
                return container.numberNode((BigInteger) object);
            }
            return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
        } else if (info == Types.SQL_DATE) {
            return container.textNode(object.toString());
        } else if (info == Types.SQL_TIME) {
            final Time time = (Time) object;
            // use the millisecond format only when the value actually carries milliseconds
            if (time.getTime() % 1000 > 0) {
                return container.textNode(timeFormatWithMillis.format(time));
            }
            return container.textNode(timeFormat.format(time));
        } else if (info == Types.SQL_TIMESTAMP) {
            return container.textNode(timestampFormat.format((Timestamp) object));
        } else if (info instanceof RowTypeInfo) {
            if (reuse instanceof ObjectNode) {
                return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
            } else {
                return convertRow(null, (RowTypeInfo) info, (Row) object);
            }
        } else if (info instanceof ObjectArrayTypeInfo) {
            if (reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof BasicArrayTypeInfo) {
            if (reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
            return container.binaryNode((byte[]) object);
        } else {
            // for types that were specified without JSON schema
            // e.g. POJOs
            try {
                return mapper.valueToTree(object);
            } catch (IllegalArgumentException e) {
                throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
            }
        }
    }

    private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation<?> info, Object[] array) {
        if (reuse == null) {
            reuse = mapper.createArrayNode();
        } else {
            reuse.removeAll();
        }

        for (Object object : array) {
            reuse.add(convert(reuse, null, info, object));
        }
        return reuse;
    }

    public Counter getCounter() {
        return counter;
    }

    public void setCounter(Counter counter) {
        this.counter = counter;
    }
}
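
As a self-contained sketch of the schema on its own; the class name, field layout, and the use of SimpleCounter are illustration-only assumptions (inside a job, CustomerFlinkKafkaProducer.open() supplies the counter):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.metrics.SimpleCounter;
    import org.apache.flink.types.Row;

    public class JsonSchemaSketch {

        public static void main(String[] args) {
            // Hypothetical row layout: field names become JSON property names.
            RowTypeInfo rowType = new RowTypeInfo(
                    new TypeInformation<?>[]{Types.STRING, Types.INT},
                    new String[]{"name", "age"});
            CustomerJsonRowSerializationSchema schema = new CustomerJsonRowSerializationSchema(rowType);

            // serialize() increments the counter, so one must be set before the first call.
            schema.setCounter(new SimpleCounter());

            Row row = new Row(2);
            row.setField(0, "flink");
            row.setField(1, 42);

            byte[] json = schema.serialize(row); // {"name":"flink","age":42}
            System.out.println(new String(json));
        }
    }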
