Skip to content

Commit 48359a3

Browse files
[FLINK-33525] Move ImpulseSource to new Source API (#950)
1 parent 9eb3c38 commit 48359a3

File tree

2 files changed

+49
-27
lines changed

2 files changed

+49
-27
lines changed

examples/autoscaling/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ under the License.
4545
<version>${flink.version}</version>
4646
<scope>provided</scope>
4747
</dependency>
48+
49+
<dependency>
50+
<groupId>org.apache.flink</groupId>
51+
<artifactId>flink-connector-datagen</artifactId>
52+
<version>${flink.version}</version>
53+
</dependency>
54+
4855
<dependency>
4956
<groupId>org.apache.flink</groupId>
5057
<artifactId>flink-clients</artifactId>

examples/autoscaling/src/main/java/autoscaling/LoadSimulationPipeline.java

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@
1818

1919
package autoscaling;
2020

21+
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2122
import org.apache.flink.api.common.functions.RichFlatMapFunction;
23+
import org.apache.flink.api.common.typeinfo.Types;
24+
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
2225
import org.apache.flink.api.java.utils.ParameterTool;
26+
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
27+
import org.apache.flink.connector.datagen.source.GeneratorFunction;
2328
import org.apache.flink.streaming.api.datastream.DataStream;
2429
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2530
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
26-
import org.apache.flink.streaming.api.functions.source.SourceFunction;
2731
import org.apache.flink.util.Collector;
2832

2933
import org.slf4j.Logger;
@@ -60,6 +64,11 @@ public class LoadSimulationPipeline {
6064

6165
private static final Logger LOG = LoggerFactory.getLogger(LoadSimulationPipeline.class);
6266

67+
// Number of impulses (records) emitted per sampling interval.
68+
// This value determines how many records should be generated within each `samplingIntervalMs`
69+
// period.
70+
private static final int IMPULSES_PER_SAMPLING_INTERVAL = 10;
71+
6372
public static void main(String[] args) throws Exception {
6473
var env = StreamExecutionEnvironment.getExecutionEnvironment();
6574
env.disableOperatorChaining();
@@ -74,8 +83,39 @@ public static void main(String[] args) throws Exception {
7483
for (String branch : maxLoadPerTask.split("\n")) {
7584
String[] taskLoads = branch.split(";");
7685

86+
/*
87+
* Creates an effectively unbounded stream (the record count is capped at Long.MAX_VALUE) that continuously emits the constant value 42L.
88+
* Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
89+
*
90+
* Emission Rate Logic:
91+
* - The goal is to generate a fixed number of impulses per sampling interval.
92+
* - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
93+
* - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
94+
* approximately 10 impulses should be generated (the rate limiter targets this rate; it is not an exact per-interval guarantee).
95+
*
96+
* To calculate the total number of records emitted per second:
97+
* 1. Determine how many sampling intervals fit within one second:
98+
* samplingIntervalsPerSecond = 1000.0 / samplingIntervalMs; (floating-point division, so intervals longer than 1000 ms do not truncate to 0)
99+
* 2. Multiply this by the number of impulses per interval to get the total rate:
100+
* impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
101+
*
102+
* Example:
103+
* - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
104+
* impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
105+
*/
77106
DataStream<Long> stream =
78-
env.addSource(new ImpulseSource(samplingIntervalMs)).name("ImpulseSource");
107+
env.fromSource(
108+
new DataGeneratorSource<>(
109+
(GeneratorFunction<Long, Long>)
110+
(index) -> 42L, // Emits constant value 42
111+
Long.MAX_VALUE, // Unbounded stream
112+
RateLimiterStrategy.perSecond(
113+
(1000.0 / samplingIntervalMs)
114+
* IMPULSES_PER_SAMPLING_INTERVAL), // Controls
115+
// rate
116+
Types.LONG),
117+
WatermarkStrategy.noWatermarks(),
118+
"ImpulseSource");
79119

80120
for (String load : taskLoads) {
81121
double maxLoad = Double.parseDouble(load);
@@ -97,31 +137,6 @@ public static void main(String[] args) throws Exception {
97137
+ ")");
98138
}
99139

100-
private static class ImpulseSource implements SourceFunction<Long> {
101-
private final int maxSleepTimeMs;
102-
volatile boolean canceled;
103-
104-
public ImpulseSource(int samplingInterval) {
105-
this.maxSleepTimeMs = samplingInterval / 10;
106-
}
107-
108-
@Override
109-
public void run(SourceContext<Long> sourceContext) throws Exception {
110-
while (!canceled) {
111-
synchronized (sourceContext.getCheckpointLock()) {
112-
sourceContext.collect(42L);
113-
}
114-
// Provide an impulse to keep the load simulation active
115-
Thread.sleep(maxSleepTimeMs);
116-
}
117-
}
118-
119-
@Override
120-
public void cancel() {
121-
canceled = true;
122-
}
123-
}
124-
125140
private static class LoadSimulationFn extends RichFlatMapFunction<Long, Long> {
126141

127142
private final double maxLoad;

0 commit comments

Comments
 (0)