Skip to content

Commit d7124c6

Browse files
committed
Add MySqlToDorisE2eITCase back.
1 parent 554c885 commit d7124c6

File tree

2 files changed

+96
-1
lines changed

2 files changed

+96
-1
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434

3535
/**
3636
* Copy from <a
37-
* href="https://github.com/apache/doris-flink-connector/blob/25.1.0/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java">...</a>
37+
* href="https://github.com/apache/doris-flink-connector/blob/25.1.0/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java">DorisBatchSink</a>
3838
* to add {@link #createWriter(WriterInitContext)}} method.
3939
*/
4040
@Deprecated
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.oceanbase.connector.flink.sink;
19+
20+
import org.apache.flink.api.common.typeutils.TypeSerializer;
21+
import org.apache.flink.api.connector.sink2.InitContextAdapter;
22+
import org.apache.flink.api.connector.sink2.Sink;
23+
import org.apache.flink.api.connector.sink2.SinkWriter;
24+
import org.apache.flink.api.connector.sink2.WriterInitContext;
25+
26+
import com.oceanbase.connector.flink.ConnectorOptions;
27+
import com.oceanbase.connector.flink.table.DataChangeRecord;
28+
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
29+
30+
import java.io.IOException;
31+
32+
/**
33+
* Copy from <a
34+
* href="https://github.com/oceanbase/flink-connector-oceanbase/blob/v1.2/flink-connector-oceanbase-base/src/main/java/com/oceanbase/connector/flink/sink/OceanBaseSink.java">OceanBaseSink</a>
35+
* to add {@link #createWriter(WriterInitContext)} method.
36+
*/
37+
public class OceanBaseSink<T> implements Sink<T> {
38+
39+
private static final long serialVersionUID = 1L;
40+
41+
private final ConnectorOptions options;
42+
private final TypeSerializer<T> typeSerializer;
43+
private final RecordSerializationSchema<T> recordSerializer;
44+
private final DataChangeRecord.KeyExtractor keyExtractor;
45+
private final RecordFlusher recordFlusher;
46+
private final OceanBaseWriterEvent.Listener writerEventListener;
47+
48+
public OceanBaseSink(
49+
ConnectorOptions options,
50+
TypeSerializer<T> typeSerializer,
51+
RecordSerializationSchema<T> recordSerializer,
52+
DataChangeRecord.KeyExtractor keyExtractor,
53+
RecordFlusher recordFlusher) {
54+
this(options, typeSerializer, recordSerializer, keyExtractor, recordFlusher, null);
55+
}
56+
57+
public OceanBaseSink(
58+
ConnectorOptions options,
59+
TypeSerializer<T> typeSerializer,
60+
RecordSerializationSchema<T> recordSerializer,
61+
DataChangeRecord.KeyExtractor keyExtractor,
62+
RecordFlusher recordFlusher,
63+
OceanBaseWriterEvent.Listener writerEventListener) {
64+
this.options = options;
65+
this.typeSerializer = typeSerializer;
66+
this.recordSerializer = recordSerializer;
67+
this.keyExtractor = keyExtractor;
68+
this.recordFlusher = recordFlusher;
69+
this.writerEventListener = writerEventListener;
70+
}
71+
72+
@Override
73+
public SinkWriter<T> createWriter(InitContext context) {
74+
return new OceanBaseWriter<>(
75+
options,
76+
context,
77+
typeSerializer,
78+
recordSerializer,
79+
keyExtractor,
80+
recordFlusher,
81+
writerEventListener);
82+
}
83+
84+
@Override
85+
public SinkWriter<T> createWriter(WriterInitContext writerInitContext) throws IOException {
86+
return new OceanBaseWriter<>(
87+
options,
88+
new InitContextAdapter(writerInitContext),
89+
typeSerializer,
90+
recordSerializer,
91+
keyExtractor,
92+
recordFlusher,
93+
writerEventListener);
94+
}
95+
}

0 commit comments

Comments
 (0)