Skip to content

Commit 2c5b7d1

Browse files
author
Georg Traar
committed
- Extract SQL Generation
- Adjust Cursor function names to camel case to align with rest - Update create table method
1 parent f186a22 commit 2c5b7d1

File tree

7 files changed

+366
-86
lines changed

7 files changed

+366
-86
lines changed

src/CrateDBClient.ts

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import {
1111
CrateDBBulkResponse,
1212
CrateDBRecord,
1313
CrateDBErrorResponse,
14+
OptimizeOptions,
15+
ColumnDefinition,
16+
TableOptions,
1417
} from './interfaces';
1518
import { CrateDBError, DeserializationError, RequestError } from './utils/Error.js';
19+
import { StatementGenerator } from './StatementGenerator.js';
1620

1721
// Configuration options with CrateDB-specific environment variables
1822
const defaultConfig: CrateDBConfig = {
@@ -162,23 +166,6 @@ export class CrateDBClient {
162166
}
163167
}
164168

165-
// Convenience methods for common SQL operations
166-
private _generateInsertQuery(tableName: string, keys: string[], primaryKeys: string[] | null): string {
167-
const placeholders = keys.map(() => '?').join(', ');
168-
let query = `INSERT INTO ${tableName} (${keys.map((key) => `"${key}"`).join(', ')}) VALUES (${placeholders})`;
169-
170-
if (primaryKeys && primaryKeys.length > 0) {
171-
const keysWithoutPrimary = keys.filter((key) => !primaryKeys.includes(key));
172-
const updates = keysWithoutPrimary.map((key) => `"${key}" = excluded."${key}"`).join(', ');
173-
query += ` ON CONFLICT (${primaryKeys.map((key) => `"${key}"`).join(', ')}) DO UPDATE SET ${updates}`;
174-
} else {
175-
query += ' ON CONFLICT DO NOTHING';
176-
}
177-
178-
query += ';'; // Ensure the query ends with a semicolon
179-
return query;
180-
}
181-
182169
async insert(
183170
tableName: string,
184171
obj: Record<string, unknown>,
@@ -196,7 +183,7 @@ export class CrateDBClient {
196183
}
197184

198185
const keys = Object.keys(obj);
199-
const query = this._generateInsertQuery(tableName, keys, primaryKeys);
186+
const query = StatementGenerator.insert(tableName, keys, primaryKeys);
200187
const args = Object.values(obj);
201188

202189
// Execute the query
@@ -233,7 +220,7 @@ export class CrateDBClient {
233220
uniqueKeys.map((key) => (Object.prototype.hasOwnProperty.call(obj, key) ? obj[key] : null))
234221
);
235222

236-
const query = this._generateInsertQuery(tableName, uniqueKeys, primaryKeys);
223+
const query = StatementGenerator.insert(tableName, uniqueKeys, primaryKeys);
237224

238225
// Execute the query with bulk arguments
239226
const response = await this.executeMany(query, bulkArgs);
@@ -245,32 +232,69 @@ export class CrateDBClient {
245232

246233
async update(tableName: string, options: Record<string, unknown>, whereClause: string): Promise<CrateDBResponse> {
247234
const { keys, values, args } = this._prepareOptions(options);
248-
const setClause = keys.map((key, i) => `${key}=${values[i]}`).join(', ');
249-
const query = `UPDATE ${tableName} SET ${setClause} WHERE ${whereClause}`;
235+
const query = StatementGenerator.update(tableName, options, whereClause);
250236
return this.execute(query, args);
251237
}
252238

253239
async delete(tableName: string, whereClause: string): Promise<CrateDBResponse> {
254-
const query = `DELETE FROM ${tableName} WHERE ${whereClause}`;
240+
const query = StatementGenerator.delete(tableName, whereClause);
255241
return this.execute(query);
256242
}
257243

244+
/**
245+
* Drops a table if it exists in CrateDB.
246+
*
247+
* Constructs and executes a `DROP TABLE IF EXISTS` SQL statement.
248+
*
249+
* @param {string} tableName - The name of the table to drop.
250+
* @returns {Promise<CrateDBResponse>} A promise resolving to the response from CrateDB.
251+
*/
258252
async drop(tableName: string): Promise<CrateDBResponse> {
259-
const query = `DROP TABLE IF EXISTS ${tableName}`;
253+
const query = StatementGenerator.dropTable(tableName);
260254
return this.execute(query);
261255
}
262256

257+
async createTable(
258+
tableName: string,
259+
schema: Record<string, ColumnDefinition>,
260+
options?: TableOptions
261+
): Promise<CrateDBResponse> {
262+
const query = StatementGenerator.createTable(tableName, schema, options);
263+
return this.execute(query);
264+
}
265+
266+
/**
267+
* Refreshes a given table by refreshing it in CrateDB.
268+
*
269+
* The `REFRESH TABLE` command makes recently committed changes available for querying
270+
* without waiting for automatic refresh intervals.
271+
*
272+
* @param {string} tableName - The name of the table to refresh.
273+
* @returns {Promise<CrateDBResponse>} A promise resolving to the response from CrateDB.
274+
*/
263275
async refresh(tableName: string): Promise<CrateDBResponse> {
264-
const query = `REFRESH TABLE ${tableName}`;
276+
const query = StatementGenerator.refresh(tableName);
265277
return this.execute(query);
266278
}
267279

268-
async createTable(schema: Record<string, Record<string, string>>): Promise<CrateDBResponse> {
269-
const tableName = Object.keys(schema)[0];
270-
const columns = Object.entries(schema[tableName])
271-
.map(([col, type]) => `"${col}" ${type}`)
272-
.join(', ');
273-
const query = `CREATE TABLE ${tableName} (${columns})`;
280+
/**
281+
* Optimizes a given table or specific partitions in CrateDB by merging table segments.
282+
*
283+
* The `OPTIMIZE TABLE` command reduces the number of segments in a table, improving
284+
* query performance and reducing storage overhead. It supports optimizing the entire table
285+
* or specific partitions and allows additional optimization parameters.
286+
*
287+
* @param {string} tableName - The name of the table to optimize.
288+
* @param {OptimizeOptions} [options] - Optional parameters for table optimization.
289+
* @param {Record<string, string | number>} [partitions] - Optional key-value pairs specifying partition columns and values.
290+
* @returns {Promise<CrateDBResponse>} A promise resolving to the response from CrateDB.
291+
*/
292+
async optimize(
293+
tableName: string,
294+
options?: OptimizeOptions,
295+
partitions?: Record<string, string | number>
296+
): Promise<CrateDBResponse> {
297+
const query = StatementGenerator.optimize(tableName, options, partitions);
274298
return this.execute(query);
275299
}
276300

src/Cursor.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,13 @@ export class Cursor {
4040
this.isOpen = true;
4141
}
4242

43-
async fetchone(): Promise<CrateDBRecord | null> {
43+
async fetchOne(): Promise<CrateDBRecord | null> {
4444
this._ensureOpen();
4545
const result = await this._execute(`FETCH NEXT FROM ${this.cursorName}`);
4646
return result.length > 0 ? result[0] : null; // Return the first row or null
4747
}
4848

49-
async fetchmany(size = 10): Promise<Array<CrateDBRecord>> {
49+
async fetchMany(size = 10): Promise<Array<CrateDBRecord>> {
5050
if (size < 1) {
5151
// Return an empty array if size is less than 1
5252
return [];
@@ -55,7 +55,7 @@ export class Cursor {
5555
return await this._execute(`FETCH ${size} FROM ${this.cursorName}`);
5656
}
5757

58-
async fetchall(): Promise<Array<CrateDBRecord>> {
58+
async fetchAll(): Promise<Array<CrateDBRecord>> {
5959
this._ensureOpen();
6060
return await this._execute(`FETCH ALL FROM ${this.cursorName}`);
6161
}
@@ -64,7 +64,7 @@ export class Cursor {
6464
this._ensureOpen();
6565

6666
while (true) {
67-
const rows = await this.fetchmany(size);
67+
const rows = await this.fetchMany(size);
6868

6969
if (!rows || rows.length === 0) {
7070
break; // Stop iteration when no more rows are returned

src/StatementGenerator.ts

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { OptimizeOptions, ColumnDefinition, TableOptions } from './interfaces';
2+
import { Serializer } from './Serializer';
3+
4+
export class StatementGenerator {
5+
public static createTable(
6+
tableName: string,
7+
schema: Record<string, ColumnDefinition>,
8+
options?: TableOptions
9+
): string {
10+
// Build column definitions
11+
const columns = Object.entries(schema)
12+
.map(([col, definition]) => {
13+
let colDef = `"${col}" ${definition.type.toUpperCase()}`;
14+
if (definition.notNull) {
15+
colDef += ' NOT NULL';
16+
}
17+
if (definition.defaultValue !== undefined) {
18+
colDef += ` DEFAULT ${definition.defaultValue}`;
19+
}
20+
return colDef;
21+
})
22+
.join(', ');
23+
24+
// Build primary key clause if any
25+
const primaryKeys = Object.keys(schema)
26+
.filter((col) => schema[col].primaryKey)
27+
.map((col) => `"${col}"`)
28+
.join(', ');
29+
const primaryKeyClause = primaryKeys ? `, PRIMARY KEY(${primaryKeys})` : '';
30+
31+
// Start query construction
32+
let query = `CREATE TABLE ${this.quoteIdentifier(tableName)} (${columns}${primaryKeyClause})`;
33+
34+
// Partition clause
35+
if (options?.partitionedBy?.length) {
36+
const partitionCols = options.partitionedBy.map((col) => `"${col}"`).join(', ');
37+
query += ` PARTITIONED BY (${partitionCols})`;
38+
}
39+
40+
// Cluster clause
41+
if (options?.clusteredBy || options?.numberOfShards) {
42+
if (options.clusteredBy && options.numberOfShards) {
43+
query += ` CLUSTERED BY ("${options.clusteredBy}") INTO ${options.numberOfShards} SHARDS`;
44+
} else if (options.clusteredBy) {
45+
query += ` CLUSTERED BY ("${options.clusteredBy}")`;
46+
} else if (options.numberOfShards) {
47+
query += ` CLUSTERED INTO ${options.numberOfShards} SHARDS`;
48+
}
49+
}
50+
51+
// Replicas clause
52+
if (options?.numberOfReplicas !== undefined) {
53+
query += ` WITH (number_of_replicas = '${options.numberOfReplicas}')`;
54+
}
55+
56+
return query + ';';
57+
}
58+
59+
public static dropTable(tableName: string): string {
60+
return `DROP TABLE ${this.quoteIdentifier(tableName)};`;
61+
}
62+
63+
public static delete(tableName: string, whereClause?: string): string {
64+
return `DELETE FROM ${this.quoteIdentifier(tableName)} WHERE ${whereClause};`;
65+
}
66+
67+
public static update(tableName: string, options: Record<string, unknown>, whereClause: string): string {
68+
const { keys, values } = this._prepareOptions(options);
69+
const setClause = keys.map((key, i) => `${key}=${values[i]}`).join(', ');
70+
return `UPDATE ${this.quoteIdentifier(tableName)} SET ${setClause} WHERE ${whereClause};`;
71+
}
72+
73+
public static insert(tableName: string, keys: string[], primaryKeys: string[] | null): string {
74+
const placeholders = keys.map(() => '?').join(', ');
75+
const columns = keys.map((key) => `"${key}"`).join(', ');
76+
let query = `INSERT INTO ${this.quoteIdentifier(tableName)} (${columns}) VALUES (${placeholders})`;
77+
78+
if (primaryKeys && primaryKeys.length > 0) {
79+
const nonPrimaryKeys = keys.filter((key) => !primaryKeys.includes(key));
80+
const updates = nonPrimaryKeys.map((key) => `"${key}" = excluded."${key}"`).join(', ');
81+
const pkClause = primaryKeys.map((key) => `"${key}"`).join(', ');
82+
query += ` ON CONFLICT (${pkClause}) DO UPDATE SET ${updates}`;
83+
} else {
84+
query += ' ON CONFLICT DO NOTHING';
85+
}
86+
87+
return query + ';';
88+
}
89+
90+
public static refresh(tableName: string): string {
91+
return `REFRESH TABLE ${this.quoteIdentifier(tableName)};`;
92+
}
93+
94+
public static optimize(
95+
tableName: string,
96+
options?: OptimizeOptions,
97+
partitions?: Record<string, string | number>
98+
): string {
99+
let query = `OPTIMIZE TABLE ${this.quoteIdentifier(tableName)}`;
100+
101+
// Build options clause
102+
if (options && Object.keys(options).length > 0) {
103+
const optionsClauses = Object.entries(options)
104+
.map(([key, value]) => `${key}=${Serializer.serialize(value)}`)
105+
.join(', ');
106+
query += ` WITH (${optionsClauses})`;
107+
}
108+
109+
// Build partitions clause
110+
if (partitions && Object.keys(partitions).length > 0) {
111+
const partitionClauses = Object.entries(partitions)
112+
.map(([key, value]) => {
113+
const val = typeof value === 'string' ? `'${value}'` : value;
114+
return `${key}=${val}`;
115+
})
116+
.join(', ');
117+
query += ` PARTITION (${partitionClauses})`;
118+
}
119+
120+
return query + ';';
121+
}
122+
123+
private static quoteIdentifier(tableName: string): string {
124+
return tableName
125+
.split('.')
126+
.map((part) => `"${part}"`)
127+
.join('.');
128+
}
129+
130+
private static _prepareOptions(options: Record<string, unknown>): {
131+
keys: string[];
132+
values: string[];
133+
args: unknown[];
134+
} {
135+
const keys = Object.keys(options).map((key) => `"${key}"`);
136+
const values = keys.map(() => '?');
137+
const args = Object.values(options);
138+
return { keys, values, args };
139+
}
140+
}

src/interfaces.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,29 @@ export interface CrateDBErrorResponse {
6767
}
6868

6969
export type CrateDBRecord = Record<string, unknown>;
70+
71+
export type OptimizeOptions = {
72+
max_num_segments?: number; // Defines the number of segments to merge to
73+
only_expunge_deletes?: boolean; // If true, only segments with deletes are merged
74+
flush?: boolean; // If false, prevents automatic flushing after optimization
75+
};
76+
77+
/**
78+
* Defines the structure of a column in a CrateDB table.
79+
*/
80+
export type ColumnDefinition = {
81+
type: string;
82+
primaryKey?: boolean;
83+
notNull?: boolean;
84+
defaultValue?: unknown;
85+
};
86+
87+
/**
88+
* Defines additional table options such as clustering, partitioning, and replicas.
89+
*/
90+
export type TableOptions = {
91+
clusteredBy?: string; // Column to cluster by
92+
partitionedBy?: string[]; // List of partition key columns
93+
numberOfShards?: number; // Replication factor
94+
numberOfReplicas?: string | number; // Replication factor
95+
};

0 commit comments

Comments
 (0)