11import { ICommonObject , INode , INodeData , INodeParams } from '../../../src/Interface'
22import { getBaseClasses , getCredentialData , getCredentialParam } from '../../../src/utils'
33import { ListKeyOptions , RecordManagerInterface , UpdateOptions } from '@langchain/community/indexes/base'
4- import { DataSource , QueryRunner } from 'typeorm'
4+ import { DataSource } from 'typeorm'
55
66class MySQLRecordManager_RecordManager implements INode {
77 label : string
@@ -167,47 +167,58 @@ type MySQLRecordManagerOptions = {
167167
168168class MySQLRecordManager implements RecordManagerInterface {
169169 lc_namespace = [ 'langchain' , 'recordmanagers' , 'mysql' ]
170-
171- datasource : DataSource
172-
173- queryRunner : QueryRunner
174-
170+ config : MySQLRecordManagerOptions
175171 tableName : string
176-
177172 namespace : string
178173
179174 constructor ( namespace : string , config : MySQLRecordManagerOptions ) {
180- const { mysqlOptions , tableName } = config
175+ const { tableName } = config
181176 this . namespace = namespace
182177 this . tableName = tableName || 'upsertion_records'
183- this . datasource = new DataSource ( mysqlOptions )
178+ this . config = config
179+ }
180+
181+ private async getDataSource ( ) : Promise < DataSource > {
182+ const { mysqlOptions } = this . config
183+ if ( ! mysqlOptions ) {
184+ throw new Error ( 'No datasource options provided' )
185+ }
186+ // Prevent using default Postgres port, otherwise will throw uncaught error and crashing the app
187+ if ( mysqlOptions . port === 5432 ) {
188+ throw new Error ( 'Invalid port number' )
189+ }
190+ const dataSource = new DataSource ( mysqlOptions )
191+ await dataSource . initialize ( )
192+ return dataSource
184193 }
185194
186195 async createSchema ( ) : Promise < void > {
187196 try {
188- const appDataSource = await this . datasource . initialize ( )
189-
190- this . queryRunner = appDataSource . createQueryRunner ( )
197+ const dataSource = await this . getDataSource ( )
198+ const queryRunner = dataSource . createQueryRunner ( )
191199
192- await this . queryRunner . manager . query ( `create table if not exists \`${ this . tableName } \` (
200+ await queryRunner . manager . query ( `create table if not exists \`${ this . tableName } \` (
193201 \`uuid\` varchar(36) primary key default (UUID()),
194202 \`key\` varchar(255) not null,
195203 \`namespace\` varchar(255) not null,
196204 \`updated_at\` DOUBLE precision not null,
197205 \`group_id\` longtext,
198206 unique key \`unique_key_namespace\` (\`key\`,
199207\`namespace\`));` )
208+
200209 const columns = [ `updated_at` , `key` , `namespace` , `group_id` ]
201210 for ( const column of columns ) {
202211 // MySQL does not support 'IF NOT EXISTS' function for Index
203- const Check = await this . queryRunner . manager . query (
212+ const Check = await queryRunner . manager . query (
204213 `SELECT COUNT(1) IndexIsThere FROM INFORMATION_SCHEMA.STATISTICS
205214 WHERE table_schema=DATABASE() AND table_name='${ this . tableName } ' AND index_name='${ column } _index';`
206215 )
207216 if ( Check [ 0 ] . IndexIsThere === 0 )
208- await this . queryRunner . manager . query ( `CREATE INDEX \`${ column } _index\`
217+ await queryRunner . manager . query ( `CREATE INDEX \`${ column } _index\`
209218 ON \`${ this . tableName } \` (\`${ column } \`);` )
210219 }
220+
221+ await queryRunner . release ( )
211222 } catch ( e : any ) {
212223 // This error indicates that the table already exists
213224 // Due to asynchronous nature of the code, it is possible that
@@ -221,12 +232,17 @@ class MySQLRecordManager implements RecordManagerInterface {
221232 }
222233
223234 async getTime ( ) : Promise < number > {
235+ const dataSource = await this . getDataSource ( )
224236 try {
225- const res = await this . queryRunner . manager . query ( `SELECT UNIX_TIMESTAMP(NOW()) AS epoch` )
237+ const queryRunner = dataSource . createQueryRunner ( )
238+ const res = await queryRunner . manager . query ( `SELECT UNIX_TIMESTAMP(NOW()) AS epoch` )
239+ await queryRunner . release ( )
226240 return Number . parseFloat ( res [ 0 ] . epoch )
227241 } catch ( error ) {
228242 console . error ( 'Error getting time in MySQLRecordManager:' )
229243 throw error
244+ } finally {
245+ await dataSource . destroy ( )
230246 }
231247 }
232248
@@ -235,6 +251,9 @@ class MySQLRecordManager implements RecordManagerInterface {
235251 return
236252 }
237253
254+ const dataSource = await this . getDataSource ( )
255+ const queryRunner = dataSource . createQueryRunner ( )
256+
238257 const updatedAt = await this . getTime ( )
239258 const { timeAtLeast, groupIds : _groupIds } = updateOptions ?? { }
240259
@@ -261,9 +280,18 @@ class MySQLRecordManager implements RecordManagerInterface {
261280 ON DUPLICATE KEY UPDATE \`updated_at\` = VALUES(\`updated_at\`)`
262281
263282 // To handle multiple files upsert
264- for ( const record of recordsToUpsert ) {
265- // Consider using a transaction for batch operations
266- await this . queryRunner . manager . query ( query , record . flat ( ) )
283+ try {
284+ for ( const record of recordsToUpsert ) {
285+ // Consider using a transaction for batch operations
286+ await queryRunner . manager . query ( query , record . flat ( ) )
287+ }
288+
289+ await queryRunner . release ( )
290+ } catch ( error ) {
291+ console . error ( 'Error updating in MySQLRecordManager:' )
292+ throw error
293+ } finally {
294+ await dataSource . destroy ( )
267295 }
268296 }
269297
@@ -272,6 +300,9 @@ class MySQLRecordManager implements RecordManagerInterface {
272300 return [ ]
273301 }
274302
303+ const dataSource = await this . getDataSource ( )
304+ const queryRunner = dataSource . createQueryRunner ( )
305+
275306 // Prepare the placeholders and the query
276307 const placeholders = keys . map ( ( ) => `?` ) . join ( ', ' )
277308 const query = `
@@ -284,21 +315,27 @@ class MySQLRecordManager implements RecordManagerInterface {
284315
285316 try {
286317 // Execute the query
287- const rows = await this . queryRunner . manager . query ( query , [ this . namespace , ...keys . flat ( ) ] )
318+ const rows = await queryRunner . manager . query ( query , [ this . namespace , ...keys . flat ( ) ] )
288319 // Create a set of existing keys for faster lookup
289320 const existingKeysSet = new Set ( rows . map ( ( row : { key : string } ) => row . key ) )
290321 // Map the input keys to booleans indicating if they exist
291322 keys . forEach ( ( key , index ) => {
292323 existsArray [ index ] = existingKeysSet . has ( key )
293324 } )
325+ await queryRunner . release ( )
294326 return existsArray
295327 } catch ( error ) {
296328 console . error ( 'Error checking existence of keys' )
297- throw error // Allow the caller to handle the error
329+ throw error
330+ } finally {
331+ await dataSource . destroy ( )
298332 }
299333 }
300334
301335 async listKeys ( options ?: ListKeyOptions ) : Promise < string [ ] > {
336+ const dataSource = await this . getDataSource ( )
337+ const queryRunner = dataSource . createQueryRunner ( )
338+
302339 try {
303340 const { before, after, limit, groupIds } = options ?? { }
304341 let query = `SELECT \`key\` FROM \`${ this . tableName } \` WHERE \`namespace\` = ?`
@@ -330,11 +367,14 @@ class MySQLRecordManager implements RecordManagerInterface {
330367 query += ';'
331368
332369 // Directly using try/catch with async/await for cleaner flow
333- const result = await this . queryRunner . manager . query ( query , values )
370+ const result = await queryRunner . manager . query ( query , values )
371+ await queryRunner . release ( )
334372 return result . map ( ( row : { key : string } ) => row . key )
335373 } catch ( error ) {
336374 console . error ( 'MySQLRecordManager listKeys Error: ' )
337- throw error // Re-throw the error to be handled by the caller
375+ throw error
376+ } finally {
377+ await dataSource . destroy ( )
338378 }
339379 }
340380
@@ -343,16 +383,22 @@ class MySQLRecordManager implements RecordManagerInterface {
343383 return
344384 }
345385
386+ const dataSource = await this . getDataSource ( )
387+ const queryRunner = dataSource . createQueryRunner ( )
388+
346389 const placeholders = keys . map ( ( ) => '?' ) . join ( ', ' )
347390 const query = `DELETE FROM \`${ this . tableName } \` WHERE \`namespace\` = ? AND \`key\` IN (${ placeholders } );`
348391 const values = [ this . namespace , ...keys ] . map ( ( v ) => ( typeof v !== 'string' ? `${ v } ` : v ) )
349392
350393 // Directly using try/catch with async/await for cleaner flow
351394 try {
352- await this . queryRunner . manager . query ( query , values )
395+ await queryRunner . manager . query ( query , values )
396+ await queryRunner . release ( )
353397 } catch ( error ) {
354398 console . error ( 'Error deleting keys' )
355- throw error // Re-throw the error to be handled by the caller
399+ throw error
400+ } finally {
401+ await dataSource . destroy ( )
356402 }
357403 }
358404}
0 commit comments