diff --git a/.changeset/evil-waves-kneel.md b/.changeset/evil-waves-kneel.md
new file mode 100644
index 0000000..977432d
--- /dev/null
+++ b/.changeset/evil-waves-kneel.md
@@ -0,0 +1,5 @@
+---
+'@electric-sql/d2mini': patch
+---
+
+fix an issue where messages could be lost if you sent multiple batches to a graph with a join operator before calling run
diff --git a/packages/d2mini/src/operators/join.ts b/packages/d2mini/src/operators/join.ts
index ac50551..7c35e65 100644
--- a/packages/d2mini/src/operators/join.ts
+++ b/packages/d2mini/src/operators/join.ts
@@ -38,21 +38,21 @@ export class JoinOperator extends BinaryOperator<
     const deltaA = new Index()
     const deltaB = new Index()
 
-    // Process input A
+    // Process input A - process ALL messages, not just the first one
     const messagesA = this.inputAMessages()
-    if (messagesA.length > 0) {
-      const message = messagesA[0] as unknown as MultiSet<[K, V1]>
-      for (const [item, multiplicity] of message.getInner()) {
+    for (const message of messagesA) {
+      const multiSetMessage = message as unknown as MultiSet<[K, V1]>
+      for (const [item, multiplicity] of multiSetMessage.getInner()) {
         const [key, value] = item
         deltaA.addValue(key, [value, multiplicity])
       }
     }
 
-    // Process input B
+    // Process input B - process ALL messages, not just the first one
     const messagesB = this.inputBMessages()
-    if (messagesB.length > 0) {
-      const message = messagesB[0] as unknown as MultiSet<[K, V2]>
-      for (const [item, multiplicity] of message.getInner()) {
+    for (const message of messagesB) {
+      const multiSetMessage = message as unknown as MultiSet<[K, V2]>
+      for (const [item, multiplicity] of multiSetMessage.getInner()) {
         const [key, value] = item
         deltaB.addValue(key, [value, multiplicity])
       }
diff --git a/packages/d2mini/tests/operators/groupBy.test.ts b/packages/d2mini/tests/operators/groupBy.test.ts
index deaba21..5911597 100644
--- a/packages/d2mini/tests/operators/groupBy.test.ts
+++ b/packages/d2mini/tests/operators/groupBy.test.ts
@@ -797,9 +797,7 @@ describe('Operators', () => {
     expect(result).toHaveLength(2) // Should have 2 groups
 
     // Find the group for category A
-    let categoryAGroup = result.find(
-      ([key]) => key[0] === '{"category":"A"}',
-    )
+    let categoryAGroup = result.find(([key]) => key[0] === '{"category":"A"}')
     expect(categoryAGroup).toBeDefined()
     expect(categoryAGroup[0][1].total).toBe(30) // Sum of 10 + 20
 
@@ -854,11 +852,7 @@
     expect(result).toEqual(expectedReAdditionResult)
 
     // Step 4: Verify no lingering effects by adding more data
-    input.sendData(
-      new MultiSet([
-        [{ category: 'A', amount: 15 }, 1],
-      ]),
-    )
+    input.sendData(new MultiSet([[{ category: 'A', amount: 15 }, 1]]))
     graph.run()
 
     // Verify aggregate is updated correctly from the new baseline
diff --git a/packages/d2mini/tests/operators/join-types.test.ts b/packages/d2mini/tests/operators/join-types.test.ts
index 060f18c..8d08cd3 100644
--- a/packages/d2mini/tests/operators/join-types.test.ts
+++ b/packages/d2mini/tests/operators/join-types.test.ts
@@ -29,6 +29,130 @@ describe('Operators', () => {
         testJoin(joinType)
       })
     })
+
+    describe('Multiple batch processing regression tests', () => {
+      joinTypes.forEach((joinType) => {
+        test(`${joinType} join with multiple batches sent before running`, () => {
+          const graph = new D2()
+          const inputA = graph.newInput<[string, string]>()
+          const inputB = graph.newInput<[string, string]>()
+          const results: any[] = []
+
+          inputA.pipe(
+            join(inputB, joinType as any),
+            consolidate(),
+            output((message) => {
+              results.push(...message.getInner())
+            }),
+          )
+
+          graph.finalize()
+
+          // Send multiple batches to inputA before running
+          inputA.sendData(
+            new MultiSet([
+              [['batch1_item1', 'a1'], 1],
+              [['batch1_item2', 'a2'], 1],
+            ]),
+          )
+
+          inputA.sendData(new MultiSet([[['batch2_item1', 'a3'], 1]]))
+
+          inputA.sendData(
+            new MultiSet([
+              [['batch3_item1', 'a4'], 1],
+              [['batch3_item2', 'a5'], 1],
+            ]),
+          )
+
+          // Send corresponding data to inputB (some matches, some don't)
+          inputB.sendData(
+            new MultiSet([
+              [['batch1_item1', 'x1'], 1], // matches
+              [['batch2_item1', 'x2'], 1], // matches
+              [['batch3_item2', 'x3'], 1], // matches
+              [['non_matching', 'x4'], 1], // doesn't match any inputA
+            ]),
+          )
+
+          // Run the graph - should process all batches
+          graph.run()
+
+          // Collect all keys that appear in the results (regardless of multiplicity)
+          const processedKeys = new Set()
+          for (const [[key, _], _mult] of results) {
+            processedKeys.add(key)
+          }
+
+          // Verify behavior based on join type
+          switch (joinType) {
+            case 'inner':
+              // Only matching keys should appear
+              expect(processedKeys.has('batch1_item1')).toBe(true)
+              expect(processedKeys.has('batch2_item1')).toBe(true)
+              expect(processedKeys.has('batch3_item2')).toBe(true)
+              // Non-matching keys should not appear
+              expect(processedKeys.has('batch1_item2')).toBe(false)
+              expect(processedKeys.has('batch3_item1')).toBe(false)
+              expect(processedKeys.has('non_matching')).toBe(false)
+              expect(processedKeys.size).toBe(3)
+              break
+
+            case 'left':
+              // All inputA keys should appear (some with null for inputB)
+              expect(processedKeys.has('batch1_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch1_item2')).toBe(true) // unmatched
+              expect(processedKeys.has('batch2_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch3_item1')).toBe(true) // unmatched
+              expect(processedKeys.has('batch3_item2')).toBe(true) // matched
+              // InputB-only keys should not appear
+              expect(processedKeys.has('non_matching')).toBe(false)
+              expect(processedKeys.size).toBe(5)
+              break
+
+            case 'right':
+              // All inputB keys should appear (some with null for inputA)
+              expect(processedKeys.has('batch1_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch2_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch3_item2')).toBe(true) // matched
+              expect(processedKeys.has('non_matching')).toBe(true) // unmatched
+              // InputA-only keys should not appear
+              expect(processedKeys.has('batch1_item2')).toBe(false)
+              expect(processedKeys.has('batch3_item1')).toBe(false)
+              expect(processedKeys.size).toBe(4)
+              break
+
+            case 'full':
+              // All keys from both inputs should appear
+              expect(processedKeys.has('batch1_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch1_item2')).toBe(true) // inputA only
+              expect(processedKeys.has('batch2_item1')).toBe(true) // matched
+              expect(processedKeys.has('batch3_item1')).toBe(true) // inputA only
+              expect(processedKeys.has('batch3_item2')).toBe(true) // matched
+              expect(processedKeys.has('non_matching')).toBe(true) // inputB only
+              expect(processedKeys.size).toBe(6)
+              break
+
+            case 'anti':
+              // Only inputA keys that don't match inputB should appear
+              expect(processedKeys.has('batch1_item2')).toBe(true) // unmatched in inputA
+              expect(processedKeys.has('batch3_item1')).toBe(true) // unmatched in inputA
+              // Matched keys should not appear
+              expect(processedKeys.has('batch1_item1')).toBe(false)
+              expect(processedKeys.has('batch2_item1')).toBe(false)
+              expect(processedKeys.has('batch3_item2')).toBe(false)
+              // InputB-only keys should not appear
+              expect(processedKeys.has('non_matching')).toBe(false)
+              expect(processedKeys.size).toBe(2)
+              break
+          }
+
+          // Most importantly: ensure we actually got some results
+          // (This test would have failed before the bug fix due to data loss)
+          expect(results.length).toBeGreaterThan(0)
+        })
+      })
+    })
   })
 })
diff --git a/packages/d2mini/tests/operators/join.test.ts b/packages/d2mini/tests/operators/join.test.ts
index 2522d88..4295170 100644
--- a/packages/d2mini/tests/operators/join.test.ts
+++ b/packages/d2mini/tests/operators/join.test.ts
@@ -135,4 +135,157 @@ function testJoin() {
       ],
     ])
   })
+
+  test('join with multiple batches sent before running - regression test for data loss bug', () => {
+    const graph = new D2()
+    const inputA = graph.newInput<[string, string]>()
+    const inputB = graph.newInput<[string, string]>()
+    const messages: MultiSet<[string, [string, string]]>[] = []
+
+    inputA.pipe(
+      join(inputB),
+      output((message) => {
+        messages.push(message as MultiSet<[string, [string, string]]>)
+      }),
+    )
+
+    graph.finalize()
+
+    // Send multiple batches to inputA before running
+    inputA.sendData(
+      new MultiSet([
+        [['key1', 'batch1_a'], 1],
+        [['key2', 'batch1_b'], 1],
+      ]),
+    )
+
+    inputA.sendData(
+      new MultiSet([
+        [['key3', 'batch2_a'], 1],
+        [['key4', 'batch2_b'], 1],
+      ]),
+    )
+
+    inputA.sendData(new MultiSet([[['key5', 'batch3_a'], 1]]))
+
+    // Send corresponding data to inputB
+    inputB.sendData(
+      new MultiSet([
+        [['key1', 'x1'], 1],
+        [['key2', 'x2'], 1],
+        [['key3', 'x3'], 1],
+        [['key4', 'x4'], 1],
+        [['key5', 'x5'], 1],
+      ]),
+    )
+
+    // Run the graph - should process all batches
+    graph.run()
+
+    // Verify we got results
+    expect(messages.length).toBeGreaterThan(0)
+
+    // Collect all keys that were processed
+    const processedKeys = new Set()
+    for (const message of messages) {
+      for (const [[key, _], _mult] of message.getInner()) {
+        processedKeys.add(key)
+      }
+    }
+
+    // All keys from all batches should be present
+    const expectedKeys = ['key1', 'key2', 'key3', 'key4', 'key5']
+    for (const key of expectedKeys) {
+      expect(processedKeys.has(key)).toBe(true)
+    }
+
+    expect(processedKeys.size).toBe(5)
+  })
+
+  test('join comparison: step-by-step vs batch processing should give same results', () => {
+    // Step-by-step processing
+    const graph1 = new D2()
+    const inputA1 = graph1.newInput<[string, string]>()
+    const inputB1 = graph1.newInput<[string, string]>()
+    const stepMessages: MultiSet[] = []
+
+    inputA1.pipe(
+      join(inputB1),
+      output((message) => {
+        stepMessages.push(message)
+      }),
+    )
+
+    graph1.finalize()
+
+    // Set up inputB data first
+    inputB1.sendData(
+      new MultiSet([
+        [['item1', 'x1'], 1],
+        [['item2', 'x2'], 1],
+        [['item3', 'x3'], 1],
+      ]),
+    )
+
+    // Send and process inputA one batch at a time
+    inputA1.sendData(new MultiSet([[['item1', 'a1'], 1]]))
+    graph1.run()
+
+    inputA1.sendData(new MultiSet([[['item2', 'a2'], 1]]))
+    graph1.run()
+
+    inputA1.sendData(new MultiSet([[['item3', 'a3'], 1]]))
+    graph1.run()
+
+    // Batch processing
+    const graph2 = new D2()
+    const inputA2 = graph2.newInput<[string, string]>()
+    const inputB2 = graph2.newInput<[string, string]>()
+    const batchMessages: MultiSet[] = []
+
+    inputA2.pipe(
+      join(inputB2),
+      output((message) => {
+        batchMessages.push(message)
+      }),
+    )
+
+    graph2.finalize()
+
+    // Set up inputB data
+    inputB2.sendData(
+      new MultiSet([
+        [['item1', 'x1'], 1],
+        [['item2', 'x2'], 1],
+        [['item3', 'x3'], 1],
+      ]),
+    )
+
+    // Send all inputA batches then run once
+    inputA2.sendData(new MultiSet([[['item1', 'a1'], 1]]))
+    inputA2.sendData(new MultiSet([[['item2', 'a2'], 1]]))
+    inputA2.sendData(new MultiSet([[['item3', 'a3'], 1]]))
+    graph2.run()
+
+    // Collect all keys from both approaches
+    const stepKeys = new Set()
+    const batchKeys = new Set()
+
+    for (const message of stepMessages) {
+      for (const [[key, _], _mult] of message.getInner()) {
+        stepKeys.add(key)
+      }
+    }
+
+    for (const message of batchMessages) {
+      for (const [[key, _], _mult] of message.getInner()) {
+        batchKeys.add(key)
+      }
+    }
+
+    // Both approaches should process the same items
+    expect(stepKeys.size).toBe(3)
+    expect(batchKeys.size).toBe(3)
+    expect(stepKeys).toEqual(batchKeys)
+  })
 }
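For context, a minimal sketch (not part of the patch) of the scenario the changeset describes: several batches queued on the inputs before a single run(). It uses the same API the tests above exercise (D2, MultiSet, newInput, pipe, join, output, finalize, sendData, run); the root import path is an assumption.

// Hypothetical usage sketch; assumes D2, MultiSet, join and output are exported
// from the package root, as the test files above suggest.
import { D2, MultiSet, join, output } from '@electric-sql/d2mini'

const graph = new D2()
const inputA = graph.newInput<[string, string]>()
const inputB = graph.newInput<[string, string]>()

inputA.pipe(
  join(inputB),
  output((message) => {
    // Before this fix, the join operator drained only the first pending
    // message per input, so later batches queued below could be dropped.
    console.log(message.getInner())
  }),
)

graph.finalize()

// Three batches queued before a single run() - the case that used to lose data.
inputA.sendData(new MultiSet([[['key1', 'a1'], 1]]))
inputA.sendData(new MultiSet([[['key2', 'a2'], 1]]))
inputB.sendData(
  new MultiSet([
    [['key1', 'x1'], 1],
    [['key2', 'x2'], 1],
  ]),
)

graph.run() // With the fix, the joins for both key1 and key2 are emitted.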