
fix: handle VectorBatchAppender type mismatch in consolidateBatches #43

Merged
jghoman merged 8 commits into main from eric/fix-consolidate-batches-type-mismatch
Mar 13, 2026

Conversation


EDsCODE commented Mar 10, 2026

Summary

  • All production events-ducklake tasks are failing with `IllegalArgumentException: The targetVector to append must have the same type as the targetVector being appended` in consolidateBatches
  • Root cause: castBatchToSchema used ArrowType equality for the "same type, direct copy" path. For complex types (Struct, List, Map), ArrowType is always equal (e.g., both ArrowType.Struct) even when children differ. This caused raw copyFromSafe calls between vectors with mismatched child types, which either threw or corrupted data
  • Fix 1 (root cause): Changed to Field.equals which compares children, nullability, and metadata. Mismatched fields now go through castVectorValues for proper type promotion
  • Fix 2 (missing promotions): Added cast handlers for type pairs that ArrowSchemaMerge.unifySchemas can produce but castVectorValues didn't handle: Bool → Int32/Int64/Float64, TinyInt/SmallInt → Int32
  • Fix 3 (safety net): consolidateBatches now catches VectorBatchAppender exceptions and falls back to writing batches individually, instead of crashing the task
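The root-cause fix above hinges on a subtlety: top-level ArrowType equality cannot see nested children, while Field equality can. Below is a hedged, simplified model of that distinction — `SimpleField` is a hypothetical stand-in for an Arrow `Field` (the real classes live in `org.apache.arrow.vector.types.pojo`), not the connector's actual code:

```java
import java.util.List;
import java.util.Objects;

// Simplified illustration of the bug: a type check that ignores children
// (like comparing two ArrowType.Struct instances) versus a field check
// that compares children recursively (like Field.equals).
public class SchemaEquality {
    // Hypothetical stand-in for an Arrow Field: name, top-level type tag, children.
    public record SimpleField(String name, String typeTag, List<SimpleField> children) {}

    // The buggy check: compares only the top-level type tag, so any two
    // structs look "equal" regardless of their children.
    public static boolean sameArrowType(SimpleField a, SimpleField b) {
        return a.typeTag().equals(b.typeTag());
    }

    // The fixed check: compares name, type tag, and children recursively
    // (record equality on the children list is elementwise).
    public static boolean sameField(SimpleField a, SimpleField b) {
        return a.name().equals(b.name())
            && a.typeTag().equals(b.typeTag())
            && Objects.equals(a.children(), b.children());
    }
}
```

With this model, two structs whose only difference is a child's type pass the ArrowType-style check but fail the Field-style check — exactly the case that sent mismatched vectors down the raw `copyFromSafe` path.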

Test plan

  • SchemaMismatchIntegrationTest — sends records with conflicting types (string vs number) in separate poll cycles, verifies task stays RUNNING after flush
  • Unit tests pass
  • Deploy to dev and verify tasks recover after restart

🤖 Generated with Claude Code

When records across separate poll cycles have different types for the
same field (e.g., string vs number), consolidateBatches would crash the
task with "The targetVector to append must have the same type". Now it
catches the exception and falls back to writing batches individually.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
EDsCODE (author) commented Mar 10, 2026

don't like this. could end up with too many writes

@EDsCODE EDsCODE marked this pull request as draft March 10, 2026 21:08
EDsCODE and others added 5 commits March 10, 2026 18:37
Two fixes:

1. castBatchToSchema used ArrowType equality for the "same type, direct
   copy" path. For complex types (Struct, List, Map), ArrowType is always
   equal (e.g., both ArrowType.Struct) even when children differ. Changed
   to Field.equals which also compares children, nullability, and metadata.
   Mismatched children now go through castVectorValues instead of a raw
   copyFromSafe that can throw or corrupt data.

2. Added missing type promotions in castVectorValues that
   ArrowSchemaMerge.unifySchemas can produce but had no cast handler:
   - Bool → Int32/Int64/Float64 (Bool is in areAllNumeric)
   - TinyInt/SmallInt → Int32 (promoteNumericTypes can return Int32)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
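The value-level semantics of the Bool → integer promotion mentioned in point 2 can be sketched as follows. This is a hypothetical illustration of what such a cast handler computes (true → 1, false → 0, null preserved), not the connector's actual `castVectorValues` implementation, which operates on Arrow vectors rather than boxed arrays:

```java
// Hedged sketch of the Bool -> Int64 promotion semantics: each boolean
// becomes 1 or 0, and null slots stay null. The method name and boxed
// array types are illustrative only.
public class BoolPromotion {
    public static Long[] castBoolToInt64(Boolean[] values) {
        Long[] out = new Long[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i] == null ? null : (values[i] ? 1L : 0L);
        }
        return out;
    }
}
```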
- Remove unused allocator parameter from consolidate()
- Remove per-run catch that could write corrupt data on partial append failure
- Narrow flushBatches catch to RuntimeException, re-throw directly
- Add consolidation failure cleanup path to close original batches
- Update javadoc with explicit ownership contract and failure semantics
- Update SchemaMismatchIntegrationTest javadoc for new approach

jghoman commented Mar 13, 2026

Rather than falling back on single batches, I've changed the code to write contiguous schema-matching batches in the most efficient way possible. This removed a lot of code, and opened up several other efficiencies. Also added a lot of tests.
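The contiguous-run grouping described here can be sketched in a few lines: walk the batch list in order and start a new run whenever the schema key changes, so arrival order is preserved and only adjacent compatible batches are merged. This is a hedged, simplified model — batches are represented as (schemaKey, payload) pairs, whereas the real `BatchConsolidator` groups Arrow roots by compatible `Field` schemas and appends each run via `VectorBatchAppender`:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of grouping batches into contiguous schema-matching
// runs. A new run starts whenever the schema key differs from the previous
// batch's key; batches with the same schema that are NOT adjacent land in
// separate runs, preserving write order.
public class RunGrouper {
    // Hypothetical stand-in for a batch: a schema identifier plus payload.
    public record Batch(String schemaKey, String payload) {}

    public static List<List<Batch>> groupContiguousRuns(List<Batch> batches) {
        List<List<Batch>> runs = new ArrayList<>();
        List<Batch> current = null;
        String currentKey = null;
        for (Batch b : batches) {
            if (current == null || !b.schemaKey().equals(currentKey)) {
                current = new ArrayList<>();
                currentKey = b.schemaKey();
                runs.add(current);
            }
            current.add(b);
        }
        return runs;
    }
}
```

For a sequence A, A, B, A this yields three runs ([A, A], [B], [A]) — each run becomes one consolidated write, so incompatible batches no longer force every batch to be written individually.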

BatchConsolidator.java

  • Removed unused BufferAllocator allocator parameter from consolidate()
  • Removed per-run try/catch that would add corrupt batches to result on partial append failure
  • Moved single-batch run check (run.size() == 1) up to the caller loop, removed redundant guard in consolidateRun()
  • Rewrote consolidate() javadoc with explicit ownership contract, failure semantics, @param/@return/@throws

DucklakeSinkTask.java (flushBatches)

  • Wrapped BatchConsolidator.consolidate() call in its own try/catch that closes all original batches on failure (prevents Arrow memory leak)
  • Narrowed write-path catch from Exception to RuntimeException
  • Changed throw new RuntimeException("...", e) to throw e (no double-wrap since writer.write() already throws RuntimeException)
  • Updated call site to match removed allocator parameter

SchemaMismatchIntegrationTest.java

  • Updated class javadoc: "falls back to writing batches individually" → "groups contiguous compatible batches and writes incompatible batches as separate runs"
  • Updated test method javadoc: references BatchConsolidator and contiguous-run grouping
  • Updated inline comment at the Thread.sleep: "falls back to individual writes" → "writes them as separate runs"

BatchConsolidationTest.java

  • Updated all 29 consolidate() call sites to remove allocator argument
  • Removed unused assertThrows import (from failed corrupted-vector test exploration)


jghoman commented Mar 13, 2026

This also removes the type promotion that had initially been in the PR.

Copilot AI left a comment

Pull request overview

This PR addresses production task failures caused by Arrow VectorBatchAppender type mismatches during batch consolidation. It introduces schema-aware consolidation that only appends batches with compatible (including nested-child) schemas, and moves consolidation logic out of DucklakeSinkTask into a dedicated helper.

Changes:

  • Add BatchConsolidator to group contiguous batches by compatible schema and consolidate each run via in-place VectorBatchAppender append.
  • Update DucklakeSinkTask.flushBatches to use BatchConsolidator and simplify the flush path by writing one batch per compatible run.
  • Add unit + integration coverage for consolidation behavior, schema-compatibility edge cases, and the previously failing mismatch scenario.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| src/main/java/com/inyo/ducklake/connect/DucklakeSinkTask.java | Switch flush-time consolidation to BatchConsolidator and adjust close/error-handling paths. |
| src/main/java/com/inyo/ducklake/connect/BatchConsolidator.java | New utility to group by compatible schema and consolidate runs in-place using VectorBatchAppender. |
| src/test/java/com/inyo/ducklake/connect/BatchConsolidationTest.java | New unit tests for consolidation correctness, schema compatibility, ordering, and memory management. |
| src/integrationTest/java/com/inyo/ducklake/connect/SchemaMismatchIntegrationTest.java | New integration test reproducing the schema mismatch scenario across poll cycles and asserting the task remains running. |
| CLAUDE.md | Update local dev environment guidance (mise → flox). |

@jghoman jghoman marked this pull request as ready for review March 13, 2026 00:46
@jghoman jghoman merged commit 8edf2ed into main Mar 13, 2026
21 checks passed