Skip to content

Commit 7cb01a0

Browse files
authored
Observation: tunnel all writing requests into a single database connection (#326)
1 parent bc9aa2d commit 7cb01a0

File tree

1 file changed

+21
-22
lines changed

1 file changed

+21
-22
lines changed

app/server/observation/storage_sqlite.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"os"
78
"time"
89

910
"github.com/google/uuid"
@@ -31,6 +32,7 @@ type storageSQLite struct {
3132
finishOutgoingQueryStmt *sql.Stmt
3233
cancelOutgoingQueryStmt *sql.Stmt
3334
logger *zap.Logger
35+
cfg *config.TObservationConfig_TStorage_TSQLite
3436
}
3537

3638
// initialize creates the necessary tables and prepared statements
@@ -556,32 +558,32 @@ func (s *storageSQLite) Close(_ context.Context) error {
556558

557559
// Close prepared statements for incoming queries
558560
if err := s.createIncomingQueryStmt.Close(); err != nil {
559-
s.logger.Error("failed to close create incoming query statement", zap.Error(err))
561+
s.logger.Error("close create incoming query statement", zap.Error(err))
560562
}
561563

562564
if err := s.finishIncomingQueryStmt.Close(); err != nil {
563-
s.logger.Error("failed to close finish incoming query statement", zap.Error(err))
565+
s.logger.Error("close finish incoming query statement", zap.Error(err))
564566
}
565567

566568
if err := s.cancelIncomingQueryStmt.Close(); err != nil {
567-
s.logger.Error("failed to close cancel incoming query statement", zap.Error(err))
569+
s.logger.Error("close cancel incoming query statement", zap.Error(err))
568570
}
569571

570572
// Close prepared statements for outgoing queries
571573
if err := s.createOutgoingQueryStmt.Close(); err != nil {
572-
s.logger.Error("failed to close create outgoing query statement", zap.Error(err))
574+
s.logger.Error("close create outgoing query statement", zap.Error(err))
573575
}
574576

575577
if err := s.finishOutgoingQueryStmt.Close(); err != nil {
576-
s.logger.Error("failed to close finish outgoing query statement", zap.Error(err))
578+
s.logger.Error("close finish outgoing query statement", zap.Error(err))
577579
}
578580

579581
if err := s.cancelOutgoingQueryStmt.Close(); err != nil {
580-
s.logger.Error("failed to close cancel outgoing query statement", zap.Error(err))
582+
s.logger.Error("close cancel outgoing query statement", zap.Error(err))
581583
}
582584

583585
if err := s.db.Close(); err != nil {
584-
s.logger.Error("failed to close database", zap.Error(err))
586+
s.logger.Error("close database", zap.Error(err))
585587
}
586588
}
587589

@@ -590,14 +592,12 @@ func (s *storageSQLite) Close(_ context.Context) error {
590592

591593
// newStorageSQLite creates a new Storage instance
592594
func (s *storageSQLite) getDatabaseSize() (int64, error) {
593-
var size int64
594-
595-
err := s.db.QueryRow(`SELECT page_count * page_size as size FROM pragma_page_count(), pragma_page_size();`).Scan(&size)
595+
stat, err := os.Stat(s.cfg.Path)
596596
if err != nil {
597-
return 0, fmt.Errorf("failed to get database size: %w", err)
597+
return 0, fmt.Errorf("os stat file '%s': %w", s.cfg.Path, err)
598598
}
599599

600-
return size, nil
600+
return stat.Size(), nil
601601
}
602602

603603
func (s *storageSQLite) collectGarbage(logger *zap.Logger, ttl time.Duration) {
@@ -606,26 +606,26 @@ func (s *storageSQLite) collectGarbage(logger *zap.Logger, ttl time.Duration) {
606606
// Log storage size before cleanup
607607
sizeBefore, err := s.getDatabaseSize()
608608
if err != nil {
609-
logger.Error("failed to get storage size before cleanup", zap.Error(err))
609+
logger.Error("get database size before cleanup", zap.Error(err))
610610
}
611611

612612
_, err = s.db.Exec(`
613613
DELETE FROM incoming_queries WHERE created_at < ?;
614614
DELETE FROM outgoing_queries WHERE created_at < ?;
615615
`, cutoff, cutoff)
616616
if err != nil {
617-
logger.Error("failed to clean up old queries", zap.Error(err))
617+
logger.Error("clean up old queries", zap.Error(err))
618618
}
619619

620620
_, err = s.db.Exec(` VACUUM; `)
621621
if err != nil {
622-
logger.Error("failed to vacuum database", zap.Error(err))
622+
logger.Error("vacuum database", zap.Error(err))
623623
}
624624

625625
// Log storage size after cleanup
626626
sizeAfter, err := s.getDatabaseSize()
627627
if err != nil {
628-
logger.Error("failed to get storage size after cleanup", zap.Error(err))
628+
logger.Error("get database size after cleanup", zap.Error(err))
629629
}
630630

631631
logger.Info("garbage collection completed", zap.Int64("size_before", sizeBefore), zap.Int64("size_after", sizeAfter))
@@ -649,22 +649,20 @@ func (s *storageSQLite) startGarbageCollector(logger *zap.Logger, ttl time.Durat
649649

650650
// newStorageSQLite creates a new Storage instance
651651
func newStorageSQLite(logger *zap.Logger, cfg *config.TObservationConfig_TStorage_TSQLite) (Storage, error) {
652-
db, err := sql.Open("sqlite3", cfg.Path+"?_txlock=immediate&_mutex=no&cache=shared")
652+
db, err := sql.Open("sqlite3", cfg.Path+"?_txlock=immediate&cache=shared")
653653
if err != nil {
654654
return nil, fmt.Errorf("opening SQLite database: %w", err)
655655
}
656656

657-
db.SetMaxOpenConns(10)
658-
db.SetMaxIdleConns(5)
659-
db.SetConnMaxLifetime(30 * time.Minute)
657+
db.SetMaxOpenConns(1)
660658

661659
// Set pragmas for maximum write performance
662660
pragmas := []string{
663-
"PRAGMA synchronous = OFF", // Disable synchronization for maximum speed (data loss acceptable)
661+
"PRAGMA synchronous = NORMAL", // Sync at checkpoints, not per-transaction
664662
"PRAGMA journal_mode = WAL", // Write-Ahead Logging for better write performance
665663
"PRAGMA secure_delete = FALSE", // Disable secure delete for better performance
666664
"PRAGMA locking_mode = NORMAL", // Normal locking mode for better concurrency
667-
"PRAGMA busy_timeout = 5000", // Wait 5000ms when database is locked
665+
"PRAGMA busy_timeout = 30000", // Wait 30000ms when database is locked
668666
"PRAGMA temp_store = MEMORY", // Store temporary data in memory
669667
"PRAGMA mmap_size = 268435456", // 256MB - default is 0 (disabled)
670668
"PRAGMA page_size = 8192", // Larger page size for better write performance
@@ -685,6 +683,7 @@ func newStorageSQLite(logger *zap.Logger, cfg *config.TObservationConfig_TStorag
685683
db: db,
686684
exitChan: make(chan struct{}),
687685
logger: logger,
686+
cfg: cfg,
688687
}
689688

690689
if err = storage.initialize(); err != nil {

0 commit comments

Comments
 (0)