Skip to content

Commit d7a279c

Browse files
author
zhangyunfei
committed
feat: add GBase 8s database support (GBASE)
1 parent 2788339 commit d7a279c

File tree

10 files changed

+680
-1
lines changed

10 files changed

+680
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
SOURCE ?= file go_bindata github github_ee bitbucket aws_s3 google_cloud_storage godoc_vfs gitlab
2-
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite
2+
DATABASE ?= postgres mysql redshift cassandra spanner cockroachdb yugabytedb clickhouse mongodb sqlserver firebird neo4j pgx pgx5 rqlite gbase8s
33
DATABASE_TEST ?= $(DATABASE) sqlite sqlite3 sqlcipher
44
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
55
TEST_FLAGS ?=

database/gbase8s/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# GBase8s
2+
3+
`gbase8s://user:password@ip:port/dbname?query`
4+
5+
6+
| URL Query | WithInstance Config | Description |
7+
| --------------------- | ------------------- | ------------------------------------------------------------ |
8+
| `dbname` | `DatabaseName` | The name of the database to connect to |
9+
| `user` | | The user to sign in as |
10+
| `password` | | The user's password |
11+
| `ip` | | The ip to connect to. |
12+
| `port` | | The port to bind to. |
13+
14+
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE IF EXISTS test;
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
CREATE TABLE IF NOT EXISTS test (
2+
username VARCHAR(20)
3+
);

database/gbase8s/gbase8s.go

Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
package gbase8s
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"io"
8+
"net/url"
9+
"strings"
10+
"time"
11+
12+
_ "gitee.com/GBase8s/go-gci"
13+
"github.com/golang-migrate/migrate/v4/database"
14+
"github.com/hashicorp/go-multierror"
15+
"go.uber.org/atomic"
16+
)
17+
18+
func init() {
19+
database.Register("gbase8s", &Gbase8s{})
20+
}
21+
22+
var (
23+
_ database.Driver = (*Gbase8s)(nil)
24+
DefaultMigrationsTable = "schema_migrations"
25+
DefaultLockTable = "schema_lock"
26+
)
27+
28+
var (
29+
ErrNoDatabaseName = fmt.Errorf("no database name")
30+
ErrNilConfig = fmt.Errorf("no config")
31+
)
32+
33+
type Config struct {
34+
MigrationsTable string
35+
DatabaseName string
36+
LockTable string
37+
ForceLock bool
38+
StatementTimeout time.Duration
39+
}
40+
41+
type Gbase8s struct {
42+
conn *sql.Conn
43+
db *sql.DB
44+
isLocked atomic.Bool
45+
config *Config
46+
}
47+
48+
func WithConnection(ctx context.Context, conn *sql.Conn, config *Config) (*Gbase8s, error) {
49+
if config == nil {
50+
return nil, ErrNilConfig
51+
}
52+
53+
if err := conn.PingContext(ctx); err != nil {
54+
return nil, err
55+
}
56+
57+
gx := &Gbase8s{
58+
conn: conn,
59+
db: nil,
60+
config: config,
61+
}
62+
63+
if config.DatabaseName == "" {
64+
query := `SELECT DBINFO('dbname') FROM systables WHERE tabid = 1`
65+
var databaseName sql.NullString
66+
if err := conn.QueryRowContext(ctx, query).Scan(&databaseName); err != nil {
67+
return nil, &database.Error{OrigErr: err, Query: []byte(query)}
68+
}
69+
70+
if len(databaseName.String) == 0 {
71+
return nil, ErrNoDatabaseName
72+
}
73+
74+
config.DatabaseName = databaseName.String
75+
}
76+
77+
if len(config.MigrationsTable) == 0 {
78+
config.MigrationsTable = DefaultMigrationsTable
79+
}
80+
81+
if len(config.LockTable) == 0 {
82+
config.LockTable = DefaultLockTable
83+
}
84+
85+
if err := gx.ensureLockTable(); err != nil {
86+
return nil, err
87+
}
88+
89+
if err := gx.ensureVersionTable(); err != nil {
90+
return nil, err
91+
}
92+
93+
return gx, nil
94+
}
95+
96+
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
97+
ctx := context.Background()
98+
99+
if err := instance.Ping(); err != nil {
100+
return nil, err
101+
}
102+
103+
conn, err := instance.Conn(ctx)
104+
if err != nil {
105+
return nil, err
106+
}
107+
108+
gx, err := WithConnection(ctx, conn, config)
109+
if err != nil {
110+
return nil, err
111+
}
112+
113+
gx.db = instance
114+
115+
return gx, nil
116+
}
117+
118+
func (g *Gbase8s) Open(dns string) (database.Driver, error) {
119+
gurl, err := url.Parse(dns)
120+
if err != nil {
121+
return nil, err
122+
}
123+
124+
db, err := sql.Open("gbase8s", gurl.String())
125+
if err != nil {
126+
return nil, err
127+
}
128+
129+
// migrationsTable := gurl.Query().Get("x-migrations-table")
130+
// if len(migrationsTable) == 0 {
131+
// migrationsTable = DefaultMigrationsTable
132+
// }
133+
134+
// lockTable := gurl.Query().Get("x-lock-table")
135+
// if len(lockTable) == 0 {
136+
// lockTable = DefaultLockTable
137+
// }
138+
139+
// forceLockQuery := gurl.Query().Get("x-force-lock")
140+
// forceLock, err := strconv.ParseBool(forceLockQuery)
141+
// if err != nil {
142+
// forceLock = false
143+
// }
144+
145+
// statementTimeoutQuery := gurl.Query().Get("x-statement-timeout")
146+
// statementTimeout, err := strconv.Atoi(statementTimeoutQuery)
147+
// if err != nil {
148+
// statementTimeout = 0
149+
// }
150+
151+
migrationsTable := DefaultMigrationsTable
152+
lockTable := DefaultLockTable
153+
forceLock := false
154+
statementTimeout := 0
155+
156+
gx, err := WithInstance(db, &Config{
157+
DatabaseName: gurl.Path,
158+
MigrationsTable: migrationsTable,
159+
LockTable: lockTable,
160+
ForceLock: forceLock,
161+
StatementTimeout: time.Duration(statementTimeout) * time.Millisecond,
162+
})
163+
if err != nil {
164+
return nil, err
165+
}
166+
167+
return gx, nil
168+
}
169+
170+
func (g *Gbase8s) Close() error {
171+
connErr := g.conn.Close()
172+
var dbErr error
173+
if g.db != nil {
174+
dbErr = g.db.Close()
175+
}
176+
177+
if connErr != nil || dbErr != nil {
178+
return fmt.Errorf("conn: %v, db: %v", connErr, dbErr)
179+
}
180+
return nil
181+
}
182+
183+
func (g *Gbase8s) Lock() error {
184+
return database.CasRestoreOnErr(&g.isLocked, false, true, database.ErrLocked, func() error {
185+
tx, err := g.conn.BeginTx(context.Background(), nil)
186+
if err != nil {
187+
return err
188+
}
189+
defer func() {
190+
if err != nil {
191+
_ = tx.Rollback()
192+
} else {
193+
err = tx.Commit()
194+
}
195+
}()
196+
197+
aid, err := database.GenerateAdvisoryLockId(g.config.DatabaseName)
198+
if err != nil {
199+
return err
200+
}
201+
202+
query := "SELECT lock_id FROM " + g.config.LockTable + " WHERE lock_id = ?"
203+
rows, err := tx.QueryContext(context.Background(), query, aid)
204+
if err != nil {
205+
return database.Error{OrigErr: err, Err: "failed to fetch migration lock", Query: []byte(query)}
206+
}
207+
defer rows.Close()
208+
209+
if rows.Next() {
210+
if !g.config.ForceLock {
211+
return database.ErrLocked
212+
}
213+
query = "DELETE FROM " + g.config.LockTable + " WHERE lock_id = ?"
214+
if _, err := tx.ExecContext(context.Background(), query, aid); err != nil {
215+
return database.Error{OrigErr: err, Err: "failed to force release lock", Query: []byte(query)}
216+
}
217+
}
218+
219+
query = "INSERT INTO " + g.config.LockTable + " (lock_id) VALUES (?)"
220+
if _, err := tx.ExecContext(context.Background(), query, aid); err != nil {
221+
return database.Error{OrigErr: err, Err: "failed to set migration lock", Query: []byte(query)}
222+
}
223+
224+
return nil
225+
})
226+
}
227+
228+
func (g *Gbase8s) Unlock() error {
229+
return database.CasRestoreOnErr(&g.isLocked, true, false, database.ErrNotLocked, func() error {
230+
aid, err := database.GenerateAdvisoryLockId(g.config.DatabaseName)
231+
if err != nil {
232+
return err
233+
}
234+
235+
query := "DELETE FROM " + g.config.LockTable + " WHERE lock_id = ?"
236+
if _, err := g.conn.ExecContext(context.Background(), query, aid); err != nil {
237+
if strings.Contains(err.Error(), "ERROR: -206: 42000") {
238+
// ERROR: -206: 42000 is "Table Not Exists Error" in Gbase8s
239+
// when the lock table is fully removed; This is fine, and is a valid "unlocked" state for the schema
240+
return nil
241+
}
242+
return &database.Error{OrigErr: err, Query: []byte(query)}
243+
}
244+
return nil
245+
})
246+
}
247+
248+
func (g *Gbase8s) Run(migration io.Reader) error {
249+
migr, err := io.ReadAll(migration)
250+
if err != nil {
251+
return err
252+
}
253+
254+
ctx := context.Background()
255+
if g.config.StatementTimeout != 0 {
256+
var cancel context.CancelFunc
257+
ctx, cancel = context.WithTimeout(ctx, g.config.StatementTimeout)
258+
defer cancel()
259+
}
260+
261+
query := string(migr[:])
262+
if _, err := g.conn.ExecContext(ctx, query); err != nil {
263+
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
264+
}
265+
266+
return nil
267+
}
268+
269+
func (g *Gbase8s) SetVersion(version int, dirty bool) error {
270+
tx, err := g.conn.BeginTx(context.Background(), &sql.TxOptions{})
271+
if err != nil {
272+
return &database.Error{OrigErr: err, Err: "transaction start failed"}
273+
}
274+
275+
query := "DELETE FROM " + g.config.MigrationsTable
276+
if _, err := tx.ExecContext(context.Background(), query); err != nil {
277+
if errRollback := tx.Rollback(); errRollback != nil {
278+
err = multierror.Append(err, errRollback)
279+
}
280+
return &database.Error{OrigErr: err, Query: []byte(query)}
281+
}
282+
283+
if version >= 0 || (version == database.NilVersion && dirty) {
284+
query := "INSERT INTO " + g.config.MigrationsTable + "(version, dirty) VALUES (?, ?)"
285+
if _, err := tx.ExecContext(context.Background(), query, version, dirty); err != nil {
286+
if errRollback := tx.Rollback(); errRollback != nil {
287+
err = multierror.Append(err, errRollback)
288+
}
289+
return &database.Error{OrigErr: err, Query: []byte(query)}
290+
}
291+
}
292+
293+
if err := tx.Commit(); err != nil {
294+
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
295+
}
296+
297+
return nil
298+
}
299+
300+
func (g *Gbase8s) Version() (version int, dirty bool, err error) {
301+
query := "SELECT FIRST 1 version, dirty FROM " + g.config.MigrationsTable
302+
err = g.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
303+
if err != nil {
304+
return database.NilVersion, false, nil
305+
}
306+
return version, dirty, nil
307+
}
308+
309+
func (g *Gbase8s) Drop() (err error) {
310+
query := "SELECT tabname FROM systables WHERE tabid > 1000 AND tabtype = 'T'"
311+
rows, err := g.conn.QueryContext(context.Background(), query)
312+
if err != nil {
313+
return &database.Error{OrigErr: err, Query: []byte(query)}
314+
}
315+
defer rows.Close()
316+
317+
var tables []string
318+
for rows.Next() {
319+
var table string
320+
if err := rows.Scan(&table); err != nil {
321+
return err
322+
}
323+
tables = append(tables, table)
324+
}
325+
326+
for _, tbl := range tables {
327+
if _, err := g.conn.ExecContext(context.Background(), fmt.Sprintf("DROP TABLE IF EXISTS %s", tbl)); err != nil {
328+
return err
329+
}
330+
}
331+
return nil
332+
}
333+
334+
func (g *Gbase8s) ensureVersionTable() (err error) {
335+
if err = g.Lock(); err != nil {
336+
return err
337+
}
338+
defer func() {
339+
if unlockErr := g.Unlock(); unlockErr != nil {
340+
err = multierror.Append(err, unlockErr)
341+
}
342+
}()
343+
344+
query := `CREATE TABLE IF NOT EXISTS "` + g.config.MigrationsTable + `" (version INT NOT NULL PRIMARY KEY, dirty SMALLINT NOT NULL)`
345+
if _, err = g.conn.ExecContext(context.Background(), query); err != nil {
346+
return &database.Error{OrigErr: err, Query: []byte(query)}
347+
}
348+
349+
return nil
350+
}
351+
352+
func (g *Gbase8s) ensureLockTable() error {
353+
query := `CREATE TABLE IF NOT EXISTS "` + g.config.LockTable + `" (lock_id INT NOT NULL PRIMARY KEY)`
354+
if _, err := g.conn.ExecContext(context.Background(), query); err != nil {
355+
return &database.Error{OrigErr: err, Query: []byte(query)}
356+
}
357+
return nil
358+
}

0 commit comments

Comments
 (0)