Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/autoscaling/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-datagen</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@

package autoscaling;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
Expand Down Expand Up @@ -74,8 +78,37 @@ public static void main(String[] args) throws Exception {
for (String branch : maxLoadPerTask.split("\n")) {
String[] taskLoads = branch.split(";");

/*
* Creates an unbounded stream that continuously emits the constant value 42L.
* Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
*
* Emission Rate Logic:
* - The goal is to generate a fixed number of impulses per sampling interval.
* - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
* - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: either introduce this constant and use it in the calculations or use plain text in the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the constant.

* exactly 10 impulses should be generated.
*
* To calculate the total number of records emitted per second:
* 1. Determine how many sampling intervals fit within one second:
* samplingIntervalsPerSecond = 1000 / samplingIntervalMs;
* 2. Multiply this by the number of impulses per interval to get the total rate:
* impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
*
* Example:
* - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
* impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
*/
DataStream<Long> stream =
env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
env.fromSource(
new DataGeneratorSource<>(
(GeneratorFunction<Long, Long>)
(index) -> 42L, // Emits constant value 42
Long.MAX_VALUE, // Unbounded stream
RateLimiterStrategy.perSecond(
(1000.0 / samplingIntervalMs) * 10), // Controls rate
Types.LONG),
WatermarkStrategy.noWatermarks(),
"ImpulseSource");

for (String load : taskLoads) {
double maxLoad = Double.parseDouble(load);
Expand All @@ -97,31 +130,6 @@ public static void main(String[] args) throws Exception {
+ ")");
}

private static class ImpulseSource implements SourceFunction<Long> {
private final int maxSleepTimeMs;
volatile boolean canceled;

public ImpulseSource(int samplingInterval) {
this.maxSleepTimeMs = samplingInterval / 10;
}

@Override
public void run(SourceContext<Long> sourceContext) throws Exception {
while (!canceled) {
synchronized (sourceContext.getCheckpointLock()) {
sourceContext.collect(42L);
}
// Provide an impulse to keep the load simulation active
Thread.sleep(maxSleepTimeMs);
}
}

@Override
public void cancel() {
canceled = true;
}
}

private static class LoadSimulationFn extends RichFlatMapFunction<Long, Long> {

private final double maxLoad;
Expand Down