Skip to content

Commit 8fb3e3f

Browse files
Feature: create tables with patitioning
1 parent a2f0424 commit 8fb3e3f

File tree

9 files changed

+152
-44
lines changed

9 files changed

+152
-44
lines changed

database/bun.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,33 @@ func (db *Bun) MakeColumnComment(ctx context.Context, tableName string, columnNa
123123

124124
return err
125125
}
126+
127+
// CreateTable -
128+
func (db *Bun) CreateTable(ctx context.Context, model any, opts ...CreateTableOption) error {
129+
if model == nil {
130+
return nil
131+
}
132+
var options CreateTableOptions
133+
for i := range opts {
134+
opts[i](&options)
135+
}
136+
137+
query := db.DB().
138+
NewCreateTable().
139+
Model(model)
140+
141+
if options.ifNotExists {
142+
query = query.IfNotExists()
143+
}
144+
145+
if options.partitionBy != "" {
146+
query = query.PartitionBy(options.partitionBy)
147+
}
148+
149+
if options.temporary {
150+
query = query.Temp()
151+
}
152+
153+
_, err := query.Exec(ctx)
154+
return err
155+
}

database/comment.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,14 @@ import (
99
"github.com/pkg/errors"
1010
)
1111

12+
const (
13+
fieldTableName = "tableName"
14+
fieldBaseModel = "BaseModel"
15+
)
16+
1217
// MakeComments -
1318
func MakeComments(ctx context.Context, sc SchemeCommenter, models ...interface{}) error {
14-
if models == nil {
19+
if len(models) == 0 {
1520
return nil
1621
}
1722

@@ -31,7 +36,7 @@ func MakeComments(ctx context.Context, sc SchemeCommenter, models ...interface{}
3136
for i := 0; i < modelType.NumField(); i++ {
3237
fieldType := modelType.Field(i)
3338

34-
if fieldType.Name == "tableName" || fieldType.Name == "BaseModel" {
39+
if fieldType.Name == fieldTableName || fieldType.Name == fieldBaseModel {
3540
var ok bool
3641
tableName, ok = getDatabaseTagName(fieldType)
3742
if !ok {
@@ -77,7 +82,7 @@ func makeEmbeddedComments(ctx context.Context, sc SchemeCommenter, tableName str
7782
continue
7883
}
7984

80-
if fieldType.Name == "tableName" {
85+
if fieldType.Name == fieldTableName {
8186
return errors.New("Embedded type must not have tableName field.")
8287
}
8388

@@ -143,7 +148,6 @@ func getDatabaseTagName(fieldType reflect.StructField) (name string, ok bool) {
143148

144149
func getComment(fieldType reflect.StructField) (string, bool) {
145150
commentTag, ok := fieldType.Tag.Lookup("comment")
146-
147151
if ok {
148152
return commentTag, ok
149153
}

database/db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type SchemeCommenter interface {
2121
type Database interface {
2222
Connect(ctx context.Context, cfg config.Database) error
2323
Exec(ctx context.Context, query string, args ...any) (int64, error)
24+
CreateTable(ctx context.Context, model any, opts ...CreateTableOption) error
2425

2526
StateRepository
2627
SchemeCommenter

database/db_test.go

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"testing"
77
"time"
88

9-
"github.com/go-pg/pg/v10/orm"
109
_ "github.com/lib/pq"
1110
"github.com/pkg/errors"
1211

@@ -22,43 +21,11 @@ const (
2221
func newDatabase(ctx context.Context, typ string, cfg config.Database) (Database, error) {
2322
switch typ {
2423
case "gorm":
25-
db := NewGorm()
26-
if err := db.Connect(ctx, cfg); err != nil {
27-
return nil, err
28-
}
29-
if err := db.DB().AutoMigrate(&State{}); err != nil {
30-
if err := db.Close(); err != nil {
31-
return nil, err
32-
}
33-
return nil, err
34-
}
35-
return db, nil
24+
return NewGorm(), nil
3625
case "pg-go":
37-
db := NewPgGo()
38-
if err := db.Connect(ctx, cfg); err != nil {
39-
return nil, err
40-
}
41-
if err := db.DB().WithContext(ctx).Model(&State{}).CreateTable(&orm.CreateTableOptions{
42-
IfNotExists: true,
43-
}); err != nil {
44-
if err := db.Close(); err != nil {
45-
return nil, err
46-
}
47-
return nil, err
48-
}
49-
return db, nil
26+
return NewPgGo(), nil
5027
case "bun":
51-
db := NewBun()
52-
if err := db.Connect(ctx, cfg); err != nil {
53-
return nil, err
54-
}
55-
if _, err := db.DB().NewCreateTable().Model(&State{}).IfNotExists().Exec(ctx); err != nil {
56-
if err := db.Close(); err != nil {
57-
return nil, err
58-
}
59-
return nil, err
60-
}
61-
return db, nil
28+
return NewBun(), nil
6229
default:
6330
return nil, errors.Errorf("unknown ORM: %s", typ)
6431
}
@@ -87,14 +54,20 @@ func (s *DBTestSuite) SetupSuite() {
8754
s.Require().NoError(err)
8855
s.psqlContainer = psqlContainer
8956

90-
s.db, err = newDatabase(ctx, s.typ, config.Database{
57+
cfg := config.Database{
9158
Kind: config.DBKindPostgres,
9259
User: s.psqlContainer.Config.User,
9360
Database: s.psqlContainer.Config.Database,
9461
Password: s.psqlContainer.Config.Password,
9562
Host: s.psqlContainer.Config.Host,
9663
Port: s.psqlContainer.MappedPort().Int(),
97-
})
64+
}
65+
66+
s.db, err = newDatabase(ctx, s.typ, cfg)
67+
s.Require().NoError(err)
68+
err = s.db.Connect(ctx, cfg)
69+
s.Require().NoError(err)
70+
err = s.db.CreateTable(ctx, &State{}, WithIfNotExists())
9871
s.Require().NoError(err)
9972
}
10073

database/gorm.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,3 +151,13 @@ func (db *Gorm) MakeColumnComment(ctx context.Context, tableName string, columnN
151151
columnName,
152152
comment).Error
153153
}
154+
155+
// CreateTable -
156+
func (db *Gorm) CreateTable(ctx context.Context, model any, opts ...CreateTableOption) error {
157+
if model == nil {
158+
return nil
159+
}
160+
161+
// options are ignored because it's not supported by gorm
162+
return db.DB().WithContext(ctx).AutoMigrate(model)
163+
}

database/options.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package database
2+
3+
// CreateTableOptions -
4+
type CreateTableOptions struct {
5+
partitionBy string
6+
ifNotExists bool
7+
temporary bool
8+
}
9+
10+
// CreateTableOption -
11+
type CreateTableOption func(opts *CreateTableOptions)
12+
13+
// WithPartitioning -
14+
func WithPartitioning(by string) CreateTableOption {
15+
return func(opts *CreateTableOptions) {
16+
opts.partitionBy = by
17+
}
18+
}
19+
20+
// WithIfNotExists -
21+
func WithIfNotExists() CreateTableOption {
22+
return func(opts *CreateTableOptions) {
23+
opts.ifNotExists = true
24+
}
25+
}
26+
27+
// WithTemporary -
28+
func WithTemporary() CreateTableOption {
29+
return func(opts *CreateTableOptions) {
30+
opts.temporary = true
31+
}
32+
}

database/partition.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package database
33
import (
44
"context"
55
"fmt"
6+
"reflect"
67
"time"
78

89
"github.com/pkg/errors"
@@ -146,3 +147,42 @@ func (pm *RangePartitionManager) CreatePartitions(ctx context.Context, currentTi
146147
pm.lastId = p.id
147148
return nil
148149
}
150+
151+
// CreateTables - creates tables by passed models. If partition tag is present table will be partitioned
152+
func CreateTables(ctx context.Context, conn Database, models ...any) error {
153+
if len(models) == 0 {
154+
return nil
155+
}
156+
157+
for i := range models {
158+
if models[i] == nil {
159+
continue
160+
}
161+
modelType := reflect.TypeOf(models[i])
162+
if reflect.ValueOf(models[i]).Kind() == reflect.Ptr {
163+
modelType = modelType.Elem()
164+
}
165+
166+
for j := 0; j < modelType.NumField(); j++ {
167+
fieldType := modelType.Field(j)
168+
169+
if fieldType.Name != fieldTableName && fieldType.Name != fieldBaseModel {
170+
continue
171+
}
172+
173+
options := []CreateTableOption{
174+
WithIfNotExists(),
175+
}
176+
177+
partitionTag, ok := fieldType.Tag.Lookup("partition")
178+
if ok {
179+
options = append(options, WithPartitioning(partitionTag))
180+
}
181+
if err := conn.CreateTable(ctx, models[i], options...); err != nil {
182+
return err
183+
}
184+
}
185+
}
186+
187+
return nil
188+
}

database/partition_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
type logs struct {
14-
bun.BaseModel `bun:"logs"`
14+
bun.BaseModel `bun:"logs" partition:"RANGE(log_time)"`
1515

1616
Id int `pg:"id,pk"`
1717
LogString string `pg:"log_string"`
@@ -52,7 +52,7 @@ func (s *PartitionTestSuite) SetupSuite() {
5252
})
5353
s.Require().NoError(err)
5454

55-
_, err = s.db.DB().NewCreateTable().Model(&logs{}).PartitionBy("RANGE(log_time)").IfNotExists().Exec(ctx)
55+
err = CreateTables(ctx, s.db, &State{}, &logs{})
5656
s.Require().NoError(err)
5757

5858
s.pm = NewPartitionManager(s.db, PartitionByMonth)

database/pg.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/dipdup-net/go-lib/config"
1010
pg "github.com/go-pg/pg/v10"
11+
"github.com/go-pg/pg/v10/orm"
1112
"github.com/pkg/errors"
1213
)
1314

@@ -114,3 +115,20 @@ func (db *PgGo) MakeColumnComment(ctx context.Context, tableName string, columnN
114115

115116
return err
116117
}
118+
119+
// CreateTable -
120+
func (db *PgGo) CreateTable(ctx context.Context, model any, opts ...CreateTableOption) error {
121+
if model == nil {
122+
return nil
123+
}
124+
var options CreateTableOptions
125+
for i := range opts {
126+
opts[i](&options)
127+
}
128+
129+
// option 'partitionBy' is ignored because it's work via tag in pg-go library
130+
return db.DB().WithContext(ctx).Model(model).CreateTable(&orm.CreateTableOptions{
131+
IfNotExists: options.ifNotExists,
132+
Temp: options.temporary,
133+
})
134+
}

0 commit comments

Comments
 (0)