File tree Expand file tree Collapse file tree 1 file changed +19
-5
lines changed
examples/autoscaling/src/main/java/autoscaling Expand file tree Collapse file tree 1 file changed +19
-5
lines changed Original file line number Diff line number Diff line change @@ -78,11 +78,25 @@ public static void main(String[] args) throws Exception {
7878 for (String branch : maxLoadPerTask .split ("\n " )) {
7979 String [] taskLoads = branch .split (";" );
8080
81- // Creates a continuous, unbounded stream of constant values (42L).
82- // Instead of manually controlling emission with Thread.sleep(), this uses Flink's
83- // built-in DataGeneratorSource
84- // with RateLimiterStrategy to achieve the same effect.
85- // The rate is dynamically calculated based on samplingIntervalMs.
81+ /*
82+ * Creates an unbounded stream that continuously emits the constant value 42L.
83+ * Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
84+ *
85+ * Rate Calculation:
86+ * - samplingIntervalMs / 10 gives maxSleepTimeMs, which represents the interval between emissions.
87+ * - To determine the number of records emitted per second:
88+ * 1000 / maxSleepTimeMs
89+ * Since 1000 ms equals 1 second, this formula calculates the emission rate.
90+ *
91+ * Example:
92+ * - If samplingIntervalMs = 1000 ms:
93+ * maxSleepTimeMs = 100 ms
94+ * - This results in:
95+ * 1000 ms / 100 ms = 10 records per second.
96+ *
97+ * RateLimiterStrategy.perSecond((double) 1000 / ((double) samplingIntervalMs / 10))
98+ * ensures this rate is maintained efficiently without blocking execution.
99+ */
86100 DataStream <Long > stream =
87101 env .fromSource (
88102 new DataGeneratorSource <>(
You can’t perform that action at this time.
0 commit comments