Skip to content

Commit a2f0424

Browse files
Add range partition manager
1 parent 1f90d97 commit a2f0424

File tree

7 files changed

+311
-11
lines changed

7 files changed

+311
-11
lines changed

database/bun.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7+
"runtime"
78

89
"github.com/dipdup-net/go-lib/config"
910
"github.com/pkg/errors"
@@ -46,6 +47,9 @@ func (db *Bun) Connect(ctx context.Context, cfg config.Database) error {
4647
))
4748
db.conn = bun.NewDB(db.sqldb, pgdialect.New())
4849
}
50+
maxOpenConns := 4 * runtime.GOMAXPROCS(0)
51+
db.sqldb.SetMaxOpenConns(maxOpenConns)
52+
db.sqldb.SetMaxIdleConns(maxOpenConns)
4953
return nil
5054
}
5155

@@ -57,6 +61,15 @@ func (db *Bun) Close() error {
5761
return db.sqldb.Close()
5862
}
5963

64+
// Exec -
65+
func (db *Bun) Exec(ctx context.Context, query string, args ...any) (int64, error) {
66+
result, err := db.conn.ExecContext(ctx, query, args...)
67+
if err != nil {
68+
return 0, err
69+
}
70+
return result.RowsAffected()
71+
}
72+
6073
// Ping -
6174
func (db *Bun) Ping(ctx context.Context) error {
6275
if db.conn == nil {

database/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ type SchemeCommenter interface {
2020
// Database -
2121
type Database interface {
2222
Connect(ctx context.Context, cfg config.Database) error
23+
Exec(ctx context.Context, query string, args ...any) (int64, error)
2324

2425
StateRepository
2526
SchemeCommenter

database/db_test.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,16 +64,16 @@ func newDatabase(ctx context.Context, typ string, cfg config.Database) (Database
6464
}
6565
}
6666

67-
// TestSuite -
68-
type TestSuite struct {
67+
// DBTestSuite -
68+
type DBTestSuite struct {
6969
suite.Suite
7070
psqlContainer *PostgreSQLContainer
7171
db Database
7272
typ string
7373
}
7474

7575
// SetupSuite -
76-
func (s *TestSuite) SetupSuite() {
76+
func (s *DBTestSuite) SetupSuite() {
7777
ctx, ctxCancel := context.WithTimeout(context.Background(), 30*time.Second)
7878
defer ctxCancel()
7979

@@ -98,15 +98,15 @@ func (s *TestSuite) SetupSuite() {
9898
s.Require().NoError(err)
9999
}
100100

101-
func (s *TestSuite) TearDownSuite() {
101+
func (s *DBTestSuite) TearDownSuite() {
102102
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
103103
defer ctxCancel()
104104

105105
s.Require().NoError(s.db.Close())
106106
s.Require().NoError(s.psqlContainer.Terminate(ctx))
107107
}
108108

109-
func (s *TestSuite) TestStateCreate() {
109+
func (s *DBTestSuite) TestStateCreate() {
110110
db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
111111
s.Require().NoError(err)
112112

@@ -137,7 +137,7 @@ func (s *TestSuite) TestStateCreate() {
137137
s.Require().Equal(state.UpdatedAt, newState.UpdatedAt)
138138
}
139139

140-
func (s *TestSuite) TestStateUpdate() {
140+
func (s *DBTestSuite) TestStateUpdate() {
141141
db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
142142
s.Require().NoError(err)
143143

@@ -167,7 +167,7 @@ func (s *TestSuite) TestStateUpdate() {
167167
s.Require().Equal(state.UpdatedAt, newState.UpdatedAt)
168168
}
169169

170-
func (s *TestSuite) TestState() {
170+
func (s *DBTestSuite) TestState() {
171171
db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
172172
s.Require().NoError(err)
173173

@@ -191,7 +191,7 @@ func (s *TestSuite) TestState() {
191191
s.Require().Equal(state.IndexName, testIndex)
192192
}
193193

194-
func (s *TestSuite) TestDeleteState() {
194+
func (s *DBTestSuite) TestDeleteState() {
195195
db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
196196
s.Require().NoError(err)
197197

@@ -215,22 +215,43 @@ func (s *TestSuite) TestDeleteState() {
215215
s.Require().Error(err)
216216
}
217217

218-
func (s *TestSuite) TestPing() {
218+
func (s *DBTestSuite) TestPing() {
219219
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
220220
defer ctxCancel()
221221

222222
s.Require().NoError(s.db.Ping(ctx))
223223
}
224224

225-
func (s *TestSuite) TestMakeComments() {
225+
func (s *DBTestSuite) TestMakeComments() {
226226
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
227227
defer ctxCancel()
228228

229229
s.Require().NoError(MakeComments(ctx, s.db, &State{}))
230230
}
231231

232+
func (s *DBTestSuite) TestExec() {
233+
db, err := sql.Open("postgres", s.psqlContainer.GetDSN())
234+
s.Require().NoError(err)
235+
236+
fixtures, err := testfixtures.New(
237+
testfixtures.Database(db),
238+
testfixtures.Dialect("postgres"),
239+
testfixtures.Directory("fixtures"),
240+
)
241+
s.Require().NoError(err)
242+
s.Require().NoError(fixtures.Load())
243+
s.Require().NoError(db.Close())
244+
245+
ctx, ctxCancel := context.WithTimeout(context.Background(), 5*time.Second)
246+
defer ctxCancel()
247+
248+
count, err := s.db.Exec(ctx, "delete from dipdup_state where index_name = ?", testIndex)
249+
s.Require().NoError(err)
250+
s.Require().EqualValues(1, count)
251+
}
252+
232253
func TestSuite_Run(t *testing.T) {
233-
ts := new(TestSuite)
254+
ts := new(DBTestSuite)
234255
for _, typ := range []string{"gorm", "pg-go", "bun"} {
235256
ts.typ = typ
236257
}

database/gorm.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ func (db *Gorm) Close() error {
8787
return sql.Close()
8888
}
8989

90+
// Exec -
91+
func (db *Gorm) Exec(ctx context.Context, query string, args ...any) (int64, error) {
92+
tx := db.conn.WithContext(ctx).Exec(query, args...)
93+
if tx.Error != nil {
94+
return 0, tx.Error
95+
}
96+
return tx.RowsAffected, nil
97+
}
98+
9099
// Ping -
91100
func (db *Gorm) Ping(ctx context.Context) error {
92101
if db.conn == nil {

database/partition.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package database
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/pkg/errors"
9+
"github.com/uptrace/bun"
10+
)
11+
12+
// PartitionBy -
13+
type PartitionBy int
14+
15+
const (
16+
PartitionByMonth PartitionBy = iota + 1
17+
PartitionByYear
18+
)
19+
20+
type params struct {
21+
id string
22+
start time.Time
23+
end time.Time
24+
success bool
25+
}
26+
27+
// RangePartitionManager -
28+
type RangePartitionManager struct {
29+
conn Database
30+
by PartitionBy
31+
32+
lastId string
33+
}
34+
35+
// NewPartitionManager -
36+
func NewPartitionManager(conn Database, by PartitionBy) RangePartitionManager {
37+
return RangePartitionManager{
38+
conn: conn,
39+
by: by,
40+
}
41+
}
42+
43+
const createPartitionTemplate = `CREATE TABLE IF NOT EXISTS ? PARTITION OF ? FOR VALUES FROM (?) TO (?);`
44+
45+
func monthBoundaries(current time.Time) (time.Time, time.Time) {
46+
start := time.Date(current.Year(), current.Month(), 1, 0, 0, 0, 0, time.UTC)
47+
end := start.AddDate(0, 1, 0)
48+
49+
return start, end
50+
}
51+
52+
func yearBoundaries(current time.Time) (time.Time, time.Time) {
53+
start := time.Date(current.Year(), 1, 1, 0, 0, 0, 0, time.UTC)
54+
end := start.AddDate(1, 0, 0)
55+
56+
return start, end
57+
}
58+
59+
func monthPartitionId(currentTime time.Time) string {
60+
return fmt.Sprintf("%d_%02d", currentTime.Year(), currentTime.Month())
61+
}
62+
63+
func yearPartitionId(currentTime time.Time) string {
64+
return fmt.Sprintf("%d", currentTime.Year())
65+
}
66+
67+
func (pm *RangePartitionManager) getParameters(currentTime time.Time) (params, error) {
68+
var p params
69+
70+
switch pm.by {
71+
case PartitionByMonth:
72+
p.id = monthPartitionId(currentTime)
73+
case PartitionByYear:
74+
p.id = yearPartitionId(currentTime)
75+
default:
76+
return p, errors.Errorf("unknown partition by: %d", pm.by)
77+
}
78+
79+
p.success = p.id != pm.lastId
80+
if !p.success {
81+
return p, nil
82+
}
83+
84+
switch pm.by {
85+
case PartitionByMonth:
86+
p.start, p.end = monthBoundaries(currentTime)
87+
case PartitionByYear:
88+
p.start, p.end = yearBoundaries(currentTime)
89+
default:
90+
return p, errors.Errorf("unknown partition by: %d", pm.by)
91+
}
92+
93+
return p, nil
94+
}
95+
96+
// CreatePartition -
97+
func (pm *RangePartitionManager) CreatePartition(ctx context.Context, currentTime time.Time, tableName string) error {
98+
p, err := pm.getParameters(currentTime)
99+
if err != nil {
100+
return err
101+
}
102+
if !p.success {
103+
return nil
104+
}
105+
106+
partitionName := fmt.Sprintf("%s_%s", tableName, p.id)
107+
if _, err := pm.conn.Exec(
108+
ctx,
109+
createPartitionTemplate,
110+
bun.Ident(partitionName),
111+
bun.Ident(tableName),
112+
p.start.Format(time.RFC3339Nano),
113+
p.end.Format(time.RFC3339Nano),
114+
); err != nil {
115+
return err
116+
}
117+
118+
pm.lastId = p.id
119+
return nil
120+
}
121+
122+
// CreatePartitions -
123+
func (pm *RangePartitionManager) CreatePartitions(ctx context.Context, currentTime time.Time, tableNames ...string) error {
124+
p, err := pm.getParameters(currentTime)
125+
if err != nil {
126+
return err
127+
}
128+
if !p.success {
129+
return nil
130+
}
131+
132+
for _, tableName := range tableNames {
133+
partitionName := fmt.Sprintf("%s_%s", tableName, p.id)
134+
if _, err := pm.conn.Exec(
135+
ctx,
136+
createPartitionTemplate,
137+
bun.Ident(partitionName),
138+
bun.Ident(tableName),
139+
p.start.Format(time.RFC3339Nano),
140+
p.end.Format(time.RFC3339Nano),
141+
); err != nil {
142+
return err
143+
}
144+
}
145+
146+
pm.lastId = p.id
147+
return nil
148+
}

0 commit comments

Comments
 (0)