11# Accumulator
22
3- Accumulator is a special kind of window similar to a [ Session Window] ( session.md ) designed for complex operations like
3+ Accumulator is a special kind of window similar to a [ Session Window] ( session.md ) designed for complex operations like
44reordering, custom triggering, and joining multiple ordered streams. Like other windowing strategies (fixed,
5- sliding, or session windows), the Accumulator window maintains state for each key, but unlike others, it allows for
6- manipulation of the ` Datum ` and emitting them based on custom rules (e.g., sorting) . Accumulator solves is a different
7- type of problem outside both ` map ` /` flatmap ` (one to ~ one) and ` reduce ` (many to ~ one) and instead of ` Message ` , we
5+ sliding, or session windows), the Accumulator window maintains state for each key, but unlike others, it allows for
6+ manipulation of the ` Datum ` and emitting them based on custom rules (e.g., sorting) . Accumulator solves is a different
7+ type of problem outside both ` map ` /` flatmap ` (one to ~ one) and ` reduce ` (many to ~ one) and instead of ` Message ` , we
88have to emit back the "manipulated" ` Datum ` .
99
1010![ plot] ( ../../../../assets/accumulator.png )
1111
12- Another difference between the Accumulator and the Session windows is that in Accumulator, there is no concept of
12+ Another difference between the Accumulator and the Session windows is that in Accumulator, there is no concept of
1313[ window merge] ( ./session.md#window-merge ) .
1414
1515## Why Accumulator?
1616
1717Accumulator is a powerful concept that lets you tap into the raw Datum stream and manipulate not just the order but the
1818Datum stream itself. It has a powerful semantics where the input and output is a stream of ` Datum ` creating a
1919Global Window. It opens up the possibility of very advanced use cases like custom triggers (e.g., count based triggers
20- combined with windowing strategies).
20+ combined with windowing strategies).
2121
2222=== "Go"
2323
@@ -70,30 +70,30 @@ combined with windowing strategies).
7070=== "Rust"
7171
7272 ```rust
73- struct Accumulator {
74- state: OrderedList<Datum>,
75- }
73+ struct Accumulator {
74+ state: OrderedList<Datum>,
75+ }
7676
77- impl Accumulator {
78- fn new() -> Self {
79- Self {
80- state: OrderedList::new(),
81- }
82- }
77+ impl Accumulator {
78+ fn new() -> Self {
79+ Self {
80+ state: OrderedList::new(),
81+ }
82+ }
83+
84+ fn process(&mut self, input_stream: &[Datum], output_stream: &mut Vec<Datum>) {
85+ for i in input_stream {
86+ // Check if the watermark has progressed
87+ if WatermarkProgressed(i) {
88+ // Pop all sorted elements and write to output stream
89+ let popped = self.state.pop_all();
90+ output_stream.extend(popped);
91+ }
92+ self.state.insert(i.clone());
93+ }
94+ }
95+ }
8396
84- fn process(&mut self, input_stream: &[Datum], output_stream: &mut Vec<Datum>) {
85- for i in input_stream {
86- // Check if the watermark has progressed
87- if WatermarkProgressed(i) {
88- // Pop all sorted elements and write to output stream
89- let popped = self.state.pop_all();
90- output_stream.extend(popped);
91- }
92- self.state.insert(i.clone());
93- }
94- }
95- }
96-
9797 ```
9898
9999=== "Java"
@@ -125,12 +125,12 @@ combined with windowing strategies).
125125
126126### Considerations
127127
128- The Accumulator window is powerful but should be used carefully as it can cause pipeline stalling if not configured
128+ The Accumulator window is powerful but should be used carefully as it can cause pipeline stalling if not configured
129129properly.
130130
131131#### Factors to consider
132132
133- Please consider the following factors when using the Accumulator window (not comprehensive):
133+ Please consider the following factors when using the Accumulator window (not comprehensive):
134134
1351351 . For high-throughput scenarios, ensure adequate storage is provisioned
1361362 . The timeout should be set based on the expected data arrival patterns and latency requirements
@@ -173,7 +173,7 @@ such as "300ms", "1.5h" or "2h45m". Valid time units are "ns", "us" (or "µs"),
173173
174174### timeout
175175
176- The ` timeout` is the duration of inactivity (no data flowing in for a particular key) after which the accumulator state
176+ The ` timeout` is the duration of inactivity (no data flowing in for a particular key) after which the accumulator state
177177is removed. This helps prevent memory leaks by cleaning up state for keys that are no longer active.
178178
179179# # How It Works
@@ -185,13 +185,13 @@ The Accumulator window works by:
1851853. New elements are inserted into the ordered list based on their event time
1861864. If no new data arrives for a key within the specified timeout period, the window is closed
187187
188- Unlike both `map` or `reduce` operations, where `Datum` is consumed and `Message` is returned, for reordering with the
188+ Unlike both `map` or `reduce` operations, where `Datum` is consumed and `Message` is returned, for reordering with the
189189Accumulator, the `Datum` is kept intact.
190190
191191
192192# # Example
193193
194- Here's an [example](https://github.com/numaproj/numaflow/blob/main/examples/13-accumulator-window.yaml) of using an
194+ Here's an [example](https://github.com/numaproj/numaflow/blob/main/examples/13-accumulator-window.yaml) of using an
195195Accumulator window to join and sort two HTTP sources :
196196
197197` ` ` yaml
@@ -268,13 +268,13 @@ Check out the snippets below to see the UDF examples for different languages:
268268 return
269269 }
270270 log.Println("Received datum with event time: ", datum.EventTime().UnixMilli())
271-
271+
272272 // watermark has moved, let's flush
273273 if datum.Watermark().After(s.latestWm) {
274274 s.latestWm = datum.Watermark()
275275 s.flushBuffer(output)
276276 }
277-
277+
278278 // store the data into the internal buffer
279279 s.insertSorted(datum)
280280 }
@@ -291,7 +291,7 @@ Check out the snippets below to see the UDF examples for different languages:
291291 _LOGGER.info("StreamSorter initialized")
292292 self.latest_wm = datetime.fromtimestamp(-1)
293293 self.sorted_buffer: list[Datum] = []
294-
294+
295295 async def handler(
296296 self,
297297 datums: AsyncIterable[Datum],
@@ -304,15 +304,15 @@ Check out the snippets below to see the UDF examples for different languages:
304304 f"Current latest watermark: {self.latest_wm}, "
305305 f"Datum watermark: {datum.watermark}"
306306 )
307-
307+
308308 # If watermark has moved forward
309309 if datum.watermark and datum.watermark > self.latest_wm:
310310 self.latest_wm = datum.watermark
311311 _LOGGER.info(f"Watermark updated: {self.latest_wm}")
312312 await self.flush_buffer(output)
313-
313+
314314 self.insert_sorted(datum)
315-
315+
316316 _LOGGER.info("Timeout reached")
317317 await self.flush_buffer(output, flush_all=True)
318318 ` ` `
@@ -398,5 +398,4 @@ Check out the snippets below to see the UDF examples for different languages:
398398 }
399399 ` ` `
400400 [View the full example in numaflow-java Github](https://github.com/numaproj/numaflow-java/blob/38734c04df9e2182c0dadf2c7a4d83997ea7c2ad/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/sorter/StreamSorterFactory.java)
401-
402-
401+
0 commit comments