@@ -82,20 +82,32 @@ public static void main(String[] args) throws Exception {
8282 * Creates an unbounded stream that continuously emits the constant value 42L.
8383 * Flink's DataGeneratorSource with RateLimiterStrategy is used to control the emission rate.
8484 *
85- * Rate Calculation :
86- * - samplingIntervalMs / 10 gives maxSleepTimeMs, which represents the interval between emissions .
87- * - To determine the number of records emitted per second:
88- * 1000 / maxSleepTimeMs
89- * Since 1000 ms equals 1 second, this formula calculates the emission rate .
85+ * Emission Rate Logic :
86+ * - The goal is to generate a fixed number of impulses per sampling interval .
87+ * - `samplingIntervalMs` defines the duration of one sampling interval in milliseconds.
88+ * - We define `IMPULSES_PER_SAMPLING_INTERVAL = 10`, meaning that for every sampling interval,
89+ * exactly 10 impulses should be generated .
9090 *
91- * Example :
92- * - If samplingIntervalMs = 1000 ms :
93- * maxSleepTimeMs = 100 ms
94- * - This results in :
95- * 1000 ms / 100 ms = 10 records per second.
91+ * To calculate the total number of records emitted per second :
92+ * 1. Determine how many sampling intervals fit within one second :
93+ * samplingIntervalsPerSecond = 1000 / samplingIntervalMs;
94+ * 2. Multiply this by the number of impulses per interval to get the total rate :
95+ * impulsesPerSecond = IMPULSES_PER_SAMPLING_INTERVAL * samplingIntervalsPerSecond;
9696 *
97- * RateLimiterStrategy.perSecond((double) 1000 / ((double) samplingIntervalMs / 10))
98- * ensures this rate is maintained efficiently without blocking execution.
97+ * Example Calculations:
98+ * - If `samplingIntervalMs = 1000 ms`:
99+ * - `samplingIntervalsPerSecond = 1000 / 1000 = 1`
100+ * - `impulsesPerSecond = 10 * 1 = 10 records per second`
101+ * - If `samplingIntervalMs = 500 ms`:
102+ * - `samplingIntervalsPerSecond = 1000 / 500 = 2`
103+ * - `impulsesPerSecond = 10 * 2 = 20 records per second`
104+ * - If `samplingIntervalMs = 2000 ms`:
105+ * - `samplingIntervalsPerSecond = 1000 / 2000 = 0.5`
106+ * - `impulsesPerSecond = 10 * 0.5 = 5 records per second`
107+ *
108+ * This approach ensures that the number of records emitted dynamically scales
109+ * based on the sampling interval while maintaining the target of 10 impulses per interval.
110+ * RateLimiterStrategy internally distributes these emissions efficiently over time.
99111 */
100112 DataStream <Long > stream =
101113 env .fromSource (
@@ -109,7 +121,7 @@ public static void main(String[] args) throws Exception {
109121 / 10 )), // Controls rate
110122 Types .LONG ),
111123 WatermarkStrategy .noWatermarks (),
112- "ImpulseSource (Using DataGeneratorSource) " );
124+ "ImpulseSource" );
113125
114126 for (String load : taskLoads ) {
115127 double maxLoad = Double .parseDouble (load );
0 commit comments