Skip to content

Commit 8127aaf

Browse files
committed
feat: Implement append method for ORM background batching, enhance relational query filtering with relWhere, and refactor SQL expression imports.
1 parent 873a24e commit 8127aaf

File tree

5 files changed

+87
-18
lines changed

5 files changed

+87
-18
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ The high-performance core.
1515
- **Turbo Mode**: Native `RowBinary` serialization for 5-10x faster inserts than JSON.
1616
- **Type-Safe DSL**: Fully typed query builder and schema definition.
1717
- **Relational API**: Optimized one-to-many and one-to-one fetching using ClickHouse's `groupArray`.
18-
- **Background Batching**: Built-in client-side buffering for high-throughput writes.
18+
- **Background Batching**: Use `.batch()` and `.append(row)` for ultra-low latency, high-throughput writes.
1919

2020
### [2. housekit (CLI)](./packages/kit)
2121
The schema management and migration tool.

packages/kit/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"README.md"
4444
],
4545
"scripts": {
46-
"build": "bun build ./src/index.ts --outdir ./dist --target node --external \"*\" && tsc --project tsconfig.build.json && mv dist/packages/kit/src/* dist/ && rm -rf dist/packages",
46+
"build": "bun build ./src/index.ts --outdir ./dist --target node --external \"*\" && tsc --project tsconfig.build.json && cp -r dist/packages/kit/src/* dist/ && rm -rf dist/packages",
4747
"clean": "rm -rf dist"
4848
},
4949
"dependencies": {

packages/orm/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,10 @@ const builder = db.insert(webEvents)
152152
flushIntervalMs: 5000
153153
});
154154

155-
// These calls return immediately, flushing happens in the background
156-
builder.values(row1).execute();
157-
builder.values(row2).execute();
155+
// Add rows to the background queue.
156+
// Proccessing and flushing happen automatically.
157+
await builder.append(row1);
158+
await builder.append(row2);
158159
```
159160

160161
---

packages/orm/src/builders/insert.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,37 @@ export class ClickHouseInsertBuilder<TTable extends TableDefinition<TableColumns
177177
return this;
178178
}
179179

180+
/**
181+
* Add a row to the background batcher.
182+
*
183+
* If batching is not yet configured, it will use default settings
184+
* (10,000 rows or 5 seconds).
185+
*
186+
* Note: This method is "fire-and-forget" and does not wait for
187+
* the database to acknowledge the insert.
188+
*/
189+
async append(row: TableInsert<TTable['$columns']>) {
190+
if (!this._batchConfig) {
191+
this.batch();
192+
}
193+
194+
const plan = buildInsertPlan(this.table);
195+
const batcher = globalBatcher(this.client);
196+
197+
// We temporarily set _values to this single row to reuse processRows logic
198+
const oldValues = this._values;
199+
this._values = [row];
200+
201+
try {
202+
const rowIterator = this.processRows(plan);
203+
for await (const processedRow of rowIterator) {
204+
batcher.add(this.table, processedRow, this._batchConfig!);
205+
}
206+
} finally {
207+
this._values = oldValues;
208+
}
209+
}
210+
180211
/**
181212
* Force JSON format (useful for debugging or compatibility).
182213
*

packages/orm/src/relational.ts

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,34 @@
11
import { ClickHouseQueryBuilder } from './builders/select';
22
import { SQL } from './expressions';
3-
import { sql } from './expressions';
4-
import { and } from './modules/conditional';
3+
import * as ops from './expressions';
4+
import * as cond from './modules/conditional';
55
import { ClickHouseColumn, type RelationDefinition, type TableDefinition } from './core';
66
import type { SQLExpression } from './expressions';
77

8+
const operators = {
9+
eq: ops.eq,
10+
ne: ops.ne,
11+
gt: ops.gt,
12+
gte: ops.gte,
13+
lt: ops.lt,
14+
lte: ops.lte,
15+
inArray: ops.inArray,
16+
notInArray: ops.notInArray,
17+
between: ops.between,
18+
notBetween: ops.notBetween,
19+
has: ops.has,
20+
hasAll: ops.hasAll,
21+
hasAny: ops.hasAny,
22+
sql: ops.sql,
23+
and: cond.and,
24+
or: cond.or,
25+
not: cond.not,
26+
isNull: cond.isNull,
27+
isNotNull: cond.isNotNull,
28+
};
29+
30+
type Operators = typeof operators;
31+
832
/**
933
* Join strategy for relational queries
1034
*/
@@ -36,11 +60,11 @@ export type RelationalFindOptions<TTable = any> = {
3660

3761
function buildJoinCondition(fields: ClickHouseColumn[] | undefined, references: ClickHouseColumn[] | undefined): SQLExpression | null {
3862
if (!fields || !references || fields.length === 0 || references.length === 0) return null;
39-
const pairs = fields.map((f, i) => sql`${f} = ${references[i]}`);
63+
const pairs = fields.map((f, i) => ops.sql`${f} = ${references[i]}`);
4064
const filtered = pairs.filter((p): p is SQL => Boolean(p));
4165
if (filtered.length === 0) return null;
4266
if (filtered.length === 1) return filtered[0];
43-
const combined = and(...filtered);
67+
const combined = cond.and(...filtered);
4468
return combined || null;
4569
}
4670

@@ -73,7 +97,7 @@ export function buildRelationalAPI(client: any, schema?: Record<string, TableDef
7397
const selection: Record<string, any> = {};
7498
baseColumns.forEach(([key, col]) => { selection[key] = col; });
7599

76-
const relations = (tableDef as any).$relations as Record<string, RelationDefinition> | undefined; // Re-introduced
100+
const relations = (tableDef as any).$relations as Record<string, RelationDefinition> | undefined;
77101

78102
const requestedTopLevel = opts?.with ? Object.entries(opts.with).filter(([, v]) => v) : [];
79103

@@ -129,8 +153,14 @@ export function buildRelationalAPI(client: any, schema?: Record<string, TableDef
129153
for (const { relName, rel, options } of requestedNested) {
130154
const newPrefix = prefix ? `${prefix}_${relName}_` : `${relName}_`;
131155

156+
// Resolve relation filter (where)
157+
const relWhere = options.where
158+
? (typeof options.where === 'function' ? options.where(rel.table.$columns) : options.where)
159+
: null;
160+
132161
// Add join for this relation
133-
const joinCondition = buildJoinCondition(rel.fields, rel.references);
162+
let joinCondition = buildJoinCondition(rel.fields, rel.references);
163+
134164
if (joinCondition) {
135165
const joinType = (() => {
136166
const relIsDistributed = isDistributedTable(rel.table);
@@ -149,17 +179,25 @@ export function buildRelationalAPI(client: any, schema?: Record<string, TableDef
149179
if (rel.relation === 'many' && !prefix) {
150180
// For 'many' relations at top level, we add a groupArray(tuple(...)) selection
151181
const relCols = Object.entries(rel.table.$columns);
152-
const tupleArgs = relCols.map(([, col]) => sql`${col}`);
153-
currentSelection[relName] = sql`groupArray(tuple(${sql.join(tupleArgs, sql`, `)}))`;
182+
const tupleArgs = relCols.map(([, col]) => ops.sql`${col}`);
183+
184+
if (relWhere) {
185+
// Use groupArrayIf for filtering
186+
currentSelection[relName] = ops.sql`groupArrayIf(tuple(${ops.sql.join(tupleArgs, ops.sql`, `)}), ${relWhere})`;
187+
} else {
188+
currentSelection[relName] = ops.sql`groupArray(tuple(${ops.sql.join(tupleArgs, ops.sql`, `)}))`;
189+
}
190+
} else {
191+
// If it's a 'one' relation or nested 'many', we might need to add filter to join condition
192+
if (relWhere && rel.relation === 'one') {
193+
joinCondition = cond.and(joinCondition, relWhere) as SQLExpression;
194+
}
154195
}
155196

156197
currentJoins.push({ type: joinType, table: rel.table, on: joinCondition });
157198
}
158199

159200
// Recursively build selection for nested relations
160-
// If it's a 'one' relation, we continue flat selection
161-
// If it's a 'many' relation and we already optimized it via groupArray,
162-
// we don't need to add its columns to the flat selection.
163201
if (rel.relation === 'one') {
164202
const nestedResult = buildNestedSelection(rel.table, options.with, newPrefix, outerJoinStrategy, outerUseGlobal, outerUseAny, isInsideMany);
165203
Object.assign(currentSelection, nestedResult.selection);
@@ -304,5 +342,4 @@ export function buildRelationalAPI(client: any, schema?: Record<string, TableDef
304342
}
305343

306344
return api;
307-
}
308-
345+
}

0 commit comments

Comments
 (0)