Skip to content

Commit 432e3ab

Browse files
authored
Materialize query pipeline to a store and query it (#6)
* Materialize query pipeline to a store * Refactor materialize to be static method on Store * Draft of how a store (or multiple) could be queried qith D2 * Fix linting errors * Change store to not use EventTarget * Basic tests for the queryAll method * Add a query method to store * Store transactionAll static method
1 parent 4edb9b0 commit 432e3ab

File tree

5 files changed

+987
-131
lines changed

5 files changed

+987
-131
lines changed

eslint.base.mjs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,10 @@ export default [
5151
console: true,
5252
window: true,
5353
document: true,
54-
globalThis: true
54+
globalThis: true,
55+
EventTarget: true,
56+
CustomEvent: true,
57+
EventListener: true
5558
},
5659
},
5760
ignores: ['dist/', 'node_modules/'],

packages/d2ts/examples/fruit-processed.ts

Lines changed: 75 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,62 @@
1-
import { MultiSet } from '../src/multiset'
2-
import { D2 } from '../src/index.js'
3-
import { map, reduce, consolidate, output } from '../src/operators/index.js'
4-
import { v } from '../src/order.js'
5-
import { MessageType } from '../src/types.js'
1+
import { map, reduce, consolidate } from '../src/operators/index.js'
2+
import { Store } from '../src/store.js'
63

74
type FruitOrder = {
8-
name: string,
9-
quantity: number,
10-
shipping_id: string,
5+
name: string
6+
quantity: number
7+
shipping_id: string
118
status: 'packed' | 'shipped' | 'delivered'
129
}
1310

14-
const graph = new D2({ initialFrontier: v(0) })
15-
const input = graph.newInput<FruitOrder>()
16-
17-
// Track quantities by status
18-
const materializedStatus = new Map<string, number>();
19-
const materializedProcessed = new Map<string, number>();
11+
const fruitOrders = new Store<string, FruitOrder>()
12+
13+
const { materializedStatus, materializedProcessed } = Store.queryAll(
14+
[fruitOrders],
15+
([fruitStream]) => {
16+
const statusStream = fruitStream.pipe(
17+
// debug('Raw Input'),
18+
map(
19+
([orderId, order]) =>
20+
[`${order.name}-${order.status}`, order.quantity] as [string, number],
21+
),
22+
// debug('After Map'),
23+
reduce((values) => {
24+
// The reduce function receives an array of [quantity, diff] for each key
25+
// `diff` being the change in number of occurrences of the specific quantity
26+
// It is not aware of the key, just that everything it is receiving is for the same key
27+
// Here we want to sum the quantity for each key, so a sum of num * diff
28+
let count = 0
29+
for (const [num, diff] of values) {
30+
count += num * diff
31+
}
32+
return [[count, 1]]
33+
}),
34+
// debug('Status Totals'),
35+
consolidate(),
36+
)
37+
const processedStream = fruitStream.pipe(
38+
// debug('Raw Input'),
39+
map(
40+
([orderId, order]) => [order.name, order.quantity] as [string, number],
41+
),
42+
// debug('After Map'),
43+
reduce((values) => {
44+
// Count the total number of each fruit processed
45+
let count = 0
46+
for (const [num, diff] of values) {
47+
count += num * diff
48+
}
49+
return [[count, 1]]
50+
}),
51+
// debug('Total Processed'),
52+
consolidate(),
53+
)
54+
55+
const materializedStatus = Store.materialize(statusStream)
56+
const materializedProcessed = Store.materialize(processedStream)
57+
return { materializedStatus, materializedProcessed }
58+
},
59+
)
2060

2161
function showStatus() {
2262
const obj = Object.fromEntries(materializedStatus.entries())
@@ -30,93 +70,25 @@ function showProcessed() {
3070
console.log(JSON.stringify(obj, null, 2))
3171
}
3272

33-
input.pipe(
34-
// debug('Raw Input'),
35-
map((order) => [`${order.name}-${order.status}`, order.quantity] as [string, number]),
36-
// debug('After Map'),
37-
reduce((values) => {
38-
// The reduce function receives an array of [quantity, diff] for each key
39-
// `diff` being the change in number of occurrences of the specific quantity
40-
// It is not aware of the key, just that everything it is receiving is for the same key
41-
// Here we want to sum the quantity for each key, so a sum of num * diff
42-
let count = 0
43-
for (const [num, diff] of values) {
44-
count += num * diff
45-
}
46-
return [[count, 1]]
47-
}),
48-
// debug('Status Totals'),
49-
consolidate(),
50-
output((msg) => {
51-
if (msg.type === MessageType.DATA) {
52-
const entries = msg.data.collection.getInner();
53-
// The entreis are:
54-
// key: {fruit-name}-{status}
55-
// count: number of items in that status
56-
// diff: 1 if adding a row, -1 if removing a row
57-
for (const [[key, count], diff] of entries) {
58-
if (diff > 0) {
59-
materializedStatus.set(key, count)
60-
} else if (diff < 0) {
61-
materializedStatus.delete(key)
62-
}
63-
}
64-
}
65-
})
66-
)
67-
68-
// Track total processed quantities regardless of status
69-
input.pipe(
70-
// debug('Raw Input'),
71-
map((order) => [order.name, order.quantity] as [string, number]),
72-
// debug('After Map'),
73-
reduce((values) => {
74-
// Count the total number of each fruit processed
75-
let count = 0
76-
for (const [num, diff] of values) {
77-
count += num * diff
78-
}
79-
return [[count, 1]]
80-
}),
81-
// debug('Total Processed'),
82-
consolidate(),
83-
output((msg) => {
84-
if (msg.type === MessageType.DATA) {
85-
const entries = msg.data.collection.getInner();
86-
for (const [[key, count], diff] of entries) {
87-
if (diff > 0) {
88-
materializedProcessed.set(key, count)
89-
} else if (diff < 0) {
90-
materializedProcessed.delete(key)
91-
}
92-
}
93-
}
94-
})
95-
)
96-
97-
graph.finalize()
98-
9973
console.log('--------------------------------')
10074

10175
// Initial packing of orders
10276
console.log('Sending initial orders')
103-
input.sendData(v(0), new MultiSet([
104-
[{
77+
fruitOrders.transaction((tx) => {
78+
tx.set('A001', {
10579
name: 'apple',
10680
quantity: 100,
10781
shipping_id: 'A001',
108-
status: 'packed'
109-
}, 1],
110-
[{
82+
status: 'packed',
83+
})
84+
tx.set('B001', {
11185
name: 'banana',
11286
quantity: 150,
11387
shipping_id: 'B001',
114-
status: 'packed'
115-
}, 1]
116-
]))
88+
status: 'packed',
89+
})
90+
})
11791

118-
input.sendFrontier(v(1)) // Send a frontier to set the new minimum version
119-
graph.step() // Step the graph to process the data
12092
// Show the materialized status and processed totals:
12193
showStatus()
12294
showProcessed()
@@ -125,64 +97,37 @@ console.log('--------------------------------')
12597

12698
// Ship 2 orders
12799
console.log('Shipping 2 orders')
128-
input.sendData(v(1), new MultiSet([
129-
// Remove from packed status
130-
[{
131-
name: 'apple',
132-
quantity: 100,
133-
shipping_id: 'A001',
134-
status: 'packed'
135-
}, -1],
136-
// Add to shipped status
137-
[{
100+
fruitOrders.transaction((tx) => {
101+
tx.set('A001', {
138102
name: 'apple',
139103
quantity: 100,
140104
shipping_id: 'A001',
141-
status: 'shipped'
142-
}, 1],
143-
144-
[{
145-
name: 'banana',
146-
quantity: 150,
147-
shipping_id: 'B001',
148-
status: 'packed'
149-
}, -1],
150-
[{
105+
status: 'shipped',
106+
})
107+
tx.set('B001', {
151108
name: 'banana',
152109
quantity: 150,
153110
shipping_id: 'B001',
154-
status: 'shipped'
155-
}, 1]
156-
]))
111+
status: 'shipped',
112+
})
113+
})
157114

158-
input.sendFrontier(v(2))
159-
graph.step()
160115
showStatus()
161116
showProcessed()
162117

163118
console.log('--------------------------------')
164119

165120
// One order arrives
166121
console.log('One order arrives')
167-
input.sendData(v(2), new MultiSet([
168-
// Remove from shipped status
169-
[{
170-
name: 'apple',
171-
quantity: 100,
172-
shipping_id: 'A001',
173-
status: 'shipped'
174-
}, -1],
175-
// Add to delivered status
176-
[{
122+
fruitOrders.transaction((tx) => {
123+
tx.set('A001', {
177124
name: 'apple',
178125
quantity: 100,
179126
shipping_id: 'A001',
180-
status: 'delivered'
181-
}, 1]
182-
]))
127+
status: 'delivered',
128+
})
129+
})
183130

184-
input.sendFrontier(v(3))
185-
graph.step()
186131
showStatus()
187132
showProcessed()
188133

0 commit comments

Comments
 (0)