Skip to content

Commit f746c91

Browse files
authored
Merge pull request #5 from iluhinsky/refactoring/db
Moved db-related code to dedicated module
2 parents 5e7c903 + aeed0ae commit f746c91

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+7387
-3953
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ perfkit is a set of performance tools.
44

55
It includes:
66
* [benchmark](benchmark) - a library for writing benchmarks
7+
* [db](db) - a library for working with databases
78
* [acronis-db-bench](acronis-db-bench) - a tool to run database benchmarks
89
* [acronis-restrelay-bench](acronis-restrelay-bench) - a tool to run end-to-end cloud deployment benchmarks
910

acronis-db-bench/VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.1
1+
1.1.1-beta

acronis-db-bench/acronis-db-bench.go

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ import (
99
"sort"
1010
"strings"
1111

12+
_ "net/http/pprof"
13+
1214
"github.com/acronis/perfkit/benchmark"
13-
embeddedpostgres "github.com/fergusstrange/embedded-postgres" // embedder postgres
15+
"github.com/acronis/perfkit/db"
16+
17+
_ "github.com/acronis/perfkit/db/sql" // sql drivers
1418
)
1519

1620
// Version is a version of the acronis-db-bench
@@ -29,11 +33,10 @@ func printVersion() {
2933

3034
// TestOpts is a structure to store all the test options
3135
type TestOpts struct {
32-
DBOpts benchmark.DatabaseOpts
33-
BenchOpts BenchOpts
34-
EmbeddedPostgresOpts EmbeddedPostgresOpts
35-
TestcaseOpts TestcaseOpts
36-
CTIOpts CTIOpts
36+
DBOpts DatabaseOpts
37+
BenchOpts BenchOpts
38+
TestcaseOpts TestcaseOpts
39+
CTIOpts CTIOpts
3740
}
3841

3942
// BenchOpts is a structure to store all the benchmark options
@@ -70,17 +73,17 @@ type TestcaseOpts struct {
7073

7174
// DBTestData is a structure to store all the test data
7275
type DBTestData struct {
73-
TestDesc *TestDesc
74-
EventBus *EventBus
75-
EmbeddedPostgres *embeddedpostgres.EmbeddedPostgres
76-
EffectiveBatch int // EffectiveBatch reflects the default value if the --batch option is not set, it can be different for different tests
76+
TestDesc *TestDesc
77+
EventBus *EventBus
78+
TenantsCache *TenantsCache
79+
EffectiveBatch int // EffectiveBatch reflects the default value if the --batch option is not set, it can be different for different tests
7780

7881
scores map[string][]benchmark.Score
7982
}
8083

8184
// DBWorkerData is a structure to store all the worker data
8285
type DBWorkerData struct {
83-
conn *benchmark.DBConnector
86+
conn *DBConnector
8487
}
8588

8689
var header = strings.Repeat("=", 120) + "\n"
@@ -97,17 +100,12 @@ func Main() {
97100
var testOpts TestOpts
98101
b.Cli.AddFlagGroup("Database options", "", &testOpts.DBOpts)
99102
b.Cli.AddFlagGroup("acronis-db-bench specific options", "", &testOpts.BenchOpts)
100-
b.Cli.AddFlagGroup("Embedded Postgres specific options", "", &testOpts.EmbeddedPostgresOpts)
101103
b.Cli.AddFlagGroup("Testcase specific options", "", &testOpts.TestcaseOpts)
102104
b.Cli.AddFlagGroup("CTI-pattern simulation test options", "", &testOpts.CTIOpts)
103105

104106
return &testOpts
105107
}
106108

107-
b.PreExit = func() {
108-
finiEmbeddedPostgres(b)
109-
}
110-
111109
b.PrintScore = func(score benchmark.Score) {
112110
testData := b.Vault.(*DBTestData)
113111
var format string
@@ -148,15 +146,22 @@ func Main() {
148146
b.Vault.(*DBTestData).EffectiveBatch = 1
149147
}
150148

149+
var dialectName, err = db.GetDialectName(b.TestOpts.(*TestOpts).DBOpts.ConnString)
150+
if err != nil {
151+
b.Exit(err)
152+
}
153+
151154
if testOpts.BenchOpts.List {
152155
groups, _ := GetTests()
153156
fmt.Printf(header) //nolint:staticcheck
154157
for _, g := range groups {
158+
155159
str := fmt.Sprintf(" -- %s", g.name) //nolint:perfsprint
156160
fmt.Printf("\n%s %s\n\n", str, strings.Repeat("-", 130-len(str)))
161+
157162
var testsOutput []string
158163
for _, t := range g.tests {
159-
if testOpts.DBOpts.Driver != "" && !t.dbIsSupported(testOpts.DBOpts.Driver) {
164+
if dialectName != "" && !t.dbIsSupported(dialectName) {
160165
continue
161166
}
162167
testsOutput = append(testsOutput, fmt.Sprintf(" %-39s : %s : %s\n", t.name, t.getDBs(), t.description))
@@ -166,15 +171,13 @@ func Main() {
166171
}
167172
fmt.Printf("\n")
168173
fmt.Printf("Databases symbol legend:\n\n ")
169-
for _, db := range benchmark.GetDatabases() {
174+
for _, db := range db.GetDatabases() {
170175
fmt.Printf(" %s - %s;", db.Symbol, db.Name)
171176
}
172177
fmt.Printf("\n\n")
173178
b.Exit()
174179
}
175180

176-
initEmbeddedPostgres(b)
177-
178181
if testOpts.BenchOpts.Describe {
179182
describeTest(b, testOpts)
180183
b.Exit()
@@ -198,17 +201,24 @@ func Main() {
198201
if testOpts.DBOpts.Reconnect {
199202
b.PreWorker = func(workerId int) {
200203
conn := b.WorkerData[workerId].(*DBWorkerData).conn
201-
conn.Close()
204+
conn.database.Close()
202205
}
203206
}
204207

205208
c := dbConnector(b)
206209

207-
driver, version := c.GetVersion()
210+
driver, version, err := c.database.GetVersion()
211+
if err != nil {
212+
b.Exit("Failed to get database version: %v", err)
213+
}
214+
208215
fmt.Printf("Connected to '%s' database: %s\n", driver, version)
209216
fmt.Printf(header) //nolint:staticcheck
210217

211-
content, dbInfo := c.GetInfo(version)
218+
content, dbInfo, err := c.database.GetInfo(version)
219+
if err != nil {
220+
b.Exit("Failed to get database info: %v", err)
221+
}
212222

213223
if testOpts.BenchOpts.Info || b.Logger.LogLevel > benchmark.LogInfo {
214224
if testOpts.BenchOpts.Info {
@@ -222,7 +232,7 @@ func Main() {
222232

223233
if testOpts.BenchOpts.ProfilerPort > 0 {
224234
go func() {
225-
err := http.ListenAndServe(fmt.Sprintf("localhost:%d", testOpts.BenchOpts.ProfilerPort), nil)
235+
err = http.ListenAndServe(fmt.Sprintf("localhost:%d", testOpts.BenchOpts.ProfilerPort), nil)
226236
if err != nil {
227237
b.Exit("Failed to start profiler server: %v", err)
228238
}
@@ -232,8 +242,10 @@ func Main() {
232242
}
233243

234244
b.Init = func() {
235-
b.TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet)
236-
b.TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet)
245+
b.Vault.(*DBTestData).TenantsCache = NewTenantsCache(b)
246+
247+
b.Vault.(*DBTestData).TenantsCache.SetTenantsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.TenantsWorkingSet)
248+
b.Vault.(*DBTestData).TenantsCache.SetCTIsWorkingSet(b.TestOpts.(*TestOpts).BenchOpts.CTIsWorkingSet)
237249

238250
if b.Logger.LogLevel > benchmark.LogInfo && !testOpts.BenchOpts.Info {
239251
b.Log(benchmark.LogTrace, 0, getDBInfo(b, content))
@@ -298,9 +310,15 @@ func executeTests(b *benchmark.Benchmark, testOpts *TestOpts) {
298310
if !exists {
299311
b.Exit(fmt.Sprintf("Test: '%s' doesn't exist, see the list of available tests using --list option\n", testOpts.BenchOpts.Test))
300312
}
313+
314+
var dialectName, err = db.GetDialectName(testOpts.DBOpts.ConnString)
315+
if err != nil {
316+
b.Exit(err)
317+
}
318+
301319
test := tests[testOpts.BenchOpts.Test]
302-
if !test.dbIsSupported(testOpts.DBOpts.Driver) {
303-
b.Exit(fmt.Sprintf("Test: '%s' doesn't support '%s' database\n", testOpts.BenchOpts.Test, testOpts.DBOpts.Driver))
320+
if !test.dbIsSupported(dialectName) {
321+
b.Exit(fmt.Sprintf("Test: '%s' doesn't support '%s' database\n", testOpts.BenchOpts.Test, dialectName))
304322
}
305323
test.launcherFunc(b, test)
306324
}

acronis-db-bench/db.go

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"sync"
7+
"time"
8+
9+
"github.com/acronis/perfkit/benchmark"
10+
"github.com/acronis/perfkit/db"
11+
)
12+
13+
const SequenceName = "acronis_db_bench_sequence" // SequenceName is the name of the sequence used for generating IDs
14+
15+
// DatabaseOpts represents common flags for every test
16+
type DatabaseOpts struct {
17+
ConnString string `long:"connection-string" description:"connection string" default:"sqlite://:memory:" required:"false"`
18+
MaxOpenConns int `long:"max-open-cons" description:"max open connections per worker" default:"2" required:"false"`
19+
Reconnect bool `long:"reconnect" description:"reconnect to DB before every test iteration" required:"false"`
20+
21+
DryRun bool `long:"dry-run" description:"do not execute any INSERT/UPDATE/DELETE queries on DB-side" required:"false"`
22+
23+
LogQueries bool `long:"log-queries" description:"log queries" required:"false"`
24+
LogReadedRows bool `long:"log-readed-rows" description:"log readed rows" required:"false"`
25+
LogQueryTime bool `long:"log-query-time" description:"log query time" required:"false"`
26+
27+
DontCleanup bool `long:"dont-cleanup" description:"do not cleanup DB content before/after the test in '-t all' mode" required:"false"`
28+
UseTruncate bool `long:"use-truncate" description:"use TRUNCATE instead of DROP TABLE in cleanup procedure" required:"false"`
29+
}
30+
31+
// dbConnectorsPool is a simple connection pool, required not to saturate DB connection pool
32+
type dbConnectorsPool struct {
33+
lock sync.Mutex
34+
pool map[string]*DBConnector
35+
}
36+
37+
// key returns a unique key for the connection pool
38+
func (p *dbConnectorsPool) key(connectionString string, workerID int) string {
39+
return fmt.Sprintf("%s-%d", connectionString, workerID)
40+
}
41+
42+
// take returns a connection from the pool or nil if the pool is empty
43+
func (p *dbConnectorsPool) take(dbOpts *DatabaseOpts, workerID int) *DBConnector {
44+
k := p.key(dbOpts.ConnString, workerID)
45+
46+
p.lock.Lock()
47+
defer p.lock.Unlock()
48+
49+
conn, exists := p.pool[k]
50+
if exists {
51+
delete(p.pool, k)
52+
conn.Log(benchmark.LogTrace, "taking connection from the connection pool")
53+
54+
return conn
55+
}
56+
57+
return nil
58+
}
59+
60+
// put puts a connection to the pool
61+
func (p *dbConnectorsPool) put(conn *DBConnector) {
62+
k := p.key(conn.DbOpts.ConnString, conn.WorkerID)
63+
64+
p.lock.Lock()
65+
defer p.lock.Unlock()
66+
67+
_, exists := p.pool[k]
68+
if exists {
69+
FatalError("trying to put connection while another connection in the pool already exists")
70+
}
71+
p.pool[k] = conn
72+
conn.Log(benchmark.LogTrace, "releasing connection to the connection pool")
73+
}
74+
75+
// newDBConnectorsPool creates a new connection pool
76+
func newDBConnectorsPool() *dbConnectorsPool {
77+
p := dbConnectorsPool{}
78+
p.pool = make(map[string]*DBConnector)
79+
80+
return &p
81+
}
82+
83+
// connPool is a global connection pool
84+
var connPool = newDBConnectorsPool()
85+
86+
// connectionsChecker checks for potential connections leak
87+
func connectionsChecker(conn *DBConnector) {
88+
for {
89+
if conn.database != nil {
90+
openConnections := 0
91+
92+
conn.lock.Lock()
93+
if conn.database != nil {
94+
stats := conn.database.Stats()
95+
openConnections = stats.OpenConnections
96+
}
97+
conn.lock.Unlock()
98+
99+
if openConnections > 1 {
100+
conn.Log(benchmark.LogError, "internal error: potential connections leak detected")
101+
}
102+
}
103+
time.Sleep(3 * time.Second)
104+
}
105+
}
106+
107+
/*
108+
* DB connection management
109+
*/
110+
111+
// DBConnector is a wrapper for DB connection
112+
type DBConnector struct {
113+
Logger *benchmark.Logger
114+
DbOpts *DatabaseOpts
115+
RetryAttempts int
116+
WorkerID int
117+
118+
lock sync.Mutex
119+
database db.Database
120+
}
121+
122+
// NewDBConnector creates a new DBConnector
123+
func NewDBConnector(dbOpts *DatabaseOpts, workerID int, logger *benchmark.Logger, retryAttempts int) (*DBConnector, error) {
124+
c := connPool.take(dbOpts, workerID)
125+
if c != nil {
126+
return c, nil
127+
}
128+
129+
var queryLogger, readedRowsLogger, queryTimeLogger db.Logger
130+
if dbOpts.LogQueries {
131+
queryLogger = &dbLogger{level: benchmark.LogTrace, worker: workerID, logger: logger}
132+
}
133+
134+
if dbOpts.LogReadedRows {
135+
readedRowsLogger = &dbLogger{level: benchmark.LogTrace, worker: workerID, logger: logger}
136+
}
137+
138+
if dbOpts.LogQueryTime {
139+
queryTimeLogger = &dbLogger{level: benchmark.LogTrace, worker: workerID, logger: logger}
140+
}
141+
142+
var dbConn, err = db.Open(db.Config{
143+
ConnString: dbOpts.ConnString,
144+
MaxOpenConns: dbOpts.MaxOpenConns,
145+
DryRun: dbOpts.DryRun,
146+
UseTruncate: dbOpts.UseTruncate,
147+
148+
QueryLogger: queryLogger,
149+
ReadedRowsLogger: readedRowsLogger,
150+
QueryTimeLogger: queryTimeLogger,
151+
})
152+
if err != nil {
153+
return nil, err
154+
}
155+
156+
c = &DBConnector{
157+
Logger: logger,
158+
DbOpts: dbOpts,
159+
RetryAttempts: retryAttempts,
160+
WorkerID: workerID,
161+
database: dbConn,
162+
}
163+
164+
// go connectionsChecker(c)
165+
166+
return c, nil
167+
}
168+
169+
// Release releases the connection to the pool
170+
func (c *DBConnector) Release() {
171+
connPool.put(c)
172+
}
173+
174+
// SetLogLevel sets log level
175+
func (c *DBConnector) SetLogLevel(logLevel int) {
176+
c.Logger.LogLevel = logLevel
177+
}
178+
179+
// Log logs a message
180+
func (c *DBConnector) Log(LogLevel int, format string, args ...interface{}) {
181+
c.Logger.Log(LogLevel, c.WorkerID, format, args...)
182+
}
183+
184+
// Logn logs a message without a newline
185+
func (c *DBConnector) Logn(LogLevel int, format string, args ...interface{}) {
186+
c.Logger.Logn(LogLevel, c.WorkerID, format, args...)
187+
}
188+
189+
// Exit exits with an error message
190+
func (c *DBConnector) Exit(fmts string, args ...interface{}) {
191+
fmt.Printf(fmts, args...)
192+
fmt.Println()
193+
os.Exit(127)
194+
}
195+
196+
type dbLogger struct {
197+
level int
198+
worker int
199+
logger *benchmark.Logger
200+
}
201+
202+
func (l *dbLogger) Log(format string, args ...interface{}) {
203+
l.logger.Log(l.level, l.worker, format, args...)
204+
}

0 commit comments

Comments
 (0)