diff --git a/examples/autoscaling/Dockerfile b/examples/autoscaling/Dockerfile index 30bdd47acd..9733738932 100644 --- a/examples/autoscaling/Dockerfile +++ b/examples/autoscaling/Dockerfile @@ -16,5 +16,5 @@ # limitations under the License. ################################################################################ -FROM flink:1.18 +FROM flink:1.20.1-java17 COPY ./target/autoscaling*.jar /opt/flink/usrlib/autoscaling.jar diff --git a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java index e76b8ec308..af68187220 100644 --- a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java +++ b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; /** Autoscaling Example. */ public class AutoscalingExample { @@ -43,13 +43,7 @@ public Long map(Long i) throws Exception { return end; } }); - stream.addSink( - new SinkFunction() { - @Override - public void invoke(Long value, Context context) throws Exception { - // Do nothing - } - }); + stream.sinkTo(new DiscardingSink<>()); env.execute("Autoscaling Example"); } } diff --git a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java index 5daaefd2c5..ee26b7a53f 100644 --- a/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java +++ b/examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java @@ -27,7 +27,7 @@ import org.apache.flink.connector.datagen.source.GeneratorFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.util.Collector; import org.slf4j.Logger; @@ -128,7 +128,7 @@ public static void main(String[] args) throws Exception { .broadcast(); } - stream.addSink(new DiscardingSink<>()); + stream.sinkTo(new DiscardingSink<>()); } env.execute( @@ -164,7 +164,8 @@ public void flatMap(Long record, Collector out) throws Exception { double amplitude = getAmplitude(currentEpoch); - double loadPerSubTask = maxLoad / getRuntimeContext().getNumberOfParallelSubtasks(); + double loadPerSubTask = + maxLoad / getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks(); long busyTimeMs = (long) (loadPerSubTask * samplingIntervalMs * amplitude); long remainingTimeMs = @@ -172,7 +173,7 @@ public void flatMap(Long record, Collector out) throws Exception { long sleepTime = Math.min(busyTimeMs, remainingTimeMs); LOG.info( "{}> epoch: {} busyTime: {} remainingTime: {} sleepTime: {} amplitude: {}", - getRuntimeContext().getIndexOfThisSubtask(), + getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), currentEpoch, busyTimeMs, remainingTimeMs,