Skip to content

Commit 92a2693

Browse files
authored
[FLINK-38123] Upgrade flink version in the autoscaling example
1 parent 3069d5a commit 92a2693

File tree

3 files changed

+8
-13
lines changed

3 files changed

+8
-13
lines changed

examples/autoscaling/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@
1616
# limitations under the License.
1717
################################################################################
1818

19-
FROM flink:1.18
19+
FROM flink:1.20.1-java17
2020
COPY ./target/autoscaling*.jar /opt/flink/usrlib/autoscaling.jar

examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import org.apache.flink.api.common.functions.RichMapFunction;
2222
import org.apache.flink.streaming.api.datastream.DataStream;
2323
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
24-
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
24+
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
2525

2626
/** Autoscaling Example. */
2727
public class AutoscalingExample {
@@ -43,13 +43,7 @@ public Long map(Long i) throws Exception {
4343
return end;
4444
}
4545
});
46-
stream.addSink(
47-
new SinkFunction<Long>() {
48-
@Override
49-
public void invoke(Long value, Context context) throws Exception {
50-
// Do nothing
51-
}
52-
});
46+
stream.sinkTo(new DiscardingSink<>());
5347
env.execute("Autoscaling Example");
5448
}
5549
}

examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.flink.connector.datagen.source.GeneratorFunction;
2828
import org.apache.flink.streaming.api.datastream.DataStream;
2929
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
30-
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
30+
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
3131
import org.apache.flink.util.Collector;
3232

3333
import org.slf4j.Logger;
@@ -128,7 +128,7 @@ public static void main(String[] args) throws Exception {
128128
.broadcast();
129129
}
130130

131-
stream.addSink(new DiscardingSink<>());
131+
stream.sinkTo(new DiscardingSink<>());
132132
}
133133

134134
env.execute(
@@ -164,15 +164,16 @@ public void flatMap(Long record, Collector<Long> out) throws Exception {
164164

165165
double amplitude = getAmplitude(currentEpoch);
166166

167-
double loadPerSubTask = maxLoad / getRuntimeContext().getNumberOfParallelSubtasks();
167+
double loadPerSubTask =
168+
maxLoad / getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
168169

169170
long busyTimeMs = (long) (loadPerSubTask * samplingIntervalMs * amplitude);
170171
long remainingTimeMs =
171172
(timeMillis / samplingIntervalMs + 1) * samplingIntervalMs - timeMillis;
172173
long sleepTime = Math.min(busyTimeMs, remainingTimeMs);
173174
LOG.info(
174175
"{}> epoch: {} busyTime: {} remainingTime: {} sleepTime: {} amplitude: {}",
175-
getRuntimeContext().getIndexOfThisSubtask(),
176+
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
176177
currentEpoch,
177178
busyTimeMs,
178179
remainingTimeMs,

0 commit comments

Comments (0)