Skip to content

Commit 91bc6b4

Browse files
authored
[datastore] Move flags and ConnectParameters to a dedicated package, merge datastoreutils into datastore (#1416)
1 parent 955aed1 commit 91bc6b4

File tree

18 files changed

+258
-277
lines changed

18 files changed

+258
-277
lines changed

cmds/core-service/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ go run ./cmds/core-service \
2929

3030
#### CockroachDB cluster
3131

32-
To run correctly, core-service must be able to [access](../../pkg/datastore/flags/flags.go) a CockroachDB or a Yugabyte cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md).
32+
To run correctly, core-service must be able to [access](../../pkg/datastore/params/params.go) a CockroachDB or a Yugabyte cluster. Provision of this cluster is handled automatically for a local development environment if following [the instructions for a standalone instance](../../build/dev/standalone_instance.md).
3333

3434
Alternatively, a CockroachDB instance can be created manually with:
3535

cmds/core-service/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
aux "github.com/interuss/dss/pkg/aux_"
2424
auxc "github.com/interuss/dss/pkg/aux_/store/datastore"
2525
"github.com/interuss/dss/pkg/build"
26-
"github.com/interuss/dss/pkg/datastoreutils"
26+
"github.com/interuss/dss/pkg/datastore"
2727
"github.com/interuss/dss/pkg/logging"
2828
"github.com/interuss/dss/pkg/rid/application"
2929
rid_v1 "github.com/interuss/dss/pkg/rid/server/v1"
@@ -350,7 +350,7 @@ func main() {
350350
backoff := 0
351351
for {
352352
if err := RunHTTPServer(ctx, cancel, *address, *locality); err != nil {
353-
if stacktrace.GetCode(err) == datastoreutils.CodeRetryable {
353+
if stacktrace.GetCode(err) == datastore.CodeRetryable {
354354
logger.Info(fmt.Sprintf("Prerequisites not yet satisfied; waiting %.fs to retry...", backoffs[backoff].Seconds()), zap.Error(err))
355355
time.Sleep(backoffs[backoff])
356356
if backoff < len(backoffs)-1 {

cmds/db-manager/migration/migrate.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
"github.com/coreos/go-semver/semver"
1414
"github.com/interuss/dss/pkg/datastore"
15-
datastoreflags "github.com/interuss/dss/pkg/datastore/flags"
15+
"github.com/interuss/dss/pkg/datastore/params"
1616

1717
"github.com/interuss/stacktrace"
1818
"github.com/spf13/cobra"
@@ -227,7 +227,7 @@ func migrate(cmd *cobra.Command, _ []string) error {
227227

228228
func connectTo(ctx context.Context, dbName string) (*datastore.Datastore, error) {
229229
// Connect to database server
230-
connectParameters := datastoreflags.ConnectParameters()
230+
connectParameters := params.GetConnectParameters()
231231
connectParameters.DBName = dbName
232232
return datastore.Dial(ctx, connectParameters)
233233
}

pkg/aux_/accepted_ca_certs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"strings"
88

99
restapi "github.com/interuss/dss/pkg/api/auxv1"
10-
"github.com/interuss/dss/pkg/datastore/flags"
10+
"github.com/interuss/dss/pkg/datastore/params"
1111
)
1212

1313
func (a *Server) GetAcceptedCAs(ctx context.Context, req *restapi.GetAcceptedCAsRequest) restapi.GetAcceptedCAsResponseSet {
1414

15-
connectParameters := flags.ConnectParameters()
15+
connectParameters := params.GetConnectParameters()
1616

1717
CAFile := connectParameters.GetCAFile()
1818

pkg/aux_/instance_ca_certs.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ import (
77
"strings"
88

99
restapi "github.com/interuss/dss/pkg/api/auxv1"
10-
"github.com/interuss/dss/pkg/datastore/flags"
10+
"github.com/interuss/dss/pkg/datastore/params"
1111
)
1212

1313
func (a *Server) GetInstanceCAs(ctx context.Context, req *restapi.GetInstanceCAsRequest) restapi.GetInstanceCAsResponseSet {
1414

15-
connectParameters := flags.ConnectParameters()
15+
connectParameters := params.GetConnectParameters()
1616

1717
CAFile := connectParameters.GetInstanceCAFile()
1818

pkg/aux_/store/datastore/store.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ import (
55

66
"github.com/interuss/dss/pkg/aux_/repos"
77
"github.com/interuss/dss/pkg/datastore"
8-
"github.com/interuss/dss/pkg/datastore/flags"
9-
"github.com/interuss/dss/pkg/datastoreutils"
8+
"github.com/interuss/dss/pkg/datastore/params"
109
"github.com/interuss/dss/pkg/logging"
1110
dssql "github.com/interuss/dss/pkg/sql"
1211
"github.com/jonboulle/clockwork"
@@ -34,7 +33,7 @@ func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger)
3433

3534
s := &Store{}
3635

37-
base, err := datastore.NewStore(ctx, db, flags.ConnectParameters().MaxRetries, currentCrdbMajorSchemaVersion, currentYugabyteMajorSchemaVersion, func(q dssql.Queryable) repos.Repository {
36+
base, err := datastore.NewStore(ctx, db, params.GetConnectParameters().MaxRetries, currentCrdbMajorSchemaVersion, currentYugabyteMajorSchemaVersion, func(q dssql.Queryable) repos.Repository {
3837
return &repo{
3938
Queryable: q,
4039
clock: s.Clock,
@@ -51,7 +50,7 @@ func newStore(ctx context.Context, db *datastore.Datastore, logger *zap.Logger)
5150

5251
func Dial(ctx context.Context, logger *zap.Logger, withCheckCron bool) (*Store, error) {
5352

54-
store, err := datastoreutils.DialStore(ctx, "aux", withCheckCron, func(db *datastore.Datastore) (*Store, error) {
53+
store, err := datastore.DialStore(ctx, "aux", withCheckCron, func(db *datastore.Datastore) (*Store, error) {
5554
return newStore(ctx, db, logger)
5655
})
5756

pkg/aux_/store/datastore/store_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"testing"
66

77
"github.com/interuss/dss/pkg/datastore"
8-
"github.com/interuss/dss/pkg/datastore/flags"
8+
"github.com/interuss/dss/pkg/datastore/params"
99
"github.com/interuss/dss/pkg/logging"
1010
"github.com/jonboulle/clockwork"
1111
"github.com/stretchr/testify/require"
@@ -16,7 +16,7 @@ var (
1616
)
1717

1818
func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) {
19-
connectParameters := flags.ConnectParameters()
19+
connectParameters := params.GetConnectParameters()
2020
if connectParameters.Host == "" || connectParameters.Port == 0 {
2121
t.Skip()
2222
}
@@ -32,7 +32,7 @@ func setUpStore(ctx context.Context, t *testing.T) (*Store, func()) {
3232
}
3333
}
3434

35-
func newTestStore(ctx context.Context, t *testing.T, connectParameters datastore.ConnectParameters) (*Store, error) {
35+
func newTestStore(ctx context.Context, t *testing.T, connectParameters params.ConnectParameters) (*Store, error) {
3636
db, err := datastore.Dial(ctx, connectParameters)
3737
require.NoError(t, err)
3838

pkg/datastore/connectParameters.go

Lines changed: 0 additions & 133 deletions
This file was deleted.

pkg/datastore/datastore.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,82 @@ package datastore
33
import (
44
"context"
55
"fmt"
6+
"strings"
67
"time"
78

89
"github.com/coreos/go-semver/semver"
910
"github.com/exaring/otelpgx"
11+
"github.com/interuss/dss/pkg/datastore/params"
12+
"github.com/interuss/dss/pkg/logging"
1013
"github.com/interuss/stacktrace"
1114
"github.com/jackc/pgx/v5/pgxpool"
15+
"github.com/robfig/cron/v3"
16+
"go.uber.org/zap"
1217
)
1318

1419
type Datastore struct {
1520
Version *Version
1621
Pool *pgxpool.Pool
1722
}
1823

24+
const (
25+
CodeRetryable = stacktrace.ErrorCode(1)
26+
)
27+
1928
var UnknownVersion = &semver.Version{}
2029

21-
func Dial(ctx context.Context, connParams ConnectParameters) (*Datastore, error) {
30+
func checkDatabase(ctx context.Context, db *Datastore, databaseName string) {
31+
logger := logging.WithValuesFromContext(ctx, logging.Logger)
32+
statsPtr := db.Pool.Stat()
33+
if int(statsPtr.TotalConns()) == 0 {
34+
logger.Warn("Failed periodic DB Ping (TotalConns=0)", zap.String("Database", databaseName))
35+
} else {
36+
logger.Info("Successful periodic DB Ping", zap.String("Database", databaseName))
37+
}
38+
}
39+
40+
func DialStore[S any](ctx context.Context, dbName string, withCheckCron bool, newStore func(*Datastore) (S, error)) (S, error) {
41+
42+
var zero S
43+
44+
cp := params.GetConnectParameters()
45+
cp.DBName = dbName
46+
47+
db, err := Dial(ctx, cp)
48+
49+
if err != nil {
50+
if strings.Contains(err.Error(), "connect: connection refused") {
51+
return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to connect to datastore server for %s", dbName)
52+
}
53+
return zero, stacktrace.Propagate(err, "Failed to connect to %s database", dbName)
54+
}
55+
s, err := newStore(db)
56+
if err != nil {
57+
db.Pool.Close()
58+
if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), fmt.Sprintf("database \"%s\" does not exist", dbName)) || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") {
59+
return zero, stacktrace.PropagateWithCode(err, CodeRetryable, "Failed to create %s store", dbName)
60+
}
61+
return zero, stacktrace.Propagate(err, "Failed to create %s store", dbName)
62+
}
63+
64+
if withCheckCron {
65+
c := cron.New()
66+
if _, err := c.AddFunc("@every 1m", func() { checkDatabase(ctx, db, dbName) }); err != nil {
67+
db.Pool.Close()
68+
return zero, stacktrace.Propagate(err, "Failed to schedule db check for %s", dbName)
69+
}
70+
c.Start()
71+
72+
go func() {
73+
<-ctx.Done()
74+
c.Stop()
75+
}()
76+
}
77+
78+
return s, nil
79+
}
80+
81+
func Dial(ctx context.Context, connParams params.ConnectParameters) (*Datastore, error) {
2282
dsn, err := connParams.BuildDSN()
2383
if err != nil {
2484
return nil, stacktrace.Propagate(err, "Failed to create connection config for pgx")

pkg/datastore/flags/flags.go

Lines changed: 0 additions & 38 deletions
This file was deleted.

0 commit comments

Comments
 (0)