Skip to content

Commit 554c885

Browse files
committed
Add MySqlToDorisE2eITCase back.
1 parent b0670ae commit 554c885

File tree

4 files changed

+259
-121
lines changed

4 files changed

+259
-121
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.doris.flink.sink.batch;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
import org.apache.flink.annotation.VisibleForTesting;
22+
import org.apache.flink.api.connector.sink2.InitContextAdapter;
23+
import org.apache.flink.api.connector.sink2.Sink;
24+
import org.apache.flink.api.connector.sink2.SinkWriter;
25+
import org.apache.flink.api.connector.sink2.WriterInitContext;
26+
import org.apache.flink.util.Preconditions;
27+
28+
import org.apache.doris.flink.cfg.DorisExecutionOptions;
29+
import org.apache.doris.flink.cfg.DorisOptions;
30+
import org.apache.doris.flink.cfg.DorisReadOptions;
31+
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
32+
33+
import java.io.IOException;
34+
35+
/**
36+
* Copy from <a
37+
* href="https://github.com/apache/doris-flink-connector/blob/25.1.0/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java">...</a>
38+
* to add {@link #createWriter(WriterInitContext)}} method.
39+
*/
40+
@Deprecated
41+
@PublicEvolving
42+
public class DorisBatchSink<IN> implements Sink<IN> {
43+
private final DorisOptions dorisOptions;
44+
private final DorisReadOptions dorisReadOptions;
45+
private final DorisExecutionOptions dorisExecutionOptions;
46+
private final DorisRecordSerializer<IN> serializer;
47+
48+
public DorisBatchSink(
49+
DorisOptions dorisOptions,
50+
DorisReadOptions dorisReadOptions,
51+
DorisExecutionOptions dorisExecutionOptions,
52+
DorisRecordSerializer<IN> serializer) {
53+
this.dorisOptions = dorisOptions;
54+
this.dorisReadOptions = dorisReadOptions;
55+
this.dorisExecutionOptions = dorisExecutionOptions;
56+
this.serializer = serializer;
57+
}
58+
59+
@Override
60+
public SinkWriter<IN> createWriter(InitContext initContext) throws IOException {
61+
DorisBatchWriter<IN> dorisBatchWriter =
62+
new DorisBatchWriter<IN>(
63+
initContext,
64+
serializer,
65+
dorisOptions,
66+
dorisReadOptions,
67+
dorisExecutionOptions);
68+
return dorisBatchWriter;
69+
}
70+
71+
@Override
72+
public SinkWriter<IN> createWriter(WriterInitContext context) throws IOException {
73+
DorisBatchWriter<IN> dorisBatchWriter =
74+
new DorisBatchWriter<IN>(
75+
new InitContextAdapter(context),
76+
serializer,
77+
dorisOptions,
78+
dorisReadOptions,
79+
dorisExecutionOptions);
80+
return dorisBatchWriter;
81+
}
82+
83+
public static <IN> DorisBatchSink.Builder<IN> builder() {
84+
return new DorisBatchSink.Builder<>();
85+
}
86+
87+
/**
88+
* build for DorisBatchSink.
89+
*
90+
* @param <IN> record type.
91+
*/
92+
public static class Builder<IN> {
93+
private DorisOptions dorisOptions;
94+
private DorisReadOptions dorisReadOptions;
95+
private DorisExecutionOptions dorisExecutionOptions;
96+
private DorisRecordSerializer<IN> serializer;
97+
98+
public DorisBatchSink.Builder<IN> setDorisOptions(DorisOptions dorisOptions) {
99+
this.dorisOptions = dorisOptions;
100+
return this;
101+
}
102+
103+
public DorisBatchSink.Builder<IN> setDorisReadOptions(DorisReadOptions dorisReadOptions) {
104+
this.dorisReadOptions = dorisReadOptions;
105+
return this;
106+
}
107+
108+
public DorisBatchSink.Builder<IN> setDorisExecutionOptions(
109+
DorisExecutionOptions dorisExecutionOptions) {
110+
this.dorisExecutionOptions = dorisExecutionOptions;
111+
return this;
112+
}
113+
114+
public DorisBatchSink.Builder<IN> setSerializer(DorisRecordSerializer<IN> serializer) {
115+
this.serializer = serializer;
116+
return this;
117+
}
118+
119+
public DorisBatchSink<IN> build() {
120+
Preconditions.checkNotNull(dorisOptions);
121+
Preconditions.checkNotNull(dorisExecutionOptions);
122+
Preconditions.checkNotNull(serializer);
123+
if (dorisReadOptions == null) {
124+
dorisReadOptions = DorisReadOptions.builder().build();
125+
}
126+
return new DorisBatchSink<>(
127+
dorisOptions, dorisReadOptions, dorisExecutionOptions, serializer);
128+
}
129+
}
130+
131+
@VisibleForTesting
132+
public DorisReadOptions getDorisReadOptions() {
133+
return dorisReadOptions;
134+
}
135+
}

0 commit comments

Comments
 (0)