Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/evil-waves-kneel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/d2mini': patch
---

fix an issue where messages could be lost if you sent multiple batches to a graph with a join operator before calling run
16 changes: 8 additions & 8 deletions packages/d2mini/src/operators/join.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ export class JoinOperator<K, V1, V2> extends BinaryOperator<
const deltaA = new Index<K, V1>()
const deltaB = new Index<K, V2>()

// Process input A
// Process input A - process ALL messages, not just the first one
const messagesA = this.inputAMessages()
if (messagesA.length > 0) {
const message = messagesA[0] as unknown as MultiSet<[K, V1]>
for (const [item, multiplicity] of message.getInner()) {
for (const message of messagesA) {
const multiSetMessage = message as unknown as MultiSet<[K, V1]>
for (const [item, multiplicity] of multiSetMessage.getInner()) {
const [key, value] = item
deltaA.addValue(key, [value, multiplicity])
}
}

// Process input B
// Process input B - process ALL messages, not just the first one
const messagesB = this.inputBMessages()
if (messagesB.length > 0) {
const message = messagesB[0] as unknown as MultiSet<[K, V2]>
for (const [item, multiplicity] of message.getInner()) {
for (const message of messagesB) {
const multiSetMessage = message as unknown as MultiSet<[K, V2]>
for (const [item, multiplicity] of multiSetMessage.getInner()) {
const [key, value] = item
deltaB.addValue(key, [value, multiplicity])
}
Expand Down
10 changes: 2 additions & 8 deletions packages/d2mini/tests/operators/groupBy.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -797,9 +797,7 @@ describe('Operators', () => {
expect(result).toHaveLength(2) // Should have 2 groups

// Find the group for category A
let categoryAGroup = result.find(
([key]) => key[0] === '{"category":"A"}',
)
let categoryAGroup = result.find(([key]) => key[0] === '{"category":"A"}')
expect(categoryAGroup).toBeDefined()
expect(categoryAGroup[0][1].total).toBe(30) // Sum of 10 + 20

Expand Down Expand Up @@ -854,11 +852,7 @@ describe('Operators', () => {
expect(result).toEqual(expectedReAdditionResult)

// Step 4: Verify no lingering effects by adding more data
input.sendData(
new MultiSet([
[{ category: 'A', amount: 15 }, 1],
]),
)
input.sendData(new MultiSet([[{ category: 'A', amount: 15 }, 1]]))
graph.run()

// Verify aggregate is updated correctly from the new baseline
Expand Down
124 changes: 124 additions & 0 deletions packages/d2mini/tests/operators/join-types.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,130 @@ describe('Operators', () => {
testJoin(joinType)
})
})

describe('Multiple batch processing regression tests', () => {
joinTypes.forEach((joinType) => {
test(`${joinType} join with multiple batches sent before running`, () => {
const graph = new D2()
const inputA = graph.newInput<[string, string]>()
const inputB = graph.newInput<[string, string]>()
const results: any[] = []

inputA.pipe(
join(inputB, joinType as any),
consolidate(),
output((message) => {
results.push(...message.getInner())
}),
)

graph.finalize()

// Send multiple batches to inputA before running
inputA.sendData(
new MultiSet([
[['batch1_item1', 'a1'], 1],
[['batch1_item2', 'a2'], 1],
]),
)

inputA.sendData(new MultiSet([[['batch2_item1', 'a3'], 1]]))

inputA.sendData(
new MultiSet([
[['batch3_item1', 'a4'], 1],
[['batch3_item2', 'a5'], 1],
]),
)

// Send corresponding data to inputB (some matches, some don't)
inputB.sendData(
new MultiSet([
[['batch1_item1', 'x1'], 1], // matches
[['batch2_item1', 'x2'], 1], // matches
[['batch3_item2', 'x3'], 1], // matches
[['non_matching', 'x4'], 1], // doesn't match any inputA
]),
)

// Run the graph - should process all batches
graph.run()

// Collect all keys that appear in the results (regardless of multiplicity)
const processedKeys = new Set<string>()
for (const [[key, _], _mult] of results) {
processedKeys.add(key)
}

// Verify behavior based on join type
switch (joinType) {
case 'inner':
// Only matching keys should appear
expect(processedKeys.has('batch1_item1')).toBe(true)
expect(processedKeys.has('batch2_item1')).toBe(true)
expect(processedKeys.has('batch3_item2')).toBe(true)
// Non-matching keys should not appear
expect(processedKeys.has('batch1_item2')).toBe(false)
expect(processedKeys.has('batch3_item1')).toBe(false)
expect(processedKeys.has('non_matching')).toBe(false)
expect(processedKeys.size).toBe(3)
break

case 'left':
// All inputA keys should appear (some with null for inputB)
expect(processedKeys.has('batch1_item1')).toBe(true) // matched
expect(processedKeys.has('batch1_item2')).toBe(true) // unmatched
expect(processedKeys.has('batch2_item1')).toBe(true) // matched
expect(processedKeys.has('batch3_item1')).toBe(true) // unmatched
expect(processedKeys.has('batch3_item2')).toBe(true) // matched
// InputB-only keys should not appear
expect(processedKeys.has('non_matching')).toBe(false)
expect(processedKeys.size).toBe(5)
break

case 'right':
// All inputB keys should appear (some with null for inputA)
expect(processedKeys.has('batch1_item1')).toBe(true) // matched
expect(processedKeys.has('batch2_item1')).toBe(true) // matched
expect(processedKeys.has('batch3_item2')).toBe(true) // matched
expect(processedKeys.has('non_matching')).toBe(true) // unmatched
// InputA-only keys should not appear
expect(processedKeys.has('batch1_item2')).toBe(false)
expect(processedKeys.has('batch3_item1')).toBe(false)
expect(processedKeys.size).toBe(4)
break

case 'full':
// All keys from both inputs should appear
expect(processedKeys.has('batch1_item1')).toBe(true) // matched
expect(processedKeys.has('batch1_item2')).toBe(true) // inputA only
expect(processedKeys.has('batch2_item1')).toBe(true) // matched
expect(processedKeys.has('batch3_item1')).toBe(true) // inputA only
expect(processedKeys.has('batch3_item2')).toBe(true) // matched
expect(processedKeys.has('non_matching')).toBe(true) // inputB only
expect(processedKeys.size).toBe(6)
break

case 'anti':
// Only inputA keys that don't match inputB should appear
expect(processedKeys.has('batch1_item2')).toBe(true) // unmatched in inputA
expect(processedKeys.has('batch3_item1')).toBe(true) // unmatched in inputA
// Matched keys should not appear
expect(processedKeys.has('batch1_item1')).toBe(false)
expect(processedKeys.has('batch2_item1')).toBe(false)
expect(processedKeys.has('batch3_item2')).toBe(false)
// InputB-only keys should not appear
expect(processedKeys.has('non_matching')).toBe(false)
expect(processedKeys.size).toBe(2)
break
}

// Most importantly: ensure we actually got some results
// (This test would have failed before the bug fix due to data loss)
expect(results.length).toBeGreaterThan(0)
})
})
})
})
})

Expand Down
153 changes: 153 additions & 0 deletions packages/d2mini/tests/operators/join.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,157 @@ function testJoin() {
],
])
})

test('join with multiple batches sent before running - regression test for data loss bug', () => {
const graph = new D2()
const inputA = graph.newInput<[string, string]>()
const inputB = graph.newInput<[string, string]>()
const messages: MultiSet<[string, [string, string]]>[] = []

inputA.pipe(
join(inputB),
output((message) => {
messages.push(message as MultiSet<[string, [string, string]]>)
}),
)

graph.finalize()

// Send multiple batches to inputA before running
inputA.sendData(
new MultiSet([
[['key1', 'batch1_a'], 1],
[['key2', 'batch1_b'], 1],
]),
)

inputA.sendData(
new MultiSet([
[['key3', 'batch2_a'], 1],
[['key4', 'batch2_b'], 1],
]),
)

inputA.sendData(new MultiSet([[['key5', 'batch3_a'], 1]]))

// Send corresponding data to inputB
inputB.sendData(
new MultiSet([
[['key1', 'x1'], 1],
[['key2', 'x2'], 1],
[['key3', 'x3'], 1],
[['key4', 'x4'], 1],
[['key5', 'x5'], 1],
]),
)

// Run the graph - should process all batches
graph.run()

// Verify we got results
expect(messages.length).toBeGreaterThan(0)

// Collect all keys that were processed
const processedKeys = new Set<string>()
for (const message of messages) {
for (const [[key, _], _mult] of message.getInner()) {
processedKeys.add(key)
}
}

// All keys from all batches should be present
const expectedKeys = ['key1', 'key2', 'key3', 'key4', 'key5']
for (const key of expectedKeys) {
expect(processedKeys.has(key)).toBe(true)
}

expect(processedKeys.size).toBe(5)
})

test('join comparison: step-by-step vs batch processing should give same results', () => {
// Step-by-step processing
const graph1 = new D2()
const inputA1 = graph1.newInput<[string, string]>()
const inputB1 = graph1.newInput<[string, string]>()
const stepMessages: MultiSet<any>[] = []

inputA1.pipe(
join(inputB1),
output((message) => {
stepMessages.push(message)
}),
)

graph1.finalize()

// Set up inputB data first
inputB1.sendData(
new MultiSet([
[['item1', 'x1'], 1],
[['item2', 'x2'], 1],
[['item3', 'x3'], 1],
]),
)

// Send and process inputA one batch at a time
inputA1.sendData(new MultiSet([[['item1', 'a1'], 1]]))
graph1.run()

inputA1.sendData(new MultiSet([[['item2', 'a2'], 1]]))
graph1.run()

inputA1.sendData(new MultiSet([[['item3', 'a3'], 1]]))
graph1.run()

// Batch processing
const graph2 = new D2()
const inputA2 = graph2.newInput<[string, string]>()
const inputB2 = graph2.newInput<[string, string]>()
const batchMessages: MultiSet<any>[] = []

inputA2.pipe(
join(inputB2),
output((message) => {
batchMessages.push(message)
}),
)

graph2.finalize()

// Set up inputB data
inputB2.sendData(
new MultiSet([
[['item1', 'x1'], 1],
[['item2', 'x2'], 1],
[['item3', 'x3'], 1],
]),
)

// Send all inputA batches then run once
inputA2.sendData(new MultiSet([[['item1', 'a1'], 1]]))
inputA2.sendData(new MultiSet([[['item2', 'a2'], 1]]))
inputA2.sendData(new MultiSet([[['item3', 'a3'], 1]]))
graph2.run()

// Collect all keys from both approaches
const stepKeys = new Set<string>()
const batchKeys = new Set<string>()

for (const message of stepMessages) {
for (const [[key, _], _mult] of message.getInner()) {
stepKeys.add(key)
}
}

for (const message of batchMessages) {
for (const [[key, _], _mult] of message.getInner()) {
batchKeys.add(key)
}
}

// Both approaches should process the same items
expect(stepKeys.size).toBe(3)
expect(batchKeys.size).toBe(3)
expect(stepKeys).toEqual(batchKeys)
})
}