@@ -22,11 +22,18 @@ var _ Storage = (*storageSQLite)(nil)
2222
2323// storageSQLite handles storing and retrieving query data
2424type storageSQLite struct {
25- db * sql.DB
26- exitChan chan struct {}
25+ db * sql.DB
26+ exitChan chan struct {}
27+ createIncomingQueryStmt * sql.Stmt
28+ finishIncomingQueryStmt * sql.Stmt
29+ cancelIncomingQueryStmt * sql.Stmt
30+ createOutgoingQueryStmt * sql.Stmt
31+ finishOutgoingQueryStmt * sql.Stmt
32+ cancelOutgoingQueryStmt * sql.Stmt
33+ logger * zap.Logger
2734}
2835
29- // initialize creates the necessary tables if they don't exist
36+ // initialize creates the necessary tables and prepared statements
3037func (s * storageSQLite ) initialize () error {
3138 // Create the incoming_queries table
3239 createIncomingTableSQL := `
@@ -71,8 +78,10 @@ func (s *storageSQLite) initialize() error {
7178 CREATE INDEX IF NOT EXISTS idx_outgoing_queries_database_name ON outgoing_queries(database_name);
7279 `
7380
81+ var err error
82+
7483 // Enable foreign key support
75- _ , err : = s .db .Exec ("PRAGMA foreign_keys = ON;" )
84+ _ , err = s .db .Exec ("PRAGMA foreign_keys = ON;" )
7685 if err != nil {
7786 return fmt .Errorf ("enabling foreign keys: %w" , err )
7887 }
@@ -88,6 +97,46 @@ func (s *storageSQLite) initialize() error {
8897 return fmt .Errorf ("creating outgoing_queries table: %w" , err )
8998 }
9099
100+ // Prepare statements for better performance
101+ s .createIncomingQueryStmt , err = s .db .Prepare (
102+ "INSERT INTO incoming_queries (id, data_source_kind, created_at, rows_read, bytes_read, state) VALUES (?, ?, ?, ?, ?, ?)" )
103+ if err != nil {
104+ return fmt .Errorf ("preparing create incoming query statement: %w" , err )
105+ }
106+
107+ s .finishIncomingQueryStmt , err = s .db .Prepare (
108+ "UPDATE incoming_queries SET state = ?, finished_at = ?, rows_read = ?, bytes_read = ? WHERE id = ?" )
109+ if err != nil {
110+ return fmt .Errorf ("preparing finish incoming query statement: %w" , err )
111+ }
112+
113+ s .cancelIncomingQueryStmt , err = s .db .Prepare (
114+ "UPDATE incoming_queries SET state = ?, finished_at = ?, error = ?, rows_read = ?, bytes_read = ? WHERE id = ?" )
115+ if err != nil {
116+ return fmt .Errorf ("preparing cancel incoming query statement: %w" , err )
117+ }
118+
119+ // Prepare statements for outgoing queries
120+ s .createOutgoingQueryStmt , err = s .db .Prepare (`
121+ INSERT INTO outgoing_queries
122+ (id, incoming_query_id, database_name, database_endpoint, rows_read, query_text, query_args, created_at, state)
123+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)` )
124+ if err != nil {
125+ return fmt .Errorf ("preparing create outgoing query statement: %w" , err )
126+ }
127+
128+ s .finishOutgoingQueryStmt , err = s .db .Prepare (
129+ "UPDATE outgoing_queries SET state = ?, finished_at = ?, rows_read = ? WHERE id = ?" )
130+ if err != nil {
131+ return fmt .Errorf ("preparing finish outgoing query statement: %w" , err )
132+ }
133+
134+ s .cancelOutgoingQueryStmt , err = s .db .Prepare (
135+ "UPDATE outgoing_queries SET state = ?, finished_at = ?, error = ? WHERE id = ?" )
136+ if err != nil {
137+ return fmt .Errorf ("preparing cancel outgoing query statement: %w" , err )
138+ }
139+
91140 return nil
92141}
93142
@@ -128,8 +177,8 @@ func (s *storageSQLite) CreateIncomingQuery(
128177 now := time .Now ().UTC ()
129178 id := uuid .NewString ()
130179
131- _ , err := s . db . ExecContext ( ctx ,
132- "INSERT INTO incoming_queries (id, data_source_kind, created_at, rows_read, bytes_read, state) VALUES (?, ?, ?, ?, ?, ?)" ,
180+ // Use the prepared statement for better performance
181+ _ , err := s . createIncomingQueryStmt . ExecContext ( ctx ,
133182 id , dataSourceKind .String (), now , 0 , 0 , stateToString (observation .QueryState_QUERY_STATE_RUNNING ),
134183 )
135184 if err != nil {
@@ -148,8 +197,7 @@ func (s *storageSQLite) FinishIncomingQuery(
148197 ctx context.Context , logger * zap.Logger , id string , stats * api_service_protos.TReadSplitsResponse_TStats ) error {
149198 finishedAt := time .Now ().UTC ()
150199
151- result , err := s .db .ExecContext (ctx ,
152- "UPDATE incoming_queries SET state = ?, finished_at = ?, rows_read = ?, bytes_read = ? WHERE id = ?" ,
200+ result , err := s .finishIncomingQueryStmt .ExecContext (ctx ,
153201 stateToString (observation .QueryState_QUERY_STATE_FINISHED ), finishedAt , stats .Rows , stats .Bytes , id ,
154202 )
155203 if err != nil {
@@ -178,8 +226,7 @@ func (s *storageSQLite) CancelIncomingQuery(ctx context.Context, logger *zap.Log
178226) error {
179227 finishedAt := time .Now ().UTC ()
180228
181- result , err := s .db .ExecContext (ctx ,
182- "UPDATE incoming_queries SET state = ?, finished_at = ?, error = ?, rows_read = ?, bytes_read = ? WHERE id = ?" ,
229+ result , err := s .cancelIncomingQueryStmt .ExecContext (ctx ,
183230 stateToString (observation .QueryState_QUERY_STATE_CANCELED ), finishedAt , errorMsg , stats .Rows , stats .Bytes , id ,
184231 )
185232 if err != nil {
@@ -314,14 +361,13 @@ func (s *storageSQLite) CreateOutgoingQuery(
314361 now := time .Now ().UTC ()
315362 id := uuid .NewString ()
316363
317- // Execute the insert within the transaction
318- _ , err = tx .ExecContext (ctx ,
319- `INSERT INTO outgoing_queries
320- (id, incoming_query_id, database_name, database_endpoint, rows_read, query_text, query_args, created_at, state)
321- VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)` ,
364+ // Use the prepared statement for better performance
365+ stmt := tx .StmtContext (ctx , s .createOutgoingQueryStmt )
366+ _ , err = stmt .ExecContext (ctx ,
322367 id , incomingQueryID , dsi .Database , common .EndpointToString (dsi .Endpoint ),
323368 0 , queryText , fmt .Sprint (queryArgs ), now , stateToString (observation .QueryState_QUERY_STATE_RUNNING ),
324369 )
370+
325371 if err != nil {
326372 rollback ()
327373 return logger , "" , fmt .Errorf ("creating outgoing query: %w" , err )
@@ -340,11 +386,10 @@ func (s *storageSQLite) CreateOutgoingQuery(
340386}
341387
342388// FinishOutgoingQuery marks an outgoing query as finished
343- func (s * storageSQLite ) FinishOutgoingQuery (_ context.Context , logger * zap.Logger , id string , rowsRead int64 ) error {
389+ func (s * storageSQLite ) FinishOutgoingQuery (ctx context.Context , logger * zap.Logger , id string , rowsRead int64 ) error {
344390 finishedAt := time .Now ().UTC ()
345391
346- result , err := s .db .Exec (
347- "UPDATE outgoing_queries SET state = ?, finished_at = ?, rows_read = ? WHERE id = ?" ,
392+ result , err := s .finishOutgoingQueryStmt .ExecContext (ctx ,
348393 stateToString (observation .QueryState_QUERY_STATE_FINISHED ), finishedAt , rowsRead , id ,
349394 )
350395 if err != nil {
@@ -366,11 +411,10 @@ func (s *storageSQLite) FinishOutgoingQuery(_ context.Context, logger *zap.Logge
366411}
367412
368413// CancelOutgoingQuery marks an outgoing query as canceled with an error message
369- func (s * storageSQLite ) CancelOutgoingQuery (_ context.Context , logger * zap.Logger , id string , errorMsg string ) error {
414+ func (s * storageSQLite ) CancelOutgoingQuery (ctx context.Context , logger * zap.Logger , id string , errorMsg string ) error {
370415 finishedAt := time .Now ().UTC ()
371416
372- result , err := s .db .Exec (
373- "UPDATE outgoing_queries SET state = ?, finished_at = ?, error = ? WHERE id = ?" ,
417+ result , err := s .cancelOutgoingQueryStmt .ExecContext (ctx ,
374418 stateToString (observation .QueryState_QUERY_STATE_CANCELED ), finishedAt , errorMsg , id ,
375419 )
376420 if err != nil {
@@ -510,7 +554,35 @@ func (s *storageSQLite) Close(_ context.Context) error {
510554 if s .db != nil {
511555 close (s .exitChan ) // Signal the garbage collector to stop
512556
513- return s .db .Close ()
557+ // Close prepared statements for incoming queries
558+ if err := s .createIncomingQueryStmt .Close (); err != nil {
559+ s .logger .Error ("failed to close create incoming query statement" , zap .Error (err ))
560+ }
561+
562+ if err := s .finishIncomingQueryStmt .Close (); err != nil {
563+ s .logger .Error ("failed to close finish incoming query statement" , zap .Error (err ))
564+ }
565+
566+ if err := s .cancelIncomingQueryStmt .Close (); err != nil {
567+ s .logger .Error ("failed to close cancel incoming query statement" , zap .Error (err ))
568+ }
569+
570+ // Close prepared statements for outgoing queries
571+ if err := s .createOutgoingQueryStmt .Close (); err != nil {
572+ s .logger .Error ("failed to close create outgoing query statement" , zap .Error (err ))
573+ }
574+
575+ if err := s .finishOutgoingQueryStmt .Close (); err != nil {
576+ s .logger .Error ("failed to close finish outgoing query statement" , zap .Error (err ))
577+ }
578+
579+ if err := s .cancelOutgoingQueryStmt .Close (); err != nil {
580+ s .logger .Error ("failed to close cancel outgoing query statement" , zap .Error (err ))
581+ }
582+
583+ if err := s .db .Close (); err != nil {
584+ s .logger .Error ("failed to close database" , zap .Error (err ))
585+ }
514586 }
515587
516588 return nil
@@ -577,20 +649,29 @@ func (s *storageSQLite) startGarbageCollector(logger *zap.Logger, ttl time.Durat
577649
578650// newStorageSQLite creates a new Storage instance
579651func newStorageSQLite (logger * zap.Logger , cfg * config.TObservationConfig_TStorage_TSQLite ) (Storage , error ) {
580- db , err := sql .Open ("sqlite3" , cfg .Path + "?_txlock=exclusive&_journal=WAL&_sync=FULL&_secure_delete=TRUE&_mutex=full " )
652+ db , err := sql .Open ("sqlite3" , cfg .Path + "?_txlock=immediate&_mutex=no&cache=shared " )
581653 if err != nil {
582654 return nil , fmt .Errorf ("opening SQLite database: %w" , err )
583655 }
584656
585- db .SetMaxOpenConns (1 )
586- db .SetMaxIdleConns (1 )
657+ db .SetMaxOpenConns (10 )
658+ db .SetMaxIdleConns (5 )
659+ db .SetConnMaxLifetime (30 * time .Minute )
587660
588- // Set pragmas for better performance
661+ // Set pragmas for maximum write performance
589662 pragmas := []string {
590- "PRAGMA synchronous = FULL" ,
591- "PRAGMA journal_mode = WAL" ,
592- "PRAGMA locking_mode = EXCLUSIVE" ,
593- "PRAGMA busy_timeout = 5000" ,
663+ "PRAGMA synchronous = OFF" , // Disable synchronization for maximum speed (data loss acceptable)
664+ "PRAGMA journal_mode = WAL" , // Write-Ahead Logging for better write performance
665+ "PRAGMA secure_delete = FALSE" , // Disable secure delete for better performance
666+ "PRAGMA locking_mode = NORMAL" , // Normal locking mode for better concurrency
667+ "PRAGMA busy_timeout = 5000" , // Wait 5000ms when database is locked
668+ "PRAGMA temp_store = MEMORY" , // Store temporary data in memory
669+ "PRAGMA mmap_size = 268435456" , // 256MB - default is 0 (disabled)
670+ "PRAGMA page_size = 8192" , // Larger page size for better write performance
671+ "PRAGMA cache_size = 20000" , // Increased cache size for better performance
672+ "PRAGMA auto_vacuum = INCREMENTAL" , // More efficient vacuuming
673+ "PRAGMA journal_size_limit = 67108864" , // 64MB - limit WAL file size
674+ "PRAGMA wal_autocheckpoint = 1000" , // Checkpoint WAL file after 1000 pages
594675 }
595676
596677 for _ , pragma := range pragmas {
@@ -603,6 +684,7 @@ func newStorageSQLite(logger *zap.Logger, cfg *config.TObservationConfig_TStorag
603684 storage := & storageSQLite {
604685 db : db ,
605686 exitChan : make (chan struct {}),
687+ logger : logger ,
606688 }
607689
608690 if err = storage .initialize (); err != nil {
0 commit comments