|
36 | 36 | import org.slf4j.Logger; |
37 | 37 | import org.slf4j.LoggerFactory; |
38 | 38 |
|
39 | | -import java.io.Serializable; |
40 | 39 | import java.util.HashMap; |
41 | 40 | import java.util.Iterator; |
42 | 41 | import java.util.Map; |
|
57 | 56 | * emit records rather than emit the records directly. |
58 | 57 | */ |
59 | 58 | public class IncrementalSourceRecordEmitter<T> |
60 | | - implements RecordEmitter<SourceRecords, T, SourceSplitState>, Serializable { |
| 59 | + implements RecordEmitter<SourceRecords, T, SourceSplitState> { |
61 | 60 |
|
62 | 61 | private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class); |
63 | 62 | private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = |
64 | 63 | new FlinkJsonTableChangeSerializer(); |
65 | 64 |
|
66 | | - protected DebeziumDeserializationSchema<T> debeziumDeserializationSchema; |
67 | | - protected SourceReaderMetrics sourceReaderMetrics; |
68 | | - protected boolean includeSchemaChanges; |
69 | | - protected OutputCollector<T> outputCollector; |
70 | | - protected OffsetFactory offsetFactory; |
| 65 | + protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema; |
| 66 | + protected final SourceReaderMetrics sourceReaderMetrics; |
| 67 | + protected final boolean includeSchemaChanges; |
| 68 | + protected final OutputCollector<T> outputCollector; |
| 69 | + protected final OffsetFactory offsetFactory; |
71 | 70 |
|
72 | 71 | public IncrementalSourceRecordEmitter( |
73 | 72 | DebeziumDeserializationSchema<T> debeziumDeserializationSchema, |
@@ -153,10 +152,6 @@ public Offset getOffsetPosition(Map<String, ?> offset) { |
153 | 152 | return offsetFactory.newOffset(offsetStrMap); |
154 | 153 | } |
155 | 154 |
|
156 | | - public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) { |
157 | | - this.sourceReaderMetrics = sourceReaderMetrics; |
158 | | - } |
159 | | - |
160 | 155 | protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception { |
161 | 156 | sourceReaderMetrics.markRecord(); |
162 | 157 | sourceReaderMetrics.updateRecordCounters(element); |
@@ -191,7 +186,7 @@ protected void reportMetrics(SourceRecord element) { |
191 | 186 | } |
192 | 187 |
|
193 | 188 | /** An adapter between {@link SourceOutput} and {@link Collector}. */ |
194 | | - protected static class OutputCollector<T> implements Collector<T>, Serializable { |
| 189 | + protected static class OutputCollector<T> implements Collector<T> { |
195 | 190 | public SourceOutput<T> output; |
196 | 191 | public Long currentMessageTimestamp; |
197 | 192 |
|
|
0 commit comments