feat: AsyncInserter for concurrent inserts #410
Open
catinspace-au wants to merge 23 commits into ClickHouse:main from
Conversation
- Map 'Bool' type string to UInt8 in the column type parser (Bool is a UInt8 alias on the wire: true=1, false=0)
- Implement sparse (custom) column serialization in the data block reader: read offset groups to locate non-default values, read only those values, then reconstruct the full column with zero/empty/null defaults at the remaining positions
- Add native_bool_type integration test covering SELECT of true/false values; also exercises the sparse code path on the cluster where ClickHouse sends Bool columns with custom_ser=1
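The reconstruction step described above can be sketched in a few lines. `expand_sparse` and its argument shapes are illustrative, not the crate's actual API, assuming the varint offset groups have already been decoded into per-value gap counts:

```rust
/// Sketch of sparse-column reconstruction: `gaps[i]` is the number of
/// default rows preceding the i-th non-default value, and any defaults
/// after the last stored value are given as `trailing_defaults`.
fn expand_sparse(gaps: &[usize], values: &[u8], trailing_defaults: usize, default: u8) -> Vec<u8> {
    let mut out = Vec::with_capacity(gaps.len() + trailing_defaults);
    for (&gap, &v) in gaps.iter().zip(values.iter()) {
        out.extend(std::iter::repeat(default).take(gap)); // fill the default run
        out.push(v);                                      // then the stored value
    }
    out.extend(std::iter::repeat(default).take(trailing_defaults));
    out
}
```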
…e, Decimal, Enum, UUID, BFloat16, Point, Time

Adds tests and bug fixes for all previously untested types:
- UInt128/Int128 (u128/i128), UInt256/Int256 ([u8;32] raw LE)
- BFloat16 (u16 raw bits; 1.0 = 0x3F80, 2.0 = 0x4000)
- UUID ([u8;16], ClickHouse stores as two LE uint64s: low value at byte 8)
- Enum8/Enum16 (wire-compatible with Int8/Int16; read as i8/i16)
- Date/Date32/DateTime/DateTime64 (3, 6, 9 precision) with epoch-relative int values
- Time/Time64(3) — seconds/milliseconds since midnight
- Decimal32/Decimal128/Decimal256 (all sizes; Decimal64 was already tested)
- Point (geo) — fix: Point is Tuple(Float64,Float64) in columnar format, not a flat 16-byte blob; all x-values come before all y-values in the native stream, so we must read two separate Float64 sub-columns and interleave per-row
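Two of the trickier encodings above can be checked with small helpers (names are illustrative, not the crate's API): BFloat16 is just the top 16 bits of an f32, and the UUID wire order reverses each 8-byte half of the textual (big-endian) layout:

```rust
/// BFloat16 keeps the sign, exponent, and top 7 mantissa bits of an f32,
/// so the bit pattern is simply the high 16 bits (1.0 -> 0x3F80, 2.0 -> 0x4000).
fn f32_to_bf16_bits(x: f32) -> u16 {
    (x.to_bits() >> 16) as u16 // truncating conversion; real encoders may round
}

/// ClickHouse stores a UUID as two little-endian u64 halves, so relative to
/// the textual (big-endian) byte order each 8-byte half is reversed, and the
/// low-value half starts at byte 8.
fn uuid_to_wire(uuid: [u8; 16]) -> [u8; 16] {
    let mut out = [0u8; 16];
    for i in 0..8 {
        out[i] = uuid[7 - i];      // high half, byte-reversed
        out[8 + i] = uuid[15 - i]; // low half, byte-reversed
    }
    out
}
```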
Each NativeClient now owns a lazily-initialised bounded connection pool (default size: 10) instead of opening a fresh TCP connection per operation.

Pool model:
- tokio Semaphore caps total (idle + in-use) connections to pool_size
- Idle connections stored in Mutex<VecDeque> (FIFO — reuses recently returned connections first)
- Pool created on first acquire() call with a snapshot of the current client config; builder methods that change connection params (with_addr, with_database, with_user, with_password, with_setting, with_lz4) reset the pool so fresh connections use updated config
- clone() shares the same pool across client copies

PooledConnection guard:
- DerefMut<Target=NativeConnection> for transparent use at all call sites
- Returns connection to pool on drop
- discard() marks a connection as broken — it is closed and the semaphore slot is freed instead of queuing the conn as idle
- Cursor aborts (exception / incomplete read) call discard() to avoid returning misaligned connections to the pool
- NativeInsert::abort() discards the connection (incomplete INSERT)

New public API: NativeClient::with_pool_size(n)

Added native_pool_reuse integration test: 20 sequential pings on a pool capped to 1 connection verify that the same connection is reused.
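The guard's return-on-drop / discard-on-breakage behaviour can be sketched with std types. `Conn`, `Pool`, and `PooledConn` here are simplified stand-ins, not the crate's types — the real pool bounds capacity with a tokio Semaphore and frees a permit when a connection is discarded:

```rust
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for NativeConnection.
struct Conn { id: u32 }

// Simplified pool: just the idle queue, no capacity bound.
struct Pool { idle: Mutex<VecDeque<Conn>> }

struct PooledConn {
    pool: Arc<Pool>,
    conn: Option<Conn>, // Option so Drop can move the conn back out
    broken: bool,
}

impl Deref for PooledConn {
    type Target = Conn;
    fn deref(&self) -> &Conn { self.conn.as_ref().expect("conn taken") }
}
impl DerefMut for PooledConn {
    fn deref_mut(&mut self) -> &mut Conn { self.conn.as_mut().expect("conn taken") }
}
impl PooledConn {
    /// Mark the connection broken; Drop then closes it instead of re-queuing.
    fn discard(mut self) { self.broken = true; }
}
impl Drop for PooledConn {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            if !self.broken {
                self.pool.idle.lock().unwrap().push_back(conn); // healthy: back to idle
            }
            // broken: conn dropped here; the real pool also releases its permit
        }
    }
}
```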
…schema cache

New files:
- protocol.rs, io.rs, tcp.rs, connection.rs: low-level TCP + handshake
- block_info.rs, client_info.rs: protocol framing helpers
- writer.rs: packet serialisation (query, data block, ping, addendum)
- compression.rs: LZ4 block decompression
- error_codes.rs: server error parsing
- sparse.rs: sparse column deserialisation (Bool / custom_ser=1)
- encode.rs: RowBinary → columnar transpose for INSERT
- schema.rs: TTL-backed schema cache (NativeSchemaCache)
- inserter.rs: multi-batch NativeInserter<T> (mirrors HTTP Inserter)
- batcher.rs, tests/it/batcher.rs: HTTP batcher (separate feature)

Core changes:
- Cargo.toml: deadpool, zstd, socket2 under native-transport feature
- src/lib.rs: expose native module + public re-exports
- tests/it/main.rs: wire in native and batcher test modules
…k; edge-case tests

Pool:
- Replace hand-rolled Semaphore/Mutex pool with deadpool::managed
- NativeConnectionManager implements Manager::create/recycle
- Poisoned connections rejected in recycle(); dropped instead of recycled
- NativeClient stores NativePool directly (Arc-backed); builder methods call rebuild_pool() on config changes

Connection:
- Add poisoned field + is_poisoned(); check_alive() for recycle health check
- check_alive(): rejects if poisoned, BufReader has leftover bytes, or non-blocking poll_read returns EOF/unexpected data (noop waker)

Cursor / Query:
- NativeRowCursor: add drain() to consume packets until EndOfStream
- NativeRowCursor: Drop impl discards connection if stream not fully consumed
- fetch_one / fetch_optional: call drain() after retrieving row so the connection is returned to the pool in a clean state (fixes RowNotFound race when pool connections were reused mid-stream)

Insert:
- NativeInsert<T>: add Drop impl that calls abort() to discard connection if end() was never called
- end(): take conn out on success so Drop is a no-op after clean commit

Tests (59 integration tests, 11 sparse unit tests):
- Pool: concurrent (10 tasks / pool_size=2), error recovery, insert+ping
- Bool/sparse: stream alignment, multi-column, large gap (1000 rows), single true at start, single true at end
- Sparse unit: single-row, large gap, consecutive non-defaults, state carry
- Cursor: fetch_one on empty, fetch_optional, bind() edge cases
- INSERT: large batch, abort, sequential, bool insert
- Schema cache: miss, clear_all
Encoder (encode.rs):
- Remove strip_low_cardinality() — send original type name to server
- Implement proper LowCardinality wire encoding in write_col_values:
* version=1, HAS_ADDITIONAL_KEYS flag, dict + indices
* For LC(Nullable(T)): dict type is T (not Nullable(T)); index 0 is
the null sentinel (default T value); null inputs → index 0
* For LC(T): plain T dict; indices map rows to unique values
* Chooses smallest index type (U8/U16/U32/U64) based on dict size
Reader (columns.rs):
- Fix read_low_cardinality_column for LC(Nullable(T)):
* Dict is now read using T (not Nullable(T)) — matches ClickHouse wire
* Index 0 → RowBinary null [0x01]; other indices → [0x00, T bytes]
* Non-nullable LC unchanged
Tests:
- native_insert_low_cardinality: LowCardinality(String) INSERT + readback
- native_insert_low_cardinality_nullable: LC(Nullable(String)) with None values
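A rough sketch of the dictionary/index construction and smallest-index-type choice described above (simplified and hypothetical: this version maps a genuine empty string to the same entry as the null sentinel, which a real encoder must disambiguate):

```rust
/// Build a LowCardinality-style dictionary plus per-row indices for
/// Nullable(String) input. Index 0 is reserved for the null sentinel
/// (the inner type's default value, i.e. the empty string).
fn build_dict(rows: &[Option<&str>]) -> (Vec<String>, Vec<u64>) {
    let mut dict: Vec<String> = vec![String::new()]; // index 0 = null sentinel
    let mut indices = Vec::with_capacity(rows.len());
    for row in rows {
        match row {
            None => indices.push(0), // null input -> sentinel index
            Some(s) => {
                let idx = match dict.iter().position(|d| d == s) {
                    Some(i) => i,
                    None => {
                        dict.push((*s).to_string()); // first occurrence: new dict entry
                        dict.len() - 1
                    }
                };
                indices.push(idx as u64);
            }
        }
    }
    (dict, indices)
}

/// Smallest index width (bytes) that can address every dict entry.
fn index_width(dict_len: usize) -> u8 {
    match dict_len {
        0..=0xFF => 1,               // UInt8 indices
        0x100..=0xFFFF => 2,         // UInt16
        0x1_0000..=0xFFFF_FFFF => 4, // UInt32
        _ => 8,                      // UInt64
    }
}
```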
Ported from HyperI DFE Loader's per-table buffer + orchestrator pattern (dfe-loader/src/buffer/). The key improvement is embedding the batching policy in the library rather than requiring each consumer to re-implement the orchestrator select! loop.

Architecture: bounded MPSC channel + background tokio task with select! over command recv and periodic timer tick. Provides concurrent writes, backpressure, and automatic flush on row/byte/period limits.

HTTP transport:
- AsyncInserter<T> + AsyncInserterConfig + AsyncInserterHandle<T>
- feature = "async-inserter" (depends on "inserter" + tokio time/sync)

Native TCP transport:
- AsyncNativeInserter<T> + AsyncNativeInserterConfig + AsyncNativeInserterHandle<T>
- Same MPSC pattern, uses NativeInserter<T> internally

TableBatcher<T> reworked as thin wrapper over AsyncInserter<T>:
- append() now takes owned T (was &T::Value<'_>)
- Removes Arc<Mutex<Inserter>> in favour of channel-based concurrency

Integration tests: happy path, edge cases (single row, empty flush, double flush, max_rows=1, large strings, tiny channel, max_bytes trigger), failure cases (write/flush after end, bad table), and stress/concurrency tests (20 concurrent writers × 50 rows, interleaved flush).
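The select!-over-recv-and-timer orchestrator can be emulated synchronously with a std channel and `recv_timeout`. This sketch uses hypothetical names and only the row-count and period triggers, returning the flushed batch sizes instead of writing to ClickHouse:

```rust
use std::sync::mpsc;
use std::time::{Duration, Instant};

// Commands a producer handle would send to the background task.
enum Cmd { Write(String), Flush, End }

/// Minimal flush loop: flush when max_rows is reached, the period elapses,
/// an explicit Flush arrives, or the producer ends. (The real AsyncInserter
/// uses a bounded tokio MPSC channel and `select!` over recv + timer tick.)
fn run_flush_loop(rx: mpsc::Receiver<Cmd>, max_rows: usize, period: Duration) -> Vec<usize> {
    let mut buf: Vec<String> = Vec::new();
    let mut flushed = Vec::new(); // sizes of each flushed batch
    let mut deadline = Instant::now() + period;
    loop {
        let timeout = deadline.saturating_duration_since(Instant::now());
        match rx.recv_timeout(timeout) {
            Ok(Cmd::Write(row)) => {
                buf.push(row);
                if buf.len() >= max_rows {
                    flushed.push(buf.len()); // row-count trigger
                    buf.clear();
                    deadline = Instant::now() + period;
                }
            }
            Ok(Cmd::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => {
                if !buf.is_empty() {
                    flushed.push(buf.len()); // explicit or periodic flush
                    buf.clear();
                }
                deadline = Instant::now() + period;
            }
            Ok(Cmd::End) | Err(mpsc::RecvTimeoutError::Disconnected) => {
                if !buf.is_empty() {
                    flushed.push(buf.len()); // final flush on shutdown
                }
                return flushed;
            }
        }
    }
}
```

The same shape carries over to the async version: each `Ok(...)` arm becomes a `select!` branch on the channel, and the timeout arm becomes a branch on `tokio::time::interval`.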
Realistic, large, deeply nested JSON blobs matching Elastic Beat agent output in production. Exercises full insert round-trip for both HTTP and native TCP transports.

Payloads:
- Filebeat nginx access: Unicode URL params (CJK, French, German), nested headers, JWT cookie, geo coords, null fields
- Winlogbeat Security 4625: Cyrillic usernames/workstation names, Windows SIDs, embedded \n\t in multiline message
- Filebeat Java stack trace: Windows backslash paths, 3 chained exceptions, Spring CGLIB frames, diacritics in user_id
- Winlogbeat PowerShell 4104: ScriptBlockText with heavy diacritics, 4-level deep nesting, escaped quotes, Base64 encoding
- Filebeat Kubernetes: JSON-in-JSON message, CJK payment error, Greek symbols, Yen currency, Go goroutine stack trace
- Mixed beats concurrent: all 5 sources from 5 concurrent handles, max_rows=15 forcing mid-batch flushes, per-source count verification
…s, and wire format

Create docs/ directory with topic-focused guides:
- native-transport.md: connect, query, insert over TCP
- connection-pooling.md: deadpool pool, health checks, discard pattern
- batching.md: AsyncInserter, AsyncNativeInserter, TableBatcher hierarchy
- types.md: full type coverage matrix across HTTP and native transports
- wire-format.md: LowCardinality, Dynamic, Variant, Array encoding internals
- migration.md: HTTP vs native trade-offs, switching guide, upstream differences

Update root README.md with native transport section and links to docs.
Replace all text-art diagrams with ```mermaid blocks:
- batching.md: inserter hierarchy + MPSC architecture
- connection-pooling.md: cursor drain flow + pool architecture
- native-transport.md: connection lifecycle
- migration.md: branch structure
Update STATE.md branch strategy table with current 5-branch structure. Update migration.md Mermaid diagram with hyperi/ prefix.
Rich ClickHouse type parser lifted from HyperI dfe-loader. Handles Nullable, LowCardinality, Array, Map, DateTime64(p, tz), Decimal(p, s), FixedString(n), Enum. 15 unit tests. DynamicError for schema mismatch, encoding, and fetch errors.
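A minimal illustration of the recursive wrapper parsing (greatly simplified and hypothetical: it only peels single-argument wrappers and would not handle Map, Tuple, Enum values, or parameterised types like DateTime64(p, tz)):

```rust
#[derive(Debug, PartialEq)]
enum ParsedType {
    Plain(String),
    Nullable(Box<ParsedType>),
    LowCardinality(Box<ParsedType>),
    Array(Box<ParsedType>),
}

/// Recursively peel wrapper types off a ClickHouse type string.
fn parse_type(s: &str) -> ParsedType {
    let s = s.trim();
    if let Some(inner) = s.strip_prefix("Nullable(").and_then(|r| r.strip_suffix(')')) {
        ParsedType::Nullable(Box::new(parse_type(inner)))
    } else if let Some(inner) = s.strip_prefix("LowCardinality(").and_then(|r| r.strip_suffix(')')) {
        ParsedType::LowCardinality(Box::new(parse_type(inner)))
    } else if let Some(inner) = s.strip_prefix("Array(").and_then(|r| r.strip_suffix(')')) {
        ParsedType::Array(Box::new(parse_type(inner)))
    } else {
        ParsedType::Plain(s.to_string()) // leaf: scalar type name
    }
}
```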
ColumnDef with ParsedType, DynamicSchemaCache with TTL + invalidation, fetch_dynamic_schema() queries system.columns. Uses positional tuple fetch to avoid derive(Row) macro issues inside the crate. 5 unit tests.
Encodes Map<String, Value> to RowBinary using DynamicSchema. Supports all scalar types, Nullable, FixedString, UUID, IPv4/IPv6, Array, Map, JSON. Type-appropriate defaults for missing columns. Unknown types fall back to String encoding for forward compatibility. 14 unit tests. End-to-end CPU perspective documented: schema-reflected RowBinary shifts parsing cost from the ClickHouse cluster to the client, where the work (binary encoding vs JSON serialisation) is roughly equivalent but the server does zero parsing on ingest.
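The underlying RowBinary primitives are small. A sketch of the varint length prefix and the String and Nullable(String) encodings (function names are illustrative, not the crate's API):

```rust
/// LEB128 varint: 7 payload bits per byte, high bit = continuation.
fn write_varint(mut v: u64, out: &mut Vec<u8>) {
    loop {
        let byte = (v & 0x7F) as u8;
        v >>= 7;
        if v == 0 {
            out.push(byte); // last byte: continuation bit clear
            break;
        }
        out.push(byte | 0x80);
    }
}

/// RowBinary String: varint byte length, then the raw UTF-8 bytes.
fn encode_string(s: &str, out: &mut Vec<u8>) {
    write_varint(s.len() as u64, out);
    out.extend_from_slice(s.as_bytes());
}

/// Nullable(T) in RowBinary: 0x01 for NULL, else 0x00 followed by T's encoding.
fn encode_nullable_string(s: Option<&str>, out: &mut Vec<u8>) {
    match s {
        None => out.push(0x01),
        Some(s) => {
            out.push(0x00);
            encode_string(s, out);
        }
    }
}
```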
Client::dynamic_insert() creates a DynamicInsert that fetches schema from system.columns, encodes Map<String, Value> to RowBinary, and handles schema mismatch with automatic cache invalidation. Adds serde_json as a runtime dependency (was dev-only). Adds dynamic_schema_cache field to Client (shared via Arc).
MPSC bounded channel, background flush task, row count + time thresholds. Schema recovery on mismatch: invalidate cache, re-fetch, retry once. DynamicBatcherHandle for multi-producer concurrent writes. Client::dynamic_batcher() convenience method.
Tests: simple types, nullable columns, default column skipping, batcher end-flush. Requires running ClickHouse instance.
- fxhash (RUSTSEC-2025-0057, unmaintained) -> rustc-hash 2.x
- linked-hash-map (no releases since 2020) -> indexmap (already a dep)
- cargo update for semver-compatible bumps
…436)

- Drop polonius-the-crab and 3 transitive deps (paste, higher-kinded-types, macro_rules_attribute) — clears RUSTSEC-2024-0436 (paste unmaintained)
- Inline the raw-pointer reborrow that polonius wrapped behind a macro
- Fix rustfmt.toml edition 2021 -> 2024 to match Cargo.toml
4 mock-based tests (no ClickHouse needed) + 3 integration tests covering the unsafe reborrow paths in poll_next and Next::poll:
- single row, multi-row, empty result, fetch_all/fetch_one (mock)
- large result spanning chunks, borrowed rows, small block size (integration)
force-pushed from 2146491 to 5a571ff
Collaborator: wonderful
Dependency: merge #409 (lc-insert) first.
AsyncInserter wraps Inserter with MPSC channel + background
flush task. Multiple tokio tasks can write concurrently via handles.
Works on both HTTP and native. Includes shared Quantities and Ticks
types (deduplicated from the HTTP and native inserter paths).