Skip to content

Commit b5d9c0d

Browse files
authored
[hotfix-#1847][chunjun-connector-kafka] DynamicKafkaSerializationSchema durationCounter have two different type(#1847) (#1962)
1 parent 60bc4eb commit b5d9c0d

File tree

3 files changed

+7
-23
lines changed

3 files changed

+7
-23
lines changed

chunjun-connectors/chunjun-connector-kafka/src/main/java/com/dtstack/chunjun/connector/kafka/sink/DynamicKafkaSerializationSchema.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
import com.dtstack.chunjun.metrics.BaseMetric;
2727
import com.dtstack.chunjun.metrics.RowSizeCalculator;
2828
import com.dtstack.chunjun.restore.FormatState;
29-
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
3029
import com.dtstack.chunjun.util.JsonUtil;
31-
import com.dtstack.chunjun.util.ReflectionUtils;
3230

3331
import org.apache.flink.api.common.ExecutionConfig;
3432
import org.apache.flink.api.common.accumulators.LongCounter;
@@ -52,8 +50,6 @@
5250
import javax.annotation.Nullable;
5351

5452
import java.io.Serializable;
55-
import java.lang.reflect.InvocationTargetException;
56-
import java.util.Objects;
5753
import java.util.Properties;
5854

5955
/**
@@ -365,21 +361,9 @@ private void initStatisticsAccumulator() {
365361
numWriteCounter = runtimeContext.getLongCounter(Metrics.NUM_WRITES);
366362
snapshotWriteCounter = runtimeContext.getLongCounter(Metrics.SNAPSHOT_WRITES);
367363
bytesWriteCounter = runtimeContext.getLongCounter(Metrics.WRITE_BYTES);
368-
try {
369-
durationCounter =
370-
(LongMaximum)
371-
Objects.requireNonNull(
372-
ReflectionUtils.getDeclaredMethod(
373-
runtimeContext,
374-
"getAccumulator",
375-
String.class,
376-
Class.class))
377-
.invoke(
378-
runtimeContext,
379-
Metrics.WRITE_DURATION,
380-
LongMaximum.class);
381-
} catch (IllegalAccessException | InvocationTargetException e) {
382-
throw new ChunJunRuntimeException(e);
364+
durationCounter = new LongMaximum(0);
365+
if (runtimeContext.getAccumulator(Metrics.WRITE_DURATION) == null) {
366+
runtimeContext.addAccumulator(Metrics.WRITE_DURATION, durationCounter);
383367
}
384368

385369
outputMetric = new BaseMetric(runtimeContext);

chunjun-core/src/main/java/com/dtstack/chunjun/metrics/AccumulatorCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public AccumulatorCollector(StreamingRuntimeContext context, List<String> metric
6464
valueAccumulatorMap = new HashMap<>(metricNames.size());
6565
for (String metricName : metricNames) {
6666
valueAccumulatorMap.put(
67-
metricName, new ValueAccumulator(context.getLongCounter(metricName), 0));
67+
metricName, new ValueAccumulator(context.getAccumulator(metricName), 0));
6868
}
6969

7070
scheduledExecutorService =
@@ -179,6 +179,6 @@ public long getLocalAccumulatorValue(String name) {
179179
if (valueAccumulator == null) {
180180
return 0;
181181
}
182-
return valueAccumulator.getLocal().getLocalValue();
182+
return (long) valueAccumulator.getLocal().getLocalValue();
183183
}
184184
}

chunjun-core/src/main/java/com/dtstack/chunjun/metrics/ValueAccumulator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package com.dtstack.chunjun.metrics;
1919

20-
import org.apache.flink.api.common.accumulators.LongCounter;
20+
import org.apache.flink.api.common.accumulators.Accumulator;
2121

2222
import lombok.AllArgsConstructor;
2323
import lombok.Getter;
@@ -27,6 +27,6 @@
2727
@Getter
2828
@Setter
2929
public class ValueAccumulator {
30-
private final LongCounter local;
30+
private final Accumulator local;
3131
private long global;
3232
}

0 commit comments

Comments
 (0)