Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions packages/service/common/vectorStore/pg/class.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@ export class PgVectorCtrl {
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
id BIGSERIAL PRIMARY KEY,
vector VECTOR(1536) NOT NULL,
halfvector HALFVEC(1536) NOT NULL,
team_id VARCHAR(50) NOT NULL,
dataset_id VARCHAR(50) NOT NULL,
collection_id VARCHAR(50) NOT NULL,
createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`);

await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);`
);
await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
);
await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);`
);
await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${DatasetVectorTableName} USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);`
);

// 10w rows
// await PgClient.query(`
// ALTER TABLE modeldata SET (
Expand Down Expand Up @@ -72,7 +73,7 @@ export class PgVectorCtrl {
const { rowCount, rows } = await PgClient.insert(DatasetVectorTableName, {
values: [
[
{ key: 'vector', value: `[${vector}]` },
{ key: 'halfvector', value: `[${vector}]` },
{ key: 'team_id', value: String(teamId) },
{ key: 'dataset_id', value: String(datasetId) },
{ key: 'collection_id', value: String(collectionId) }
Expand Down Expand Up @@ -190,7 +191,7 @@ export class PgVectorCtrl {
SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
SET LOCAL hnsw.iterative_scan = relaxed_order;
WITH relaxed_results AS MATERIALIZED (
select id, collection_id, vector <#> '[${vector}]' AS score
select id, collection_id, halfvector <#> '[${vector}]' AS score
from ${DatasetVectorTableName}
where team_id='${teamId}'
AND dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})
Expand Down
8 changes: 4 additions & 4 deletions packages/service/common/vectorStore/pg/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ export const connectPg = async (): Promise<Pool> => {
keepAlive: true,
idleTimeoutMillis: 600000,
connectionTimeoutMillis: 20000,
query_timeout: 30000,
statement_timeout: 40000,
query_timeout: 1000000,
statement_timeout: 600000,
idle_in_transaction_session_timeout: 60000
});

Expand Down Expand Up @@ -169,13 +169,13 @@ class PgClass {
const pg = await connectPg();
return pg.query<{ id: string }>(sql);
}
async query<T extends QueryResultRow = any>(sql: string) {
async query<T extends QueryResultRow = any>(sql: string, warning = true) {
const pg = await connectPg();
const start = Date.now();
return pg.query<T>(sql).then((res) => {
const time = Date.now() - start;

if (time > 300) {
if (warning && time > 300) {
addLog.warn(`pg query time: ${time}ms, sql: ${sql}`);
}

Expand Down
119 changes: 119 additions & 0 deletions projects/app/src/pages/api/admin/inithalfvec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@fastgpt/service/common/response';
import { connectToDatabase } from '@/service/mongo';
import { authCert } from '@fastgpt/service/support/permission/auth/common';
import { PgClient } from '@fastgpt/service/common/vectorStore/pg';
import { DatasetVectorTableName } from '@fastgpt/service/common/vectorStore/constants';

async function setHalfvec() {
  // Backfills the `halfvector` column from the legacy `vector` column in
  // small batches, using several concurrent workers. Progress is logged
  // every 10 seconds as rows/second.
  let totalRowsUpdated = 0;
  let lastLoggedTime = Date.now();
  let lastLoggedRows = 0;

  // Telemetry tick: report update throughput since the previous tick.
  const logUpdateSpeed = () => {
    const currentTime = Date.now();
    const timeElapsed = (currentTime - lastLoggedTime) / 1000; // seconds
    const rowsUpdated = totalRowsUpdated - lastLoggedRows;
    const speed = rowsUpdated / timeElapsed; // rows per second
    console.log(`Update speed: ${speed.toFixed(2)} rows/s`);
    lastLoggedTime = currentTime;
    lastLoggedRows = totalRowsUpdated;
  };

  // Copies `vector` into `halfvector` 200 rows at a time until no NULL
  // halfvector rows remain. SKIP LOCKED lets concurrent workers pick
  // disjoint batches; synchronous_commit = off trades durability of the
  // last few batches for speed (acceptable for a re-runnable migration).
  const asyncUpdate = async () => {
    while (true) {
      const result = await PgClient.query(
        `BEGIN;
        SET LOCAL synchronous_commit = off;
        UPDATE ${DatasetVectorTableName}
        SET halfvector = vector
        WHERE ctid = ANY(ARRAY(
          SELECT ctid FROM ${DatasetVectorTableName} WHERE halfvector IS NULL LIMIT 200
          FOR NO KEY UPDATE SKIP LOCKED
        ));
        COMMIT;`,
        false
      );
      // node-postgres returns an array of results for multi-statement query
      // text; pick the UPDATE statement's result, which carries rowCount.
      const updateResult = Array.isArray(result)
        ? (result as { command?: string; rowCount?: number }[]).find(
            (r) => r.command === 'UPDATE'
          )
        : result;
      if (updateResult?.rowCount) {
        totalRowsUpdated += updateResult.rowCount;
        console.log(`Rows updated: ${updateResult.rowCount}`);
      } else {
        console.log('No more rows to update');
        break;
      }
    }
  };

  // Each worker retries up to 3 times on error (e.g. transient lock/timeout),
  // then gives up; remaining rows are picked up on a later run.
  const worker = async () => {
    let retry = 0;
    while (retry < 3) {
      try {
        await asyncUpdate();
        break;
      } catch (error: any) {
        console.error('Error updating halfvector:', error?.message);
        retry++;
      }
    }
  };

  const maxConcurrency = Number(process.env.DB_MAX_LINK || 20);
  const telemetryInterval = setInterval(logUpdateSpeed, 10000);

  try {
    await Promise.all(Array.from({ length: maxConcurrency }, () => worker()));
  } finally {
    clearInterval(telemetryInterval);
  }

  console.log('halfvector column updated');
}

/**
 * Root-only admin endpoint that migrates the PG vector table from the
 * full-precision `vector` column to a half-precision `halfvector` column:
 * add column -> backfill -> enforce NOT NULL -> drop legacy column/index
 * -> VACUUM. Safe to re-run: every step is conditional or idempotent.
 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  try {
    await connectToDatabase();
    // Only the root credential may trigger schema migrations.
    await authCert({ req, authRoot: true });

    // Inspect the table's current columns to decide which steps to run.
    const columnExists = await PgClient.query<{ column_name: string }>(`
      SELECT column_name
      FROM information_schema.columns
      WHERE table_name='${DatasetVectorTableName}';
    `);
    const hasColumn = (name: string) =>
      columnExists.rows.some((item) => item.column_name === name);

    if (!hasColumn('halfvector')) {
      await PgClient.query(
        `ALTER TABLE ${DatasetVectorTableName} ADD COLUMN halfvector halfvec(1536);`
      );
      console.log('halfvector column added');
    }

    // Backfill only while the legacy column still exists.
    if (hasColumn('vector')) {
      await setHalfvec();
    }

    // Finalize atomically: lock in NOT NULL, then drop the legacy index and
    // column (IF EXISTS keeps this idempotent on re-runs).
    await PgClient.query(
      `BEGIN;
      ALTER TABLE ${DatasetVectorTableName} ALTER COLUMN halfvector SET NOT NULL;
      DROP INDEX IF EXISTS vector_index;
      ALTER TABLE ${DatasetVectorTableName} DROP COLUMN IF EXISTS vector;
      COMMIT;
      `
    );
    console.log('halfvector column set not null');

    // Reclaim space from the dropped column. Deliberately not awaited
    // (VACUUM can take a long time), but a .catch is attached so a failure
    // cannot crash the process as an unhandled promise rejection.
    PgClient.query(`VACUUM ${DatasetVectorTableName};`).catch((err: any) => {
      console.error('VACUUM failed:', err?.message);
    });

    jsonRes(res, {
      message: 'success'
    });
  } catch (error) {
    console.log(error);

    jsonRes(res, {
      code: 500,
      error
    });
  }
}
Loading