Skip to content

Commit 0fccc9b

Browse files
authored
test harmonydb (#161)
1 parent 9bd93aa commit 0fccc9b

File tree

4 files changed

+262
-9
lines changed

4 files changed

+262
-9
lines changed

harmony/harmonydb/harmonydb.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@ import (
2323
"golang.org/x/xerrors"
2424

2525
"github.com/filecoin-project/curio/deps/config"
26-
27-
lhdb "github.com/filecoin-project/lotus/lib/harmony/harmonydb"
2826
)
2927

3028
type ITestID string
@@ -120,7 +118,7 @@ func New(hosts []string, username, password, database, port string, itestID ITes
120118

121119
cfg.ConnConfig.OnNotice = func(conn *pgconn.PgConn, n *pgconn.Notice) {
122120
logger.Debug("database notice: " + n.Message + ": " + n.Detail)
123-
lhdb.DBMeasures.Errors.M(1)
121+
DBMeasures.Errors.M(1)
124122
}
125123

126124
db := DB{cfg: cfg, schema: schema, hostnames: hosts} // pgx populated in AddStatsAndConnect
@@ -143,12 +141,12 @@ func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.Tr
143141
return context.WithValue(context.WithValue(ctx, SQL_START, time.Now()), SQL_STRING, data.SQL)
144142
}
145143
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
146-
lhdb.DBMeasures.Hits.M(1)
144+
DBMeasures.Hits.M(1)
147145
ms := time.Since(ctx.Value(SQL_START).(time.Time)).Milliseconds()
148-
lhdb.DBMeasures.TotalWait.M(ms)
149-
lhdb.DBMeasures.Waits.Observe(float64(ms))
146+
DBMeasures.TotalWait.M(ms)
147+
DBMeasures.Waits.Observe(float64(ms))
150148
if data.Err != nil {
151-
lhdb.DBMeasures.Errors.M(1)
149+
DBMeasures.Errors.M(1)
152150
}
153151
logger.Debugw("SQL run",
154152
"query", ctx.Value(SQL_STRING).(string),
@@ -182,8 +180,8 @@ func (db *DB) addStatsAndConnect() error {
182180
}
183181
db.cfg.AfterConnect = func(ctx context.Context, c *pgx.Conn) error {
184182
s := db.pgx.Stat()
185-
lhdb.DBMeasures.OpenConnections.M(int64(s.TotalConns()))
186-
lhdb.DBMeasures.WhichHost.Observe(hostnameToIndex[c.Config().Host])
183+
DBMeasures.OpenConnections.M(int64(s.TotalConns()))
184+
DBMeasures.WhichHost.Observe(hostnameToIndex[c.Config().Host])
187185

188186
//FUTURE place for any connection seasoning
189187
return nil

harmony/harmonydb/metrics.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package harmonydb
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"go.opencensus.io/stats"
6+
"go.opencensus.io/stats/view"
7+
"go.opencensus.io/tag"
8+
9+
"github.com/filecoin-project/lotus/metrics"
10+
)
11+
12+
var (
13+
dbTag, _ = tag.NewKey("db_name")
14+
pre = "curio_db_"
15+
waitsBuckets = []float64{0, 10, 20, 30, 50, 80, 130, 210, 340, 550, 890}
16+
whichHostBuckets = []float64{0, 1, 2, 3, 4, 5}
17+
)
18+
19+
// DBMeasures groups all db metrics.
20+
var DBMeasures = struct {
21+
Hits *stats.Int64Measure
22+
TotalWait *stats.Int64Measure
23+
Waits prometheus.Histogram
24+
OpenConnections *stats.Int64Measure
25+
Errors *stats.Int64Measure
26+
WhichHost prometheus.Histogram
27+
}{
28+
Hits: stats.Int64(pre+"hits", "Total number of uses.", stats.UnitDimensionless),
29+
TotalWait: stats.Int64(pre+"total_wait", "Total delay. A numerator over hits to get average wait.", stats.UnitMilliseconds),
30+
Waits: prometheus.NewHistogram(prometheus.HistogramOpts{
31+
Name: pre + "waits",
32+
Buckets: waitsBuckets,
33+
Help: "The histogram of waits for query completions.",
34+
}),
35+
OpenConnections: stats.Int64(pre+"open_connections", "Total connection count.", stats.UnitDimensionless),
36+
Errors: stats.Int64(pre+"errors", "Total error count.", stats.UnitDimensionless),
37+
WhichHost: prometheus.NewHistogram(prometheus.HistogramOpts{
38+
Name: pre + "which_host",
39+
Buckets: whichHostBuckets,
40+
Help: "The index of the hostname being used",
41+
}),
42+
}
43+
44+
// CacheViews groups all cache-related default views.
45+
func init() {
46+
metrics.RegisterViews(
47+
&view.View{
48+
Measure: DBMeasures.Hits,
49+
Aggregation: view.Sum(),
50+
TagKeys: []tag.Key{dbTag},
51+
},
52+
&view.View{
53+
Measure: DBMeasures.TotalWait,
54+
Aggregation: view.Sum(),
55+
TagKeys: []tag.Key{dbTag},
56+
},
57+
&view.View{
58+
Measure: DBMeasures.OpenConnections,
59+
Aggregation: view.LastValue(),
60+
TagKeys: []tag.Key{dbTag},
61+
},
62+
&view.View{
63+
Measure: DBMeasures.Errors,
64+
Aggregation: view.Sum(),
65+
TagKeys: []tag.Key{dbTag},
66+
},
67+
)
68+
err := prometheus.Register(DBMeasures.Waits)
69+
if err != nil {
70+
panic(err)
71+
}
72+
73+
err = prometheus.Register(DBMeasures.WhichHost)
74+
if err != nil {
75+
panic(err)
76+
}
77+
}

itests/curio_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ func TestCurioHappyPath(t *testing.T) {
124124
fapi := fmt.Sprintf("%s:%s", string(token), full.ListenAddr)
125125

126126
sharedITestID := harmonydb.ITestNewID()
127+
127128
db, err := harmonydb.NewFromConfigWithITestID(t, sharedITestID)
128129
require.NoError(t, err)
129130

itests/harmonydb_test.go

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package itests
2+
3+
import (
4+
"context"
5+
"crypto/sha256"
6+
"encoding/hex"
7+
"fmt"
8+
"testing"
9+
10+
"github.com/filecoin-project/curio/harmony/harmonydb"
11+
12+
"github.com/filecoin-project/lotus/itests/kit"
13+
"github.com/filecoin-project/lotus/node/impl"
14+
)
15+
16+
func withSetup(t *testing.T, f func(*kit.TestMiner)) {
17+
_, miner, _ := kit.EnsembleMinimal(t,
18+
kit.LatestActorsAt(-1),
19+
kit.MockProofs(),
20+
kit.WithSectorIndexDB(),
21+
)
22+
23+
f(miner)
24+
}
25+
26+
func TestCrud(t *testing.T) {
27+
ctx, cancel := context.WithCancel(context.Background())
28+
defer cancel()
29+
30+
withSetup(t, func(miner *kit.TestMiner) {
31+
cdb := miner.BaseAPI.(*impl.StorageMinerAPI).HarmonyDB
32+
_, err := cdb.Exec(ctx, `
33+
INSERT INTO
34+
itest_scratch (some_int, content)
35+
VALUES
36+
(11, 'cows'),
37+
(5, 'cats')
38+
`)
39+
if err != nil {
40+
t.Fatal("Could not insert: ", err)
41+
}
42+
var ints []struct {
43+
Count int `db:"some_int"`
44+
Animal string `db:"content"`
45+
Unpopulated int
46+
}
47+
err = cdb.Select(ctx, &ints, "SELECT content, some_int FROM itest_scratch")
48+
if err != nil {
49+
t.Fatal("Could not select: ", err)
50+
}
51+
if len(ints) != 2 {
52+
t.Fatal("unexpected count of returns. Want 2, Got ", len(ints))
53+
}
54+
if ints[0].Count != 11 || ints[1].Count != 5 {
55+
t.Fatal("expected [11,5] got ", ints)
56+
}
57+
if ints[0].Animal != "cows" || ints[1].Animal != "cats" {
58+
t.Fatal("expected, [cows, cats] ", ints)
59+
}
60+
fmt.Println("test completed")
61+
})
62+
}
63+
64+
func TestTransaction(t *testing.T) {
65+
ctx, cancel := context.WithCancel(context.Background())
66+
defer cancel()
67+
68+
withSetup(t, func(miner *kit.TestMiner) {
69+
testID := harmonydb.ITestNewID()
70+
cdb := setupTestDB(t, testID)
71+
if _, err := cdb.Exec(ctx, "INSERT INTO itest_scratch (some_int) VALUES (4), (5), (6)"); err != nil {
72+
t.Fatal("E0", err)
73+
}
74+
_, err := cdb.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
75+
if _, err := tx.Exec("INSERT INTO itest_scratch (some_int) VALUES (7), (8), (9)"); err != nil {
76+
t.Fatal("E1", err)
77+
}
78+
79+
// sum1 is read from OUTSIDE the transaction so it's the old value
80+
var sum1 int
81+
if err := cdb.QueryRow(ctx, "SELECT SUM(some_int) FROM itest_scratch").Scan(&sum1); err != nil {
82+
t.Fatal("E2", err)
83+
}
84+
if sum1 != 4+5+6 {
85+
t.Fatal("Expected 15, got ", sum1)
86+
}
87+
88+
// sum2 is from INSIDE the transaction, so the updated value.
89+
var sum2 int
90+
if err := tx.QueryRow("SELECT SUM(some_int) FROM itest_scratch").Scan(&sum2); err != nil {
91+
t.Fatal("E3", err)
92+
}
93+
if sum2 != 4+5+6+7+8+9 {
94+
t.Fatal("Expected 39, got ", sum2)
95+
}
96+
return false, nil // rollback
97+
})
98+
if err != nil {
99+
t.Fatal("ET", err)
100+
}
101+
102+
var sum2 int
103+
// Query() example (yes, QueryRow would be preferred here)
104+
q, err := cdb.Query(ctx, "SELECT SUM(some_int) FROM itest_scratch")
105+
if err != nil {
106+
t.Fatal("E4", err)
107+
}
108+
defer q.Close()
109+
var rowCt int
110+
for q.Next() {
111+
err := q.Scan(&sum2)
112+
if err != nil {
113+
t.Fatal("error scanning ", err)
114+
}
115+
rowCt++
116+
}
117+
if sum2 != 4+5+6 {
118+
t.Fatal("Expected 15, got ", sum2)
119+
}
120+
if rowCt != 1 {
121+
t.Fatal("unexpected count of rows")
122+
}
123+
})
124+
}
125+
126+
func TestPartialWalk(t *testing.T) {
127+
ctx, cancel := context.WithCancel(context.Background())
128+
defer cancel()
129+
130+
withSetup(t, func(miner *kit.TestMiner) {
131+
testID := harmonydb.ITestNewID()
132+
cdb := setupTestDB(t, testID)
133+
if _, err := cdb.Exec(ctx, `
134+
INSERT INTO
135+
itest_scratch (content, some_int)
136+
VALUES
137+
('andy was here', 5),
138+
('lotus is awesome', 6),
139+
('hello world', 7),
140+
('3rd integration test', 8),
141+
('fiddlesticks', 9)
142+
`); err != nil {
143+
t.Fatal("e1", err)
144+
}
145+
146+
// TASK: FIND THE ID of the string with a specific SHA256
147+
needle := "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
148+
q, err := cdb.Query(ctx, `SELECT id, content FROM itest_scratch`)
149+
if err != nil {
150+
t.Fatal("e2", err)
151+
}
152+
defer q.Close()
153+
154+
var tmp struct {
155+
Src string `db:"content"`
156+
ID int
157+
}
158+
159+
var done bool
160+
for q.Next() {
161+
162+
if err := q.StructScan(&tmp); err != nil {
163+
t.Fatal("structscan err " + err.Error())
164+
}
165+
166+
bSha := sha256.Sum256([]byte(tmp.Src))
167+
if hex.EncodeToString(bSha[:]) == needle {
168+
done = true
169+
break
170+
}
171+
}
172+
if !done {
173+
t.Fatal("We didn't find it.")
174+
}
175+
// Answer: tmp.ID
176+
})
177+
}

0 commit comments

Comments
 (0)