Skip to content

feat: Deadpool connection pool for native transport#408

Open
catinspace-au wants to merge 5 commits intoClickHouse:mainfrom
hyperi-io:hyperi/connection-pooling
Open

feat: Deadpool connection pool for native transport#408
catinspace-au wants to merge 5 commits intoClickHouse:mainfrom
hyperi-io:hyperi/connection-pooling

Conversation

@catinspace-au
Copy link
Copy Markdown
Contributor

Dependency: merge #407 (native-transport) first.

Connection pooling with health checks and cursor drain.

  • Configurable pool size (default 10)
  • check_alive() before returning to idle pool
  • Cursor drain on partial read (prevents protocol misalignment)
  • Connection poisoning on error

- Map 'Bool' type string to UInt8 in the column type parser (Bool is
  a UInt8 alias on the wire: true=1, false=0)
- Implement sparse (custom) column serialization in the data block
  reader: read offset groups to locate non-default values, read only
  those values, then reconstruct the full column with zero/empty/null
  defaults at the remaining positions
- Add native_bool_type integration test covering SELECT of true/false
  values; also exercises the sparse code path on the cluster where
  ClickHouse sends Bool columns with custom_ser=1
…e, Decimal, Enum, UUID, BFloat16, Point, Time

Adds tests and bug fixes for all previously untested types:

- UInt128/Int128 (u128/i128), UInt256/Int256 ([u8;32] raw LE)
- BFloat16 (u16 raw bits; 1.0 = 0x3F80, 2.0 = 0x4000)
- UUID ([u8;16], ClickHouse stores as two LE uint64s: low-value at byte 8)
- Enum8/Enum16 (wire-compatible with Int8/Int16; read as i8/i16)
- Date/Date32/DateTime/DateTime64 (3, 6, 9 precision) with epoch-relative int values
- Time/Time64(3) — seconds/milliseconds since midnight
- Decimal32/Decimal128/Decimal256 (all sizes; Decimal64 was already tested)
- Point (geo) — fix: Point is Tuple(Float64,Float64) in columnar format,
  not a flat 16-byte blob; all x-values come before all y-values in the
  native stream, so we must read two separate Float64 sub-columns and
  interleave per-row
Each NativeClient now owns a lazily-initialised bounded connection pool
(default size: 10) instead of opening a fresh TCP connection per operation.

Pool model:
- tokio Semaphore caps total (idle + in-use) connections to pool_size
- Idle connections stored in Mutex<VecDeque> (FIFO — reuses recently
  returned connections first)
- Pool created on first acquire() call with a snapshot of the current
  client config; builder methods that change connection params
  (with_addr, with_database, with_user, with_password, with_setting,
  with_lz4) reset the pool so fresh connections use updated config
- clone() shares the same pool across client copies

PooledConnection guard:
- DerefMut<Target=NativeConnection> for transparent use at all call sites
- Returns connection to pool on drop
- discard() marks a connection as broken — it is closed and the
  semaphore slot is freed instead of queuing the conn as idle
- Cursor aborts (exception / incomplete read) call discard() to avoid
  returning misaligned connections to the pool
- NativeInsert::abort() discards the connection (incomplete INSERT)

New public API: NativeClient::with_pool_size(n)

Added native_pool_reuse integration test: 20 sequential pings on a
pool capped to 1 connection verify that the same connection is reused.
…schema cache

New files:
- protocol.rs, io.rs, tcp.rs, connection.rs: low-level TCP + handshake
- block_info.rs, client_info.rs: protocol framing helpers
- writer.rs: packet serialisation (query, data block, ping, addendum)
- compression.rs: LZ4 block decompression
- error_codes.rs: server error parsing
- sparse.rs: sparse column deserialisation (Bool / custom_ser=1)
- encode.rs: RowBinary → columnar transpose for INSERT
- schema.rs: TTL-backed schema cache (NativeSchemaCache)
- inserter.rs: multi-batch NativeInserter<T> (mirrors HTTP Inserter)
- batcher.rs, tests/it/batcher.rs: HTTP batcher (separate feature)

Core changes:
- Cargo.toml: deadpool, zstd, socket2 under native-transport feature
- src/lib.rs: expose native module + public re-exports
- tests/it/main.rs: wire in native and batcher test modules
…k; edge-case tests

Pool:
- Replace hand-rolled Semaphore/Mutex pool with deadpool::managed
- NativeConnectionManager implements Manager::create/recycle
- Poisoned connections rejected in recycle(); dropped instead of recycled
- NativeClient stores NativePool directly (Arc-backed); builder methods
  call rebuild_pool() on config changes

Connection:
- Add poisoned field + is_poisoned(); check_alive() for recycle health check
- check_alive(): rejects if poisoned, BufReader has leftover bytes, or
  non-blocking poll_read returns EOF/unexpected data (noop waker)

Cursor / Query:
- NativeRowCursor: add drain() to consume packets until EndOfStream
- NativeRowCursor: Drop impl discards connection if stream not fully consumed
- fetch_one / fetch_optional: call drain() after retrieving row so the
  connection is returned to the pool in a clean state (fixes RowNotFound
  race when pool connections were reused mid-stream)

Insert:
- NativeInsert<T>: add Drop impl that calls abort() to discard connection
  if end() was never called
- end(): take conn out on success so Drop is a no-op after clean commit

Tests (59 integration tests, 11 sparse unit tests):
- Pool: concurrent (10 tasks / pool_size=2), error recovery, insert+ping
- Bool/sparse: stream alignment, multi-column, large gap (1000 rows),
  single true at start, single true at end
- Sparse unit: single-row, large gap, consecutive non-defaults, state carry
- Cursor: fetch_one on empty, fetch_optional, bind() edge cases
- INSERT: large batch, abort, sequential, bool insert
- Schema cache: miss, clear_all
@CLAassistant
Copy link
Copy Markdown

CLAassistant commented Apr 1, 2026

CLA assistant check
All committers have signed the CLA.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants