Skip to content

Commit 107a14f

Browse files
Add fruit shipping example (#2)
* Add fruit shipping example * Improve fruit example * Fix import * Fix lint errors --------- Co-authored-by: Sam Willis <[email protected]>
1 parent 7eea68f commit 107a14f

File tree

1 file changed

+230
-0
lines changed

1 file changed

+230
-0
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
import { MultiSet } from '../src/multiset'
2+
import { D2 } from '../src/index.js'
3+
import { map, reduce, consolidate, output } from '../src/operators/index.js'
4+
import { v } from '../src/order.js'
5+
import { MessageType } from '../src/types.js'
6+
7+
type FruitOrder = {
8+
name: string,
9+
quantity: number,
10+
shipping_id: string,
11+
status: 'packed' | 'shipped' | 'delivered'
12+
}
13+
14+
const graph = new D2({ initialFrontier: v(0) })
15+
const input = graph.newInput<FruitOrder>()
16+
17+
// Track quantities by status
18+
const materializedStatus = new Map<string, number>();
19+
const materializedProcessed = new Map<string, number>();
20+
21+
function showStatus() {
22+
const obj = Object.fromEntries(materializedStatus.entries())
23+
console.log('Counts by Status:')
24+
console.log(JSON.stringify(obj, null, 2))
25+
}
26+
27+
function showProcessed() {
28+
const obj = Object.fromEntries(materializedProcessed.entries())
29+
console.log('Fruit Processed:')
30+
console.log(JSON.stringify(obj, null, 2))
31+
}
32+
33+
input.pipe(
34+
// debug('Raw Input'),
35+
map((order) => [`${order.name}-${order.status}`, order.quantity] as [string, number]),
36+
// debug('After Map'),
37+
reduce((values) => {
38+
// The reduce function receives an array of [quantity, diff] for each key
39+
// `diff` being the change in number of occurrences of the specific quantity
40+
// It is not aware of the key, just that everything it is receiving is for the same key
41+
// Here we want to sum the quantity for each key, so a sum of num * diff
42+
let count = 0
43+
for (const [num, diff] of values) {
44+
count += num * diff
45+
}
46+
return [[count, 1]]
47+
}),
48+
// debug('Status Totals'),
49+
consolidate(),
50+
output((msg) => {
51+
if (msg.type === MessageType.DATA) {
52+
const entries = msg.data.collection.getInner();
53+
// The entreis are:
54+
// key: {fruit-name}-{status}
55+
// count: number of items in that status
56+
// diff: 1 if adding a row, -1 if removing a row
57+
for (const [[key, count], diff] of entries) {
58+
if (diff > 0) {
59+
materializedStatus.set(key, count)
60+
} else if (diff < 0) {
61+
materializedStatus.delete(key)
62+
}
63+
}
64+
}
65+
})
66+
)
67+
68+
// Track total processed quantities regardless of status
69+
input.pipe(
70+
// debug('Raw Input'),
71+
map((order) => [order.name, order.quantity] as [string, number]),
72+
// debug('After Map'),
73+
reduce((values) => {
74+
// Count the total number of each fruit processed
75+
let count = 0
76+
for (const [num, diff] of values) {
77+
count += num * diff
78+
}
79+
return [[count, 1]]
80+
}),
81+
// debug('Total Processed'),
82+
consolidate(),
83+
output((msg) => {
84+
if (msg.type === MessageType.DATA) {
85+
const entries = msg.data.collection.getInner();
86+
for (const [[key, count], diff] of entries) {
87+
if (diff > 0) {
88+
materializedProcessed.set(key, count)
89+
} else if (diff < 0) {
90+
materializedProcessed.delete(key)
91+
}
92+
}
93+
}
94+
})
95+
)
96+
97+
graph.finalize()
98+
99+
console.log('--------------------------------')
100+
101+
// Initial packing of orders
102+
console.log('Sending initial orders')
103+
input.sendData(v(0), new MultiSet([
104+
[{
105+
name: 'apple',
106+
quantity: 100,
107+
shipping_id: 'A001',
108+
status: 'packed'
109+
}, 1],
110+
[{
111+
name: 'banana',
112+
quantity: 150,
113+
shipping_id: 'B001',
114+
status: 'packed'
115+
}, 1]
116+
]))
117+
118+
input.sendFrontier(v(1)) // Send a frontier to set the new minimum version
119+
graph.step() // Step the graph to process the data
120+
// Show the materialized status and processed totals:
121+
showStatus()
122+
showProcessed()
123+
124+
console.log('--------------------------------')
125+
126+
// Ship 2 orders
127+
console.log('Shipping 2 orders')
128+
input.sendData(v(1), new MultiSet([
129+
// Remove from packed status
130+
[{
131+
name: 'apple',
132+
quantity: 100,
133+
shipping_id: 'A001',
134+
status: 'packed'
135+
}, -1],
136+
// Add to shipped status
137+
[{
138+
name: 'apple',
139+
quantity: 100,
140+
shipping_id: 'A001',
141+
status: 'shipped'
142+
}, 1],
143+
144+
[{
145+
name: 'banana',
146+
quantity: 150,
147+
shipping_id: 'B001',
148+
status: 'packed'
149+
}, -1],
150+
[{
151+
name: 'banana',
152+
quantity: 150,
153+
shipping_id: 'B001',
154+
status: 'shipped'
155+
}, 1]
156+
]))
157+
158+
input.sendFrontier(v(2))
159+
graph.step()
160+
showStatus()
161+
showProcessed()
162+
163+
console.log('--------------------------------')
164+
165+
// One order arrives
166+
console.log('One order arrives')
167+
input.sendData(v(2), new MultiSet([
168+
// Remove from shipped status
169+
[{
170+
name: 'apple',
171+
quantity: 100,
172+
shipping_id: 'A001',
173+
status: 'shipped'
174+
}, -1],
175+
// Add to delivered status
176+
[{
177+
name: 'apple',
178+
quantity: 100,
179+
shipping_id: 'A001',
180+
status: 'delivered'
181+
}, 1]
182+
]))
183+
184+
input.sendFrontier(v(3))
185+
graph.step()
186+
showStatus()
187+
showProcessed()
188+
189+
console.log('--------------------------------')
190+
191+
/*
192+
Output:
193+
--------------------------------
194+
Sending initial orders
195+
Counts by Status:
196+
{
197+
"apple-packed": 100,
198+
"banana-packed": 150
199+
}
200+
Fruit Processed:
201+
{
202+
"apple": 100,
203+
"banana": 150
204+
}
205+
--------------------------------
206+
Shipping 2 orders
207+
Counts by Status:
208+
{
209+
"apple-shipped": 100,
210+
"banana-shipped": 150
211+
}
212+
Fruit Processed:
213+
{
214+
"apple": 100,
215+
"banana": 150
216+
}
217+
--------------------------------
218+
One order arrives
219+
Counts by Status:
220+
{
221+
"banana-shipped": 150,
222+
"apple-delivered": 100
223+
}
224+
Fruit Processed:
225+
{
226+
"apple": 100,
227+
"banana": 150
228+
}
229+
--------------------------------
230+
*/

0 commit comments

Comments
 (0)