Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/oceanbase-dashboard/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ sqlAnalyzer:
cpuRequest: "100m"
cpuLimit: "1"
memoryRequest: "128Mi"
memoryLimit: "1Gi"
memoryLimit: "2Gi"
sqlAuditLimit: 10000
slowSqlThresholdMilliSeconds: 1000

2 changes: 1 addition & 1 deletion internal/sql-analyzer/store/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

func StartMemoryMonitoring(ctx context.Context, name string, db *sql.DB, l *logger.Logger) {
go func() {
ticker := time.NewTicker(1 * time.Minute)
ticker := time.NewTicker(30 * time.Minute)
defer ticker.Stop()
for {
select {
Expand Down
48 changes: 13 additions & 35 deletions internal/sql-analyzer/store/sql_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,43 +45,25 @@ type SqlAuditStore struct {
}

func NewSqlAuditStore(c context.Context, path string, maxOpenConns int, threads int, l *logger.Logger) (*SqlAuditStore, error) {
// Use an in-memory DuckDB database for operations.
db, err := sql.Open("duckdb", "") // In-memory
if err != nil {
return nil, fmt.Errorf("failed to open in-memory duckdb: %w", err)
// Ensure the data directory exists
if err := os.MkdirAll(path, 0755); err != nil {
return nil, fmt.Errorf("failed to create data directory %s: %w", path, err)
}

db.SetMaxOpenConns(maxOpenConns)

// Set memory limit for in-memory DB
conn, err := db.Conn(c)
if err == nil {
memLimit := os.Getenv("DUCKDB_MEMORY_LIMIT")
if memLimit == "" {
memLimit = "512MB"
}
if _, err := conn.ExecContext(c, fmt.Sprintf("PRAGMA memory_limit='%s'", memLimit)); err != nil {
l.Warnf("Failed to set duckdb memory limit for sql audit store: %v", err)
}
if _, err := conn.ExecContext(c, "SET allocator_background_threads=true"); err != nil {
l.Warnf("Failed to set allocator_background_threads for sql audit store: %v", err)
}
if _, err := conn.ExecContext(c, "SET preserve_insertion_order=false"); err != nil {
l.Warnf("Failed to set preserve_insertion_order=false for sql audit store: %v", err)
}
if _, err := conn.ExecContext(c, fmt.Sprintf("SET threads=%d", threads)); err != nil {
l.Warnf("Failed to set threads=%d for sql audit store: %v", threads, err)
}
conn.Close()
} else {
l.Warnf("Failed to get connection to set memory limit for sql audit store: %v", err)
memLimit := os.Getenv("DUCKDB_MEMORY_LIMIT")
if memLimit == "" {
memLimit = "512MB"
}

// Ensure the data directory exists
if err := os.MkdirAll(path, 0755); err != nil {
return nil, fmt.Errorf("failed to create data directory %s: %w", path, err)
// Use an in-memory DuckDB database with configuration parameters in the DSN.
dsn := fmt.Sprintf("?memory_limit=%s&allocator_background_threads=true&preserve_insertion_order=false&threads=%d", memLimit, threads)
db, err := sql.Open("duckdb", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open in-memory duckdb: %w", err)
}

db.SetMaxOpenConns(maxOpenConns)

store := &SqlAuditStore{db: db, path: path, ctx: c, Logger: l}
return store, nil
}
Expand Down Expand Up @@ -150,10 +132,6 @@ func (s *SqlAuditStore) InsertBatch(resultsSlices [][]model.SqlAudit) error {
}
}()

if _, err := conn.ExecContext(s.ctx, "SET preserve_insertion_order=false"); err != nil {
s.Logger.Warnf("Failed to set preserve_insertion_order=false for batch insert: %v", err)
}

// Use the appender to load data into the temp table.
err = conn.Raw(func(driverConn any) error {
duckdbConn, ok := driverConn.(driver.Conn)
Expand Down
64 changes: 33 additions & 31 deletions internal/sql-analyzer/store/sql_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type PlanStore struct {

func (s *PlanStore) InitSqlPlanTable() error {
// Create table if not exists
_, err := s.db.Exec(sqlconst.CreateSqlPlanTable)
_, err := s.db.ExecContext(s.ctx, sqlconst.CreateSqlPlanTable)
return err
}

Expand All @@ -48,7 +48,14 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,
return nil, fmt.Errorf("failed to create data directory %s: %w", path, err)
}

dsn := filepath.Join(path, "sql_plan.duckdb")
memLimit := os.Getenv("DUCKDB_MEMORY_LIMIT")
if memLimit == "" {
memLimit = "512MB"
}

dsn := fmt.Sprintf("%s?memory_limit=%s&allocator_background_threads=true&preserve_insertion_order=false&threads=%d",
filepath.Join(path, "sql_plan.duckdb"), memLimit, threads)

var db *sql.DB
var err error
var conn *sql.Conn
Expand All @@ -65,7 +72,8 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,
// sql.Open doesn't actually connect. We need to try to get a connection.
conn, err = db.Conn(c)
if err == nil {
break // Success
conn.Close() // Close check connection
break // Success
}

db.Close() // Close the db handle on failure
Expand All @@ -79,27 +87,6 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,

db.SetMaxOpenConns(maxOpenConns)

// Set memory limit
memLimit := os.Getenv("DUCKDB_MEMORY_LIMIT")
if memLimit == "" {
memLimit = "512MB"
}
if _, err := conn.ExecContext(c, fmt.Sprintf("PRAGMA memory_limit='%s'", memLimit)); err != nil {
l.Warnf("Failed to set duckdb memory limit: %v", err)
}

if _, err := conn.ExecContext(c, "SET allocator_background_threads=true"); err != nil {
l.Warnf("Failed to set allocator_background_threads: %v", err)
}
if _, err := conn.ExecContext(c, "SET preserve_insertion_order=false"); err != nil {
l.Warnf("Failed to set preserve_insertion_order=false: %v", err)
}
if _, err := conn.ExecContext(c, fmt.Sprintf("SET threads=%d", threads)); err != nil {
l.Warnf("Failed to set threads=%d: %v", threads, err)
}

conn.Close() // Close the temporary connection, the pool will manage connections from here.

s := &PlanStore{db: db, ctx: c, Logger: l}
return s, nil
}
Expand All @@ -108,7 +95,7 @@ func (s *PlanStore) LoadExistingPlans() ([]model.SqlPlanIdentifier, error) {
s.mu.RLock()
defer s.mu.RUnlock()

rows, err := s.db.Query(sqlconst.ListSqlPlanIdentifier)
rows, err := s.db.QueryContext(s.ctx, sqlconst.ListSqlPlanIdentifier)
if err != nil {
return nil, errors.Wrap(err, "failed to query existing plans")
}
Expand All @@ -130,6 +117,9 @@ func (s *PlanStore) LoadExistingPlans() ([]model.SqlPlanIdentifier, error) {
PlanID: planID,
})
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "error during rows iteration")
}
return existingPlans, nil
}

Expand All @@ -144,7 +134,7 @@ func (s *PlanStore) Store(plan model.SqlPlan) error {
plan.PartitionStart, plan.Other, plan.AccessPredicates, plan.FilterPredicates, plan.StartupPredicates,
plan.Projection, plan.SpecialPredicates, plan.QblockName, plan.Remarks, plan.OtherXML}

if _, err := s.db.Exec(sqlconst.StoreSqlPlanStatement, valueArgs...); err != nil {
if _, err := s.db.ExecContext(s.ctx, sqlconst.StoreSqlPlanStatement, valueArgs...); err != nil {
return err
}

Expand All @@ -156,7 +146,7 @@ func (s *PlanStore) PlanExists(ident model.SqlPlanIdentifier) (bool, error) {
defer s.mu.RUnlock()

var count int
err := s.db.QueryRow(sqlconst.CheckPlanExistence, ident.TenantID, ident.SvrIP, ident.SvrPort, ident.PlanID).Scan(&count)
err := s.db.QueryRowContext(s.ctx, sqlconst.CheckPlanExistence, ident.TenantID, ident.SvrIP, ident.SvrPort, ident.PlanID).Scan(&count)
if err != nil {
return false, errors.Wrap(err, "failed to query plan existence")
}
Expand All @@ -167,7 +157,7 @@ func (s *PlanStore) GetPlanDetail(ident model.SqlPlanIdentifier) ([]model.SqlPla
s.mu.RLock()
defer s.mu.RUnlock()

rows, err := s.db.Query(sqlconst.SelectSqlPlanFromDuckdb, ident.TenantID, ident.SvrIP, ident.SvrPort, ident.PlanID)
rows, err := s.db.QueryContext(s.ctx, sqlconst.SelectSqlPlanFromDuckdb, ident.TenantID, ident.SvrIP, ident.SvrPort, ident.PlanID)
if err != nil {
return nil, errors.Wrap(err, "failed to query plans by sqlId and planHash")
}
Expand All @@ -191,6 +181,9 @@ func (s *PlanStore) GetPlanDetail(ident model.SqlPlanIdentifier) ([]model.SqlPla
}
plans = append(plans, plan)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "error during rows iteration")
}
return plans, nil
}

Expand All @@ -205,7 +198,7 @@ func (s *PlanStore) GetPlanStatsBySqlId(sqlId string) ([]model.PlanStatistic, er
s.mu.RLock()
defer s.mu.RUnlock()

rows, err := s.db.Query(sqlconst.GetPlanStats, sqlId)
rows, err := s.db.QueryContext(s.ctx, sqlconst.GetPlanStats, sqlId)
if err != nil {
return nil, errors.Wrap(err, "failed to query plan statistics by sqlId")
}
Expand Down Expand Up @@ -235,6 +228,9 @@ func (s *PlanStore) GetPlanStatsBySqlId(sqlId string) ([]model.PlanStatistic, er
}
stats = append(stats, stat)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "error during rows iteration")
}
return stats, nil
}

Expand All @@ -243,7 +239,7 @@ func (s *PlanStore) GetTableInfoBySqlId(sqlId string) ([]model.TableInfo, error)
defer s.mu.RUnlock()

// keep the max object id(table id), tables may dropped and recreated
rows, err := s.db.Query(sqlconst.GetTableInfo, sqlId)
rows, err := s.db.QueryContext(s.ctx, sqlconst.GetTableInfo, sqlId)
if err != nil {
return nil, errors.Wrap(err, "failed to query table info by sqlId")
}
Expand All @@ -261,14 +257,17 @@ func (s *PlanStore) GetTableInfoBySqlId(sqlId string) ([]model.TableInfo, error)
}
tables = append(tables, table)
}
if err := rows.Err(); err != nil {
return nil, errors.Wrap(err, "error during rows iteration")
}
return tables, nil
}

func (s *PlanStore) DebugQuery(query string, args ...interface{}) ([]map[string]interface{}, error) {
s.mu.RLock()
defer s.mu.RUnlock()

rows, err := s.db.Query(query, args...)
rows, err := s.db.QueryContext(s.ctx, query, args...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,6 +306,9 @@ func (s *PlanStore) DebugQuery(query string, args ...interface{}) ([]map[string]
}
results = append(results, m)
}
if err := rows.Err(); err != nil {
return nil, err
}
return results, nil
}

Expand Down