From 9bfe9b79c382ff87a394336ddca88058e09892f4 Mon Sep 17 00:00:00 2001 From: I-Info Date: Fri, 14 Mar 2025 15:03:10 +0800 Subject: [PATCH] perf: using halfvec --- .../service/common/vectorStore/pg/class.ts | 12 +- .../service/common/vectorStore/pg/index.ts | 8 +- .../app/src/pages/api/admin/inithalfvec.ts | 119 ++++++++++++++++++ 3 files changed, 129 insertions(+), 10 deletions(-) create mode 100644 projects/app/src/pages/api/admin/inithalfvec.ts diff --git a/packages/service/common/vectorStore/pg/class.ts b/packages/service/common/vectorStore/pg/class.ts index 6b7f42bd3ff7..ed712d6675d7 100644 --- a/packages/service/common/vectorStore/pg/class.ts +++ b/packages/service/common/vectorStore/pg/class.ts @@ -21,7 +21,7 @@ export class PgVectorCtrl { CREATE EXTENSION IF NOT EXISTS vector; CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} ( id BIGSERIAL PRIMARY KEY, - vector VECTOR(1536) NOT NULL, + halfvector HALFVEC(1536) NOT NULL, team_id VARCHAR(50) NOT NULL, dataset_id VARCHAR(50) NOT NULL, collection_id VARCHAR(50) NOT NULL, @@ -29,15 +29,15 @@ export class PgVectorCtrl { ); `); - await PgClient.query( - `CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);` - ); await PgClient.query( `CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);` ); await PgClient.query( `CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);` ); + await PgClient.query( + `CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${DatasetVectorTableName} USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);` + ); addLog.info('init pg successful'); } catch (error) { @@ -51,7 +51,7 @@ export class PgVectorCtrl { const { rowCount, rows } = await PgClient.insert(DatasetVectorTableName, { values: [ [ - { key: 'vector', value: `[${vector}]` }, + { key: 'halfvector', value: `[${vector}]` }, { key: 'team_id', value: String(teamId) }, { key: 'dataset_id', value: String(datasetId) }, { key: 'collection_id', value: String(collectionId) } @@ -169,7 +169,7 @@ export class PgVectorCtrl { SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100}; SET LOCAL hnsw.iterative_scan = relaxed_order; WITH relaxed_results AS MATERIALIZED ( - select id, collection_id, vector <#> '[${vector}]' AS score + select id, collection_id, halfvector <#> '[${vector}]' AS score from ${DatasetVectorTableName} where team_id='${teamId}' AND dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')}) diff --git a/packages/service/common/vectorStore/pg/index.ts b/packages/service/common/vectorStore/pg/index.ts index 30f6e05414c4..2c333b28737f 100644 --- a/packages/service/common/vectorStore/pg/index.ts +++ b/packages/service/common/vectorStore/pg/index.ts @@ -16,8 +16,8 @@ export const connectPg = async (): Promise => { keepAlive: true, idleTimeoutMillis: 600000, connectionTimeoutMillis: 20000, - query_timeout: 30000, - statement_timeout: 40000, + query_timeout: 1000000, + statement_timeout: 600000, idle_in_transaction_session_timeout: 60000 }); @@ -169,13 +169,13 @@ class PgClass { const pg = await connectPg(); return pg.query<{ id: string }>(sql); } - async query(sql: string) { + async query(sql: string, warning = true) { const pg = await connectPg(); const start = Date.now(); return pg.query(sql).then((res) => { const time = Date.now() - start; - if (time > 300) { + if (warning && time > 300) { addLog.warn(`pg query time: ${time}ms, sql: ${sql}`); } diff --git a/projects/app/src/pages/api/admin/inithalfvec.ts b/projects/app/src/pages/api/admin/inithalfvec.ts new file mode 100644 index 000000000000..56d49eb74bf8 --- /dev/null +++ b/projects/app/src/pages/api/admin/inithalfvec.ts @@ -0,0 +1,119 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { jsonRes } from '@fastgpt/service/common/response'; +import { connectToDatabase } from '@/service/mongo'; +import { authCert } from '@fastgpt/service/support/permission/auth/common'; +import { PgClient } from '@fastgpt/service/common/vectorStore/pg'; +import { DatasetVectorTableName } from '@fastgpt/service/common/vectorStore/constants'; + +async function setHalfvec() { + let totalRowsUpdated = 0; + let lastLoggedTime = Date.now(); + let lastLoggedRows = 0; + + const logUpdateSpeed = () => { + const currentTime = Date.now(); + const timeElapsed = (currentTime - lastLoggedTime) / 1000; // seconds + const rowsUpdated = totalRowsUpdated - lastLoggedRows; + const speed = rowsUpdated / timeElapsed; // rows per second + console.log(`Update speed: ${speed.toFixed(2)} rows/s`); + lastLoggedTime = currentTime; + lastLoggedRows = totalRowsUpdated; + }; + + const asyncUpdate = async () => { + while (true) { + const rowsUpdated = await PgClient.query( + `BEGIN; + SET LOCAL synchronous_commit = off; + UPDATE ${DatasetVectorTableName} + SET halfvector = vector + WHERE ctid = ANY(ARRAY( + SELECT ctid FROM ${DatasetVectorTableName} WHERE halfvector IS NULL LIMIT 200 + FOR NO KEY UPDATE SKIP LOCKED + ))`, + false + ); + if (rowsUpdated?.rowCount) { + totalRowsUpdated += rowsUpdated.rowCount; + console.log(`Rows updated: ${rowsUpdated.rowCount}`); + } else { + console.log('No more rows to update'); + break; + } + } + }; + + const worker = async () => { + let retry = 0; + while (retry < 3) { + try { + await asyncUpdate(); + break; + } catch (error: any) { + console.error('Error updating halfvector:', error?.message); + retry++; + } + } + }; + + const maxConcurrency = Number(process.env.DB_MAX_LINK || 20); + const telemetryInterval = setInterval(logUpdateSpeed, 10000); + + try { + await Promise.all(Array.from({ length: maxConcurrency }, () => worker())); + } finally { + clearInterval(telemetryInterval); + } + + console.log('halfvector column updated'); +} + +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + try { + await connectToDatabase(); + await authCert({ req, authRoot: true }); + + // pg add column halfvector + const columnExists = await PgClient.query(` + SELECT column_name + FROM information_schema.columns + WHERE table_name='${DatasetVectorTableName}'; + `); + + if (columnExists.rows.findIndex((item) => item.column_name === 'halfvector') === -1) { + await PgClient.query( + `ALTER TABLE ${DatasetVectorTableName} ADD COLUMN halfvector halfvec(1536);` + ); + console.log('halfvector column added'); + } + + if (columnExists.rows.findIndex((item) => item.column_name === 'vector') !== -1) { + await setHalfvec(); + } + + // set halfvector NOT NULL + await PgClient.query( + `BEGIN; + ALTER TABLE ${DatasetVectorTableName} ALTER COLUMN halfvector SET NOT NULL; + DROP INDEX IF EXISTS vector_index; + ALTER TABLE ${DatasetVectorTableName} DROP COLUMN IF EXISTS vector; + COMMIT; + ` + ); + console.log('halfvector column set not null'); + + // VACUUM + PgClient.query(`VACUUM ${DatasetVectorTableName};`, true); + + jsonRes(res, { + message: 'success' + }); + } catch (error) { + console.log(error); + + jsonRes(res, { + code: 500, + error + }); + } +}