Update to Flink 2.0 #164

Draft: wants to merge 1 commit into main
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
@@ -18,7 +18,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
flink: ["1.18.1", "1.19.1", "1.20.0"]
flink: ["2.0.0"]
steps:
- uses: actions/checkout@v3

@@ -38,7 +38,7 @@ jobs:

- name: Test JavaDoc
run: mvn $MAVEN_CLI_OPTS $JAVA_ADDITIONAL_OPTS javadoc:javadoc
if: startsWith(matrix.flink, '1.20')
if: startsWith(matrix.flink, '2.0')

- name: Add coverage to PR
id: jacoco
@@ -48,4 +48,4 @@ jobs:
token: ${{ secrets.GITHUB_TOKEN }}
min-coverage-overall: 40
min-coverage-changed-files: 60
if: startsWith(matrix.flink, '1.20') && github.event.pull_request.head.repo.fork == false
if: startsWith(matrix.flink, '2.0') && github.event.pull_request.head.repo.fork == false
12 changes: 3 additions & 9 deletions pom.xml
@@ -69,15 +69,15 @@ under the License.
<!-- IMPORTANT: If you update Flink, remember to update the link to its docs in the maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.18.1</flink.version>
<flink.version>2.0.0</flink.version>

<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit5.version>5.10.1</junit5.version>
<junit5.version>5.11.4</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
<assertj.core.version>3.21.0</assertj.core.version>
<mockito.version>4.0.0</mockito.version>
@@ -106,12 +106,6 @@ under the License.
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
@@ -513,7 +507,7 @@ under the License.
<version>3.6.3</version>
<configuration>
<links>
<link>https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/</link>
<link>https://nightlies.apache.org/flink/flink-docs-release-2.0/api/java/</link>
</links>
</configuration>
<executions>
src/main/java/com/getindata/connectors/http/SchemaLifecycleAwareElementConverter.java
@@ -1,12 +1,12 @@
package com.getindata.connectors.http;

import org.apache.flink.api.common.serialization.SerializationSchema.InitializationContext;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.ElementConverter;

/**
* An enhancement for Flink's {@link ElementConverter} that expose {@link #open(InitContext)} method
* that will be called by HTTP connect code to ensure that element converter is initialized
* An enhancement for Flink's {@link ElementConverter} that exposes the {@link #open(WriterInitContext)}
* method, which will be called by the HTTP connector code to ensure that the element converter is initialized
* properly. This is required for cases when Flink's SerializationSchema and DeserializationSchema
* objects like JsonRowDataSerializationSchema are used.
* <p>
@@ -29,6 +29,6 @@ public interface SchemaLifecycleAwareElementConverter<InputT, RequestEntryT>
*
* @param context Contextual information that can be used during initialization.
*/
void open(InitContext context);
void open(WriterInitContext context);

}
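For code that implements this interface directly, the migration is limited to the changed parameter type. Below is a minimal sketch of a converter compiled against Flink 2.0; the class name and the String-to-PUT mapping are made up for illustration, and the import path for HttpSinkRequestEntry is assumed from the package shown elsewhere in this diff.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

/** Illustrative converter that turns each String element into an HTTP PUT request entry. */
public class StringPutElementConverter
        implements SchemaLifecycleAwareElementConverter<String, HttpSinkRequestEntry> {

    @Override
    public void open(WriterInitContext context) {
        // Called once by the connector before the first element is converted; in Flink 2.0
        // the context type is WriterInitContext rather than the old nested Sink.InitContext.
    }

    @Override
    public HttpSinkRequestEntry apply(String element, SinkWriter.Context context) {
        return new HttpSinkRequestEntry("PUT", element.getBytes(StandardCharsets.UTF_8));
    }
}
```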
HttpSinkInternal.java
@@ -5,9 +5,12 @@
import java.util.Collections;
import java.util.Properties;

import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
@@ -115,7 +118,7 @@ protected HttpSinkInternal(

@Override
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> createWriter(
InitContext context) throws IOException {
WriterInitContext context) throws IOException {

ElementConverter<InputT, HttpSinkRequestEntry> elementConverter = getElementConverter();
if (elementConverter instanceof SchemaLifecycleAwareElementConverter) {
@@ -126,12 +129,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
return new HttpSinkWriter<>(
elementConverter,
context,
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
getAsyncSinkWriterConfiguration(),
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -146,19 +144,14 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re

@Override
public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> restoreWriter(
InitContext context,
WriterInitContext context,
Collection<BufferedRequestState<HttpSinkRequestEntry>> recoveredState)
throws IOException {

return new HttpSinkWriter<>(
getElementConverter(),
context,
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
getAsyncSinkWriterConfiguration(),
endpointUrl,
sinkHttpClientBuilder.build(
properties,
@@ -171,6 +164,17 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
);
}

private AsyncSinkWriterConfiguration getAsyncSinkWriterConfiguration() {
return AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(getMaxBatchSize())
.setMaxBatchSizeInBytes(getMaxBatchSizeInBytes())
.setMaxInFlightRequests(getMaxInFlightRequests())
.setMaxBufferedRequests(getMaxBufferedRequests())
.setMaxTimeInBufferMS(getMaxTimeInBufferMS())
.setMaxRecordSizeInBytes(getMaxRecordSizeInBytes())
.build();
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
getWriterStateSerializer() {
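Two details in this file summarize the migration: createWriter and restoreWriter now take a WriterInitContext instead of the nested Sink.InitContext, and the six buffering limits that used to be passed positionally to the writer are bundled into a single AsyncSinkWriterConfiguration. A standalone sketch of the new handover, with illustrative values and the setters kept in the same order this PR uses:

```java
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;

class AsyncSinkWriterConfigurationExample {

    // Flink 1.x passed the limits straight to the AsyncSinkWriter constructor:
    //   super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
    //         maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
    // With Flink 2.0 the same limits travel as one configuration object.
    static AsyncSinkWriterConfiguration exampleConfiguration() {
        return AsyncSinkWriterConfiguration.builder()
                .setMaxBatchSize(500)               // illustrative values, not connector defaults
                .setMaxBatchSizeInBytes(1_048_576)
                .setMaxInFlightRequests(8)
                .setMaxBufferedRequests(1_000)
                .setMaxTimeInBufferMS(5_000)
                .setMaxRecordSizeInBytes(1_048_576)
                .build();
    }
}
```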
src/main/java/com/getindata/connectors/http/internal/sink/HttpSinkWriter.java
@@ -1,18 +1,18 @@
package com.getindata.connectors.http.internal.sink;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

@@ -47,20 +47,13 @@ public class HttpSinkWriter<InputT> extends AsyncSinkWriter<InputT, HttpSinkRequ

public HttpSinkWriter(
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
Sink.InitContext context,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
WriterInitContext context,
AsyncSinkWriterConfiguration writerConfiguration,
String endpointUrl,
SinkHttpClient sinkHttpClient,
Collection<BufferedRequestState<HttpSinkRequestEntry>> bufferedRequestStates,
Properties properties) {

super(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests,
maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, bufferedRequestStates);
super(elementConverter, context, writerConfiguration, bufferedRequestStates);
this.endpointUrl = endpointUrl;
this.sinkHttpClient = sinkHttpClient;

@@ -83,7 +76,7 @@ public HttpSinkWriter(
@Override
protected void submitRequestEntries(
List<HttpSinkRequestEntry> requestEntries,
Consumer<List<HttpSinkRequestEntry>> requestResult) {
ResultHandler<HttpSinkRequestEntry> resultHandler) {
var future = sinkHttpClient.putRequests(requestEntries, endpointUrl);
future.whenCompleteAsync((response, err) -> {
if (err != null) {
@@ -114,7 +107,7 @@ protected void submitRequestEntries(
//requestResult.accept(Collections.emptyList());
//}
}
requestResult.accept(Collections.emptyList());
resultHandler.complete();
}, sinkWriterThreadPool);
}

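The behavioural change in this file is the callback passed to submitRequestEntries: the old Consumer<List<HttpSinkRequestEntry>> signalled retries by handing back the failed entries (and an empty list for success), while the Flink 2.0 ResultHandler makes the outcomes explicit. The PR currently completes every batch unconditionally; the sketch below shows how the fuller contract could be used inside HttpSinkWriter, where getFailedRequests() is a hypothetical accessor standing in for whatever the response object actually exposes:

```java
// A sketch only; the error handling below is illustrative and not the exact logic of this PR.
@Override
protected void submitRequestEntries(
        List<HttpSinkRequestEntry> requestEntries,
        ResultHandler<HttpSinkRequestEntry> resultHandler) {

    sinkHttpClient.putRequests(requestEntries, endpointUrl)
            .whenCompleteAsync((response, err) -> {
                if (err != null) {
                    // Unrecoverable failure: fail the whole batch, and with it the job.
                    resultHandler.completeExceptionally(
                            new RuntimeException("HTTP request batch failed", err));
                } else if (!response.getFailedRequests().isEmpty()) {   // hypothetical accessor
                    // Hand only the failed entries back to the AsyncSinkWriter for another attempt.
                    resultHandler.retryForEntries(response.getFailedRequests());
                } else {
                    // Everything was delivered; nothing to re-buffer.
                    resultHandler.complete();
                }
            }, sinkWriterThreadPool);
}
```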
src/main/java/com/getindata/connectors/http/internal/table/SerializationSchemaElementConverter.java
@@ -1,8 +1,8 @@
package com.getindata.connectors.http.internal.table;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.SinkWriter.Context;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkRuntimeException;

@@ -27,7 +27,7 @@ public SerializationSchemaElementConverter(
}

@Override
public void open(InitContext context) {
public void open(WriterInitContext context) {
if (!schemaOpened) {
try {
serializationSchema.open(context.asSerializationSchemaInitializationContext());
15 changes: 11 additions & 4 deletions src/test/java/com/getindata/StreamTableJob.java
@@ -1,10 +1,13 @@
package com.getindata;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.ParameterTool;

public class StreamTableJob {

@@ -13,9 +16,13 @@ public static void main(String[] args) {
ParameterTool parameters = ParameterTool.fromSystemProperties();
parameters = parameters.mergeWith(ParameterTool.fromArgs(args));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1000);
config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(1000));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
// env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.getConfig().setGlobalJobParameters(parameters);
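For anyone updating their own job code alongside this connector, the pattern above is the whole story: ParameterTool moved to org.apache.flink.util, and the RestartStrategies helper behind env.setRestartStrategy appears to be gone in Flink 2.0, with restart behaviour now expressed through typed options on the Configuration the environment is created from. Condensed into one place, using the same values this test job already used:

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class RestartStrategyMigrationExample {

    static StreamExecutionEnvironment createEnvironment() {
        // Flink 1.x equivalent:
        //   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        Configuration config = new Configuration();
        config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1000);
        config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, Duration.ofMillis(1000));
        return StreamExecutionEnvironment.getExecutionEnvironment(config);
    }
}
```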
RetryConfigProviderTest.java
@@ -3,6 +3,7 @@
import java.util.stream.IntStream;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -16,7 +17,7 @@ void verifyFixedDelayRetryConfig() {
var config = new Configuration();
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "fixed-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.fixed-delay.delay", "10s");
config.setInteger("lookup.max-retries", 12);
config.set(LookupOptions.MAX_RETRIES, 12);

var retryConfig = RetryConfigProvider.create(config);

@@ -32,8 +33,8 @@ void verifyExponentialDelayConfig() {
config.setString("gid.connector.http.source.lookup.retry-strategy.type", "exponential-delay");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.initial-backoff", "15ms");
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.max-backoff", "120ms");
config.setInteger("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", 2);
config.setInteger("lookup.max-retries", 6);
config.setString("gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier", "2");
config.set(LookupOptions.MAX_RETRIES, 6);

var retryConfig = RetryConfigProvider.create(config);
var intervalFunction = retryConfig.getIntervalFunction();
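The change of setters here is worth a note: the string-keyed numeric convenience setters such as setInteger appear to have been removed from Configuration in Flink 2.0, which is presumably why the backoff multiplier is now passed through setString while lookup.max-retries switches to its typed ConfigOption. A small sketch of the two remaining styles, with values taken from the test above:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.connector.source.lookup.LookupOptions;

class LookupRetryConfigSketch {

    static Configuration exampleConfig() {
        Configuration config = new Configuration();
        // Well-known option: set through its typed ConfigOption ("lookup.max-retries").
        config.set(LookupOptions.MAX_RETRIES, 6);
        // Connector-specific key without a ConfigOption constant: only the String setter remains,
        // so numeric values are passed as text and parsed by the connector.
        config.setString(
                "gid.connector.http.source.lookup.retry-strategy.exponential-delay.backoff-multiplier",
                "2");
        return config;
    }
}
```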
HttpSinkWriterTest.java
@@ -6,12 +6,13 @@
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -38,7 +39,7 @@ class HttpSinkWriterTest {
private ElementConverter<String, HttpSinkRequestEntry> elementConverter;

@Mock
private InitContext context;
private WriterInitContext context;

@Mock
private SinkHttpClient httpClient;
@@ -64,12 +65,14 @@ public void setUp() {
this.httpSinkWriter = new HttpSinkWriter<>(
elementConverter,
context,
10,
10,
100,
10,
10,
10,
AsyncSinkWriterConfiguration.builder()
.setMaxBatchSize(10)
.setMaxBatchSizeInBytes(10)
.setMaxInFlightRequests(10)
.setMaxBufferedRequests(100)
.setMaxTimeInBufferMS(10)
.setMaxRecordSizeInBytes(10)
.build(),
"http://localhost/client",
httpClient,
stateBuffer,
@@ -85,11 +88,25 @@ public void testErrorMetric() throws InterruptedException {
when(httpClient.putRequests(anyList(), anyString())).thenReturn(future);

HttpSinkRequestEntry request = new HttpSinkRequestEntry("PUT", "hello".getBytes());
Consumer<List<HttpSinkRequestEntry>> requestResult =
httpSinkRequestEntries -> log.info(String.valueOf(httpSinkRequestEntries));
ResultHandler<HttpSinkRequestEntry> resultHandler = new ResultHandler<HttpSinkRequestEntry>() {
@Override
public void complete() {
log.info("Request completed successfully");
}

@Override
public void completeExceptionally(Exception e) {
log.error("Request failed.", e);
}

@Override
public void retryForEntries(List<HttpSinkRequestEntry> requestEntriesToRetry) {
log.warn("Request failed partially.");
}
};

List<HttpSinkRequestEntry> requestEntries = Collections.singletonList(request);
this.httpSinkWriter.submitRequestEntries(requestEntries, requestResult);
this.httpSinkWriter.submitRequestEntries(requestEntries, resultHandler);

// would be good to use Countdown Latch instead sleep...
Thread.sleep(2000);