Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion eslint.base.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ export default [
console: true,
window: true,
document: true,
globalThis: true
globalThis: true,
EventTarget: true,
CustomEvent: true,
EventListener: true
},
},
ignores: ['dist/', 'node_modules/'],
Expand Down
205 changes: 75 additions & 130 deletions packages/d2ts/examples/fruit-processed.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,62 @@
import { MultiSet } from '../src/multiset'
import { D2 } from '../src/index.js'
import { map, reduce, consolidate, output } from '../src/operators/index.js'
import { v } from '../src/order.js'
import { MessageType } from '../src/types.js'
import { map, reduce, consolidate } from '../src/operators/index.js'
import { Store } from '../src/store.js'

type FruitOrder = {
name: string,
quantity: number,
shipping_id: string,
name: string
quantity: number
shipping_id: string
status: 'packed' | 'shipped' | 'delivered'
}

const graph = new D2({ initialFrontier: v(0) })
const input = graph.newInput<FruitOrder>()

// Track quantities by status
const materializedStatus = new Map<string, number>();
const materializedProcessed = new Map<string, number>();
const fruitOrders = new Store<string, FruitOrder>()

const { materializedStatus, materializedProcessed } = Store.queryAll(
[fruitOrders],
([fruitStream]) => {
const statusStream = fruitStream.pipe(
// debug('Raw Input'),
map(
([orderId, order]) =>
[`${order.name}-${order.status}`, order.quantity] as [string, number],
),
// debug('After Map'),
reduce((values) => {
// The reduce function receives an array of [quantity, diff] for each key
// `diff` being the change in number of occurrences of the specific quantity
// It is not aware of the key, just that everything it is receiving is for the same key
// Here we want to sum the quantity for each key, so a sum of num * diff
let count = 0
for (const [num, diff] of values) {
count += num * diff
}
return [[count, 1]]
}),
// debug('Status Totals'),
consolidate(),
)
const processedStream = fruitStream.pipe(
// debug('Raw Input'),
map(
([orderId, order]) => [order.name, order.quantity] as [string, number],
),
// debug('After Map'),
reduce((values) => {
// Count the total number of each fruit processed
let count = 0
for (const [num, diff] of values) {
count += num * diff
}
return [[count, 1]]
}),
// debug('Total Processed'),
consolidate(),
)

const materializedStatus = Store.materialize(statusStream)
const materializedProcessed = Store.materialize(processedStream)
return { materializedStatus, materializedProcessed }
},
)

function showStatus() {
const obj = Object.fromEntries(materializedStatus.entries())
Expand All @@ -30,93 +70,25 @@ function showProcessed() {
console.log(JSON.stringify(obj, null, 2))
}

input.pipe(
// debug('Raw Input'),
map((order) => [`${order.name}-${order.status}`, order.quantity] as [string, number]),
// debug('After Map'),
reduce((values) => {
// The reduce function receives an array of [quantity, diff] for each key
// `diff` being the change in number of occurrences of the specific quantity
// It is not aware of the key, just that everything it is receiving is for the same key
// Here we want to sum the quantity for each key, so a sum of num * diff
let count = 0
for (const [num, diff] of values) {
count += num * diff
}
return [[count, 1]]
}),
// debug('Status Totals'),
consolidate(),
output((msg) => {
if (msg.type === MessageType.DATA) {
const entries = msg.data.collection.getInner();
// The entreis are:
// key: {fruit-name}-{status}
// count: number of items in that status
// diff: 1 if adding a row, -1 if removing a row
for (const [[key, count], diff] of entries) {
if (diff > 0) {
materializedStatus.set(key, count)
} else if (diff < 0) {
materializedStatus.delete(key)
}
}
}
})
)

// Track total processed quantities regardless of status
input.pipe(
// debug('Raw Input'),
map((order) => [order.name, order.quantity] as [string, number]),
// debug('After Map'),
reduce((values) => {
// Count the total number of each fruit processed
let count = 0
for (const [num, diff] of values) {
count += num * diff
}
return [[count, 1]]
}),
// debug('Total Processed'),
consolidate(),
output((msg) => {
if (msg.type === MessageType.DATA) {
const entries = msg.data.collection.getInner();
for (const [[key, count], diff] of entries) {
if (diff > 0) {
materializedProcessed.set(key, count)
} else if (diff < 0) {
materializedProcessed.delete(key)
}
}
}
})
)

graph.finalize()

console.log('--------------------------------')

// Initial packing of orders
console.log('Sending initial orders')
input.sendData(v(0), new MultiSet([
[{
fruitOrders.transaction((tx) => {
tx.set('A001', {
name: 'apple',
quantity: 100,
shipping_id: 'A001',
status: 'packed'
}, 1],
[{
status: 'packed',
})
tx.set('B001', {
name: 'banana',
quantity: 150,
shipping_id: 'B001',
status: 'packed'
}, 1]
]))
status: 'packed',
})
})

input.sendFrontier(v(1)) // Send a frontier to set the new minimum version
graph.step() // Step the graph to process the data
// Show the materialized status and processed totals:
showStatus()
showProcessed()
Expand All @@ -125,64 +97,37 @@ console.log('--------------------------------')

// Ship 2 orders
console.log('Shipping 2 orders')
input.sendData(v(1), new MultiSet([
// Remove from packed status
[{
name: 'apple',
quantity: 100,
shipping_id: 'A001',
status: 'packed'
}, -1],
// Add to shipped status
[{
fruitOrders.transaction((tx) => {
tx.set('A001', {
name: 'apple',
quantity: 100,
shipping_id: 'A001',
status: 'shipped'
}, 1],

[{
name: 'banana',
quantity: 150,
shipping_id: 'B001',
status: 'packed'
}, -1],
[{
status: 'shipped',
})
tx.set('B001', {
name: 'banana',
quantity: 150,
shipping_id: 'B001',
status: 'shipped'
}, 1]
]))
status: 'shipped',
})
})

input.sendFrontier(v(2))
graph.step()
showStatus()
showProcessed()

console.log('--------------------------------')

// One order arrives
console.log('One order arrives')
input.sendData(v(2), new MultiSet([
// Remove from shipped status
[{
name: 'apple',
quantity: 100,
shipping_id: 'A001',
status: 'shipped'
}, -1],
// Add to delivered status
[{
fruitOrders.transaction((tx) => {
tx.set('A001', {
name: 'apple',
quantity: 100,
shipping_id: 'A001',
status: 'delivered'
}, 1]
]))
status: 'delivered',
})
})

input.sendFrontier(v(3))
graph.step()
showStatus()
showProcessed()

Expand Down
Loading
Loading