-
Notifications
You must be signed in to change notification settings - Fork 436
Use a separate unit to manage the cache of prepared statements #2937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 9 commits
4586b3e
5409439
23e4e32
19c44fb
e23980a
d554abb
efaf874
8303f0a
0506ff5
dc3c8bb
0d116cc
ee5c787
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,200 @@ | ||
| // Package stmtcache contains tools for managing the prepared-statement cache. | ||
| package stmtcache | ||
|
|
||
| import ( | ||
| "context" | ||
| "database/sql" | ||
| "fmt" | ||
| "strings" | ||
| "sync" | ||
|
|
||
| "github.com/google/trillian/monitoring" | ||
| "k8s.io/klog/v2" | ||
| ) | ||
|
|
||
| var ( | ||
| once sync.Once | ||
| errStmtCounter monitoring.Counter | ||
| ) | ||
|
|
||
| // PlaceholderSQL SQL statement placeholder | ||
|
||
| const PlaceholderSQL = "<placeholder>" | ||
|
|
||
| // Stmt is wraps the sql.Stmt struct for handling and monitoring SQL errors. | ||
| // If Stmt execution errors occur, it is automatically closed and the prepared statements in the cache are cleared. | ||
| type Stmt struct { | ||
| statement string | ||
| placeholderNum int | ||
| stmtCache *StmtCache | ||
| stmt *sql.Stmt | ||
| parentStmt *Stmt | ||
| } | ||
|
|
||
| // errHandler handling and monitoring SQL errors | ||
| // This err parameter is not currently used, but it may be necessary to perform more granular processing and monitoring of different errs in the future. | ||
| func (s *Stmt) errHandler(_ error) { | ||
| o := s | ||
| if s.parentStmt != nil { | ||
| o = s.parentStmt | ||
| } | ||
|
|
||
| if err := o.Close(); err != nil { | ||
| klog.Warningf("Failed to close stmt: %s", err) | ||
| } | ||
|
|
||
| if o.stmtCache != nil { | ||
| once.Do(func() { | ||
| errStmtCounter = o.stmtCache.mf.NewCounter("sql_stmt_errors", "Number of statement execution errors") | ||
| }) | ||
|
|
||
| errStmtCounter.Inc() | ||
| } | ||
| } | ||
|
|
||
| // SQLStmt returns the referenced sql.Stmt struct. | ||
| func (s *Stmt) SQLStmt() *sql.Stmt { | ||
| return s.stmt | ||
| } | ||
|
|
||
| // Close closes the Stmt. | ||
| // Clear if Stmt belongs to cache | ||
| func (s *Stmt) Close() error { | ||
| if cache := s.stmtCache; cache != nil { | ||
| cache.clearOne(s) | ||
| } | ||
|
|
||
| return s.stmt.Close() | ||
| } | ||
|
|
||
| // WithTx returns a transaction-specific prepared statement from | ||
| // an existing statement. | ||
| // The transaction-specific Stmt is closed by the caller. | ||
| func (s *Stmt) WithTx(ctx context.Context, tx *sql.Tx) *Stmt { | ||
| parent := s | ||
| if s.parentStmt != nil { | ||
| parent = s.parentStmt | ||
| } | ||
| return &Stmt{ | ||
| parentStmt: parent, | ||
| stmt: tx.StmtContext(ctx, parent.stmt), | ||
| } | ||
| } | ||
|
|
||
| // ExecContext executes a prepared statement with the given arguments and | ||
| // returns a Result summarizing the effect of the statement. | ||
| func (s *Stmt) ExecContext(ctx context.Context, args ...any) (sql.Result, error) { | ||
| res, err := s.stmt.ExecContext(ctx, args...) | ||
| if err != nil { | ||
| s.errHandler(err) | ||
| } | ||
| return res, err | ||
| } | ||
|
|
||
| // QueryContext executes a prepared query statement with the given arguments | ||
| // and returns the query results as a *Rows. | ||
| func (s *Stmt) QueryContext(ctx context.Context, args ...any) (*sql.Rows, error) { | ||
| res, err := s.stmt.QueryContext(ctx, args...) | ||
| if err != nil { | ||
| s.errHandler(err) | ||
| } | ||
| return res, err | ||
| } | ||
|
|
||
| // QueryRowContext executes a prepared query statement with the given arguments. | ||
| // If an error occurs during the execution of the statement, that error will | ||
| // be returned by a call to Scan on the returned *Row, which is always non-nil. | ||
| // If the query selects no rows, the *Row's Scan will return ErrNoRows. | ||
| // Otherwise, the *Row's Scan scans the first selected row and discards | ||
| // the rest. | ||
| func (s *Stmt) QueryRowContext(ctx context.Context, args ...any) *sql.Row { | ||
| res := s.stmt.QueryRowContext(ctx, args...) | ||
| if err := res.Err(); err != nil { | ||
| s.errHandler(err) | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| // StmtCache is a cache of the sql.Stmt structs. | ||
| type StmtCache struct { | ||
| db *sql.DB | ||
| statementMutex sync.Mutex | ||
| statements map[string]map[int]*sql.Stmt | ||
| mf monitoring.MetricFactory | ||
| } | ||
|
|
||
| // New creates a StmtCache instance. | ||
| func New(db *sql.DB, mf monitoring.MetricFactory) *StmtCache { | ||
| if mf == nil { | ||
| mf = monitoring.InertMetricFactory{} | ||
| } | ||
|
|
||
| return &StmtCache{ | ||
| db: db, | ||
| statements: make(map[string]map[int]*sql.Stmt), | ||
| mf: mf, | ||
| } | ||
| } | ||
|
|
||
| // clearOne clear the cache of a sql.Stmt. | ||
| func (sc *StmtCache) clearOne(s *Stmt) { | ||
| if s == nil || s.stmt == nil || s.stmtCache != sc { | ||
| return | ||
| } | ||
|
|
||
| sc.statementMutex.Lock() | ||
| defer sc.statementMutex.Unlock() | ||
|
|
||
| if sc.statements[s.statement] != nil && sc.statements[s.statement][s.placeholderNum] == s.stmt { | ||
|
||
| sc.statements[s.statement][s.placeholderNum] = nil | ||
| } | ||
| } | ||
|
|
||
| func (sc *StmtCache) getStmt(ctx context.Context, statement string, num int, first, rest string) (*sql.Stmt, error) { | ||
| sc.statementMutex.Lock() | ||
| defer sc.statementMutex.Unlock() | ||
|
|
||
| if sc.statements[statement] != nil { | ||
| if sc.statements[statement][num] != nil { | ||
| return sc.statements[statement][num], nil | ||
| } | ||
| } else { | ||
| sc.statements[statement] = make(map[int]*sql.Stmt) | ||
| } | ||
|
|
||
| s, err := sc.db.PrepareContext(ctx, expandPlaceholderSQL(statement, num, first, rest)) | ||
| if err != nil { | ||
| klog.Warningf("Failed to prepare statement %d: %s", num, err) | ||
| return nil, err | ||
| } | ||
|
|
||
| sc.statements[statement][num] = s | ||
|
|
||
| return s, nil | ||
| } | ||
|
|
||
| // expandPlaceholderSQL expands an sql statement by adding a specified number of '?' | ||
| // placeholder slots. At most one placeholder will be expanded. | ||
| func expandPlaceholderSQL(sql string, num int, first, rest string) string { | ||
| if num <= 0 { | ||
| panic(fmt.Errorf("trying to expand SQL placeholder with <= 0 parameters: %s", sql)) | ||
| } | ||
|
|
||
| parameters := first + strings.Repeat(","+rest, num-1) | ||
|
|
||
| return strings.Replace(sql, PlaceholderSQL, parameters, 1) | ||
| } | ||
|
|
||
| // GetStmt creates and caches sql.Stmt and returns their wrapper Stmt. | ||
| func (sc *StmtCache) GetStmt(ctx context.Context, statement string, num int, first, rest string) (*Stmt, error) { | ||
| stmt, err := sc.getStmt(ctx, statement, num, first, rest) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return &Stmt{ | ||
| statement: statement, | ||
| placeholderNum: num, | ||
| stmtCache: sc, | ||
| stmt: stmt, | ||
| }, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what 'unit' means?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The word 'separate unit' comes from the current TODO comment:
trillian/storage/mysql/tree_storage.go
Line 109 in bc857c2