Commit f35aaad

Merge branch 'v1.5.0_dev_kafkasinkmetric' into 'v1.5.0_dev'

kafkasink metric

See merge request !14

2 parents 0ebbe2f + 3b9b0ba · commit f35aaad

15 files changed: +879 −15 lines

kafka09/kafka09-sink/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@
   <configuration>
     <artifactSet>
       <excludes>
-
+        <exclude>org.slf4j</exclude>
      </excludes>
    </artifactSet>
    <filters>
kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerFlinkKafkaProducer09.java

Lines changed: 68 additions & 0 deletions (new file)

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import com.dtstack.flink.sql.metric.MetricConstant;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

import java.util.Map;
import java.util.Properties;

/**
 * Reason:
 * Date: 2019/4/24
 * Company: www.dtstack.com
 *
 * @author maqi
 */
public class CustomerFlinkKafkaProducer09<Row> extends FlinkKafkaProducer09<Row> {

    CustomerJsonRowSerializationSchema schema;

    public CustomerFlinkKafkaProducer09(String topicId, SerializationSchema<Row> serializationSchema, Properties producerConfig) {
        super(topicId, serializationSchema, producerConfig);
        this.schema = (CustomerJsonRowSerializationSchema) serializationSchema;
    }

    @Override
    public void open(Configuration configuration) {
        producer = getKafkaProducer(this.producerConfig);

        RuntimeContext ctx = getRuntimeContext();
        Counter counter = ctx.getMetricGroup().counter(MetricConstant.DT_NUM_RECORDS_OUT);
        MeterView meter = ctx.getMetricGroup().meter(MetricConstant.DT_NUM_RECORDS_OUT_RATE, new MeterView(counter, 20));

        schema.setCounter(counter);

        super.open(configuration);
    }
}
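
A minimal wiring sketch for context (not part of the commit): the constructor caches the CustomerJsonRowSerializationSchema, and open() hands the operator's DT_NUM_RECORDS_OUT counter to that schema before delegating to the base producer. The class name ProducerWiringSketch, the topic, the field names, and the broker address below are all hypothetical; the sketch assumes it lives alongside the commit's classes in com.dtstack.flink.sql.sink.kafka.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.util.Properties;

public class ProducerWiringSketch {
    public static void main(String[] args) {
        // Hypothetical output schema: (id BIGINT, name STRING).
        TypeInformation<Row> rowType = new RowTypeInfo(
                new TypeInformation<?>[]{Types.LONG, Types.STRING},
                new String[]{"id", "name"});

        CustomerJsonRowSerializationSchema schema =
                new CustomerJsonRowSerializationSchema(rowType);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker

        // The producer caches the schema, so that open() can later inject the
        // operator's DT_NUM_RECORDS_OUT counter via schema.setCounter(...).
        CustomerFlinkKafkaProducer09<Row> producer =
                new CustomerFlinkKafkaProducer09<>("out_topic", schema, props);

        // Inside a Flink job: dataStream.addSink(producer);
    }
}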

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerJsonRowSerializationSchema.java

Lines changed: 219 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.sink.kafka;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.formats.json.JsonSchemaConverter;
import org.apache.flink.metrics.Counter;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

/**
 * Serialization schema that serializes an object of Flink types into JSON bytes.
 *
 * <p>Serializes the input Flink object into a JSON string and
 * converts it into <code>byte[]</code>.
 *
 * <p>Result <code>byte[]</code> messages can be deserialized using {@link JsonRowDeserializationSchema}.
 */
public class CustomerJsonRowSerializationSchema implements SerializationSchema<Row> {

    private static final long serialVersionUID = -2885556750743978636L;

    /** Type information describing the input type. */
    private final TypeInformation<Row> typeInfo;

    /** Object mapper that is used to create output JSON objects. */
    private final ObjectMapper mapper = new ObjectMapper();

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone, without milliseconds). */
    private SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss'Z'");

    /** Formatter for RFC 3339-compliant string representation of a time value (with UTC timezone). */
    private SimpleDateFormat timeFormatWithMillis = new SimpleDateFormat("HH:mm:ss.SSS'Z'");

    /** Formatter for RFC 3339-compliant string representation of a timestamp value (with UTC timezone). */
    private SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");

    /** Reusable object node. */
    private transient ObjectNode node;

    private Counter counter;

    /**
     * Creates a JSON serialization schema for the given type information.
     *
     * @param typeInfo The field names of {@link Row} are used to map to JSON properties.
     */
    public CustomerJsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
        Preconditions.checkNotNull(typeInfo, "Type information");
        this.typeInfo = typeInfo;
    }

    /**
     * Creates a JSON serialization schema for the given JSON schema.
     *
     * @param jsonSchema JSON schema describing the result type
     *
     * @see <a href="http://json-schema.org/">http://json-schema.org/</a>
     */
    public CustomerJsonRowSerializationSchema(String jsonSchema) {
        this(JsonSchemaConverter.convert(jsonSchema));
    }

    @Override
    public byte[] serialize(Row row) {
        if (node == null) {
            node = mapper.createObjectNode();
        }

        try {
            convertRow(node, (RowTypeInfo) typeInfo, row);
            counter.inc();
            return mapper.writeValueAsBytes(node);
        } catch (Throwable t) {
            throw new RuntimeException("Could not serialize row '" + row + "'. " +
                "Make sure that the schema matches the input.", t);
        }
    }

    // --------------------------------------------------------------------------------------------

    private ObjectNode convertRow(ObjectNode reuse, RowTypeInfo info, Row row) {
        if (reuse == null) {
            reuse = mapper.createObjectNode();
        }
        final String[] fieldNames = info.getFieldNames();
        final TypeInformation<?>[] fieldTypes = info.getFieldTypes();

        // validate the row
        if (row.getArity() != fieldNames.length) {
            throw new IllegalStateException(String.format(
                "Number of elements in the row '%s' is different from number of field names: %d", row, fieldNames.length));
        }

        for (int i = 0; i < fieldNames.length; i++) {
            final String name = fieldNames[i];

            final JsonNode fieldConverted = convert(reuse, reuse.get(name), fieldTypes[i], row.getField(i));
            reuse.set(name, fieldConverted);
        }

        return reuse;
    }

    private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
        if (info == Types.VOID || object == null) {
            return container.nullNode();
        } else if (info == Types.BOOLEAN) {
            return container.booleanNode((Boolean) object);
        } else if (info == Types.STRING) {
            return container.textNode((String) object);
        } else if (info == Types.BIG_DEC) {
            // convert decimal if necessary
            if (object instanceof BigDecimal) {
                return container.numberNode((BigDecimal) object);
            }
            return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
        } else if (info == Types.BIG_INT) {
            // convert integer if necessary
            if (object instanceof BigInteger) {
                return container.numberNode((BigInteger) object);
            }
            return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
        } else if (info == Types.SQL_DATE) {
            return container.textNode(object.toString());
        } else if (info == Types.SQL_TIME) {
            final Time time = (Time) object;
            // strip milliseconds if possible
            if (time.getTime() % 1000 > 0) {
                return container.textNode(timeFormatWithMillis.format(time));
            }
            return container.textNode(timeFormat.format(time));
        } else if (info == Types.SQL_TIMESTAMP) {
            return container.textNode(timestampFormat.format((Timestamp) object));
        } else if (info instanceof RowTypeInfo) {
            if (reuse != null && reuse instanceof ObjectNode) {
                return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
            } else {
                return convertRow(null, (RowTypeInfo) info, (Row) object);
            }
        } else if (info instanceof ObjectArrayTypeInfo) {
            if (reuse != null && reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof BasicArrayTypeInfo) {
            if (reuse != null && reuse instanceof ArrayNode) {
                return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            } else {
                return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
            }
        } else if (info instanceof PrimitiveArrayTypeInfo && ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
            return container.binaryNode((byte[]) object);
        } else {
            // for types that were specified without JSON schema
            // e.g. POJOs
            try {
                return mapper.valueToTree(object);
            } catch (IllegalArgumentException e) {
                throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
            }
        }
    }

    private ArrayNode convertObjectArray(ArrayNode reuse, TypeInformation<?> info, Object[] array) {
        if (reuse == null) {
            reuse = mapper.createArrayNode();
        } else {
            reuse.removeAll();
        }

        for (Object object : array) {
            reuse.add(convert(reuse, null, info, object));
        }
        return reuse;
    }

    public Counter getCounter() {
        return counter;
    }

    public void setCounter(Counter counter) {
        this.counter = counter;
    }
}
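
To see the metric hook in isolation, a small sketch (not part of the commit): a SimpleCounter stands in for the operator counter that CustomerFlinkKafkaProducer09.open() would normally inject, and the class name SchemaMetricSketch plus the field names are illustrative. Note that serialize() calls counter.inc() unconditionally, so a counter must be set before the first record.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.types.Row;

public class SchemaMetricSketch {
    public static void main(String[] args) {
        TypeInformation<Row> type = new RowTypeInfo(
                new TypeInformation<?>[]{Types.INT, Types.STRING},
                new String[]{"id", "name"});
        CustomerJsonRowSerializationSchema schema = new CustomerJsonRowSerializationSchema(type);

        // Stand-in for the Flink runtime counter that the producer's open() wires in.
        SimpleCounter counter = new SimpleCounter();
        schema.setCounter(counter);

        Row row = new Row(2);
        row.setField(0, 1);
        row.setField(1, "foo");

        byte[] json = schema.serialize(row);     // -> {"id":1,"name":"foo"}
        System.out.println(new String(json));
        System.out.println(counter.getCount());  // -> 1, one record serialized
    }
}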

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/CustomerKafka09JsonTableSink.java

Lines changed: 1 addition & 1 deletion

@@ -60,7 +60,7 @@ public CustomerKafka09JsonTableSink(String topic, Properties properties, KafkaPa

     @Override
     protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
-        return new FlinkKafkaProducer09<>(topic, serializationSchema, properties, partitioner);
+        return new CustomerFlinkKafkaProducer09<Row>(topic, serializationSchema, properties);
     }

     @Override

kafka09/kafka09-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/KafkaSink.java

Lines changed: 1 addition & 2 deletions

@@ -24,7 +24,6 @@
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.kafka.KafkaTableSink;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
@@ -69,7 +68,7 @@ public KafkaSink genStreamSink(TargetTableInfo targetTableInfo) {
     }
     properties.setProperty("bootstrap.servers", kafka09SinkTableInfo.getBootstrapServers());

-    this.serializationSchema = new JsonRowSerializationSchema(getOutputType());
+    this.serializationSchema = new CustomerJsonRowSerializationSchema(getOutputType());
     return this;
 }


kafka10/kafka10-sink/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -39,7 +39,7 @@
   <configuration>
     <artifactSet>
       <excludes>
-
+        <exclude>org.slf4j</exclude>
      </excludes>
    </artifactSet>
    <filters>
