@@ -94,20 +94,9 @@ public static void main(String[] args) throws Exception {
9494 * 2. Multiply this by the number of impulses per interval to get the total rate:
9595 * impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
9696 *
97- * Example Calculations:
98- * - If `samplingIntervalMs = 1000 ms`:
99- * - `samplingIntervalsPerSecond = 1000 / 1000 = 1`
100- * - `impulsesPerSecond = 10 * 1 = 10 records per second`
101- * - If `samplingIntervalMs = 500 ms`:
102- * - `samplingIntervalsPerSecond = 1000 / 500 = 2`
103- * - `impulsesPerSecond = 10 * 2 = 20 records per second`
104- * - If `samplingIntervalMs = 2000 ms`:
105- * - `samplingIntervalsPerSecond = 1000 / 2000 = 0.5`
106- * - `impulsesPerSecond = 10 * 0.5 = 5 records per second`
107- *
108- * This approach ensures that the number of records emitted dynamically scales
109- * based on the sampling interval while maintaining the target of 10 impulses per interval.
110- * RateLimiterStrategy internally distributes these emissions efficiently over time.
97+ * Example:
98+ * - If `samplingIntervalMs = 500 ms` and `IMPULSES_PER_SAMPLING_INTERVAL = 10`:
99+ * impulsesPerSecond = (1000 / 500) * 10 = 2 * 10 = 20 records per second.
111100 */
112101 DataStream <Long > stream =
113102 env .fromSource (
@@ -116,9 +105,7 @@ public static void main(String[] args) throws Exception {
116105 (index ) -> 42L , // Emits constant value 42
117106 Long .MAX_VALUE , // Unbounded stream
118107 RateLimiterStrategy .perSecond (
119- (double ) 1000
120- / ((double ) samplingIntervalMs
121- / 10 )), // Controls rate
108+ (1000.0 / samplingIntervalMs ) * 10 ), // Controls rate
122109 Types .LONG ),
123110 WatermarkStrategy .noWatermarks (),
124111 "ImpulseSource" );
0 commit comments