Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,10 @@ test-go-units-crdb: cleanup-test-go-units-crdb
go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/rid --db_version latest --datastore_host localhost
go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/scd --db_version latest --datastore_host localhost
go run ./cmds/db-manager/main.go migrate --schemas_dir ./build/db_schemas/aux_ --db_version latest --datastore_host localhost
go test -cover -count=1 -v ./pkg/rid/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root --datastore_db_name rid -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/rid/application --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root --datastore_db_name rid -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/scd/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root --datastore_db_name scd -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/aux_/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root --datastore_db_name aux -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/rid/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/rid/application --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/scd/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR)
go test -cover -count=1 -v ./pkg/aux_/store/datastore --datastore_host localhost --datastore_port 26257 --datastore_ssl_mode disable --datastore_user root -test.gocoverdir=$(COVERDATA_DIR)
@docker stop dss-crdb-for-testing > /dev/null
@docker rm dss-crdb-for-testing > /dev/null

Expand Down
105 changes: 8 additions & 97 deletions cmds/core-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/url"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
Expand All @@ -24,8 +23,7 @@ import (
aux "github.com/interuss/dss/pkg/aux_"
auxc "github.com/interuss/dss/pkg/aux_/store/datastore"
"github.com/interuss/dss/pkg/build"
"github.com/interuss/dss/pkg/datastore"
"github.com/interuss/dss/pkg/datastore/flags" // Force command line flag registration
"github.com/interuss/dss/pkg/datastoreutils"
"github.com/interuss/dss/pkg/logging"
"github.com/interuss/dss/pkg/rid/application"
rid_v1 "github.com/interuss/dss/pkg/rid/server/v1"
Expand All @@ -36,7 +34,6 @@ import (
"github.com/interuss/dss/pkg/version"
"github.com/interuss/dss/pkg/versioning"
"github.com/interuss/stacktrace"
"github.com/robfig/cron/v3"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -65,30 +62,6 @@ var (
scdGlobalLock = flag.Bool("enable_scd_global_lock", false, "Experimental: Use a global lock when working with SCD subscriptions. Reduce global throughput but improve throughput with lot of subscriptions in the same areas.")
)

const (
codeRetryable = stacktrace.ErrorCode(1)
)

func getDBStats(ctx context.Context, db *datastore.Datastore, databaseName string) {
logger := logging.WithValuesFromContext(ctx, logging.Logger)
statsPtr := db.Pool.Stat()
stats := make(map[string]string)
stats["DBName"] = databaseName
stats["AcquireCount"] = strconv.Itoa(int(statsPtr.AcquireCount()))
stats["AcquiredConns"] = strconv.Itoa(int(statsPtr.AcquiredConns()))
stats["CanceledAcquireCount"] = strconv.Itoa(int(statsPtr.CanceledAcquireCount()))
stats["ConstructingConns"] = strconv.Itoa(int(statsPtr.ConstructingConns()))
stats["EmptyAcquireCount"] = strconv.Itoa(int(statsPtr.EmptyAcquireCount()))
stats["IdleConns"] = strconv.Itoa(int(statsPtr.IdleConns()))
stats["MaxConns"] = strconv.Itoa(int(statsPtr.MaxConns()))
stats["TotalConns"] = strconv.Itoa(int(statsPtr.TotalConns()))
if stats["TotalConns"] == "0" {
logger.Warn("Failed periodic DB Ping (TotalConns=0)", zap.String("Database", databaseName))
} else {
logger.Info("Successful periodic DB Ping ", zap.String("Database", databaseName))
}
}

func createKeyResolver() (auth.KeyResolver, error) {
switch {
case *pkFile != "":
Expand All @@ -111,24 +84,9 @@ func createKeyResolver() (auth.KeyResolver, error) {
}

func createAuxServer(ctx context.Context, locality string, publicEndpoint string, scdGlobalLock bool, logger *zap.Logger) (*aux.Server, error) {
connectParameters := flags.ConnectParameters()
connectParameters.DBName = "aux"
datastore, err := datastore.Dial(ctx, connectParameters)
auxStore, err := auxc.Dial(ctx, logger)
if err != nil {
if strings.Contains(err.Error(), "connect: connection refused") {
return nil, stacktrace.PropagateWithCode(err, codeRetryable, "Failed to connect to database for pool information store")
}
return nil, stacktrace.Propagate(err, "Failed to connect to pool information database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

auxStore, err := auxc.NewStore(ctx, datastore, connectParameters.DBName, logger)
if err != nil {
// TODO: More robustly detect failure to create SCD server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), "database \"aux\" does not exist") {
datastore.Pool.Close()
return nil, stacktrace.PropagateWithCode(err, codeRetryable, "Failed to connect to datastore server for strategic conflict detection store")
}
return nil, stacktrace.Propagate(err, "Failed to create strategic conflict detection store")
return nil, err
}

repo, err := auxStore.Interact(ctx)
Expand All @@ -146,81 +104,36 @@ func createAuxServer(ctx context.Context, locality string, publicEndpoint string
}

func createRIDServers(ctx context.Context, locality string, logger *zap.Logger) (*rid_v1.Server, *rid_v2.Server, error) {
connectParameters := flags.ConnectParameters()
connectParameters.DBName = "rid"
datastore, err := datastore.Dial(ctx, connectParameters)
if err != nil {
// TODO: More robustly detect failure to create RID server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") {
return nil, nil, stacktrace.PropagateWithCode(err, codeRetryable, "Failed to connect to datastore server for remote ID store")
}
return nil, nil, stacktrace.Propagate(err, "Failed to connect to remote ID database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

ridStore, err := ridc.NewStore(ctx, datastore, connectParameters.DBName, logger)
ridStore, err := ridc.Dial(ctx, logger)
if err != nil {
// TODO: More robustly detect failure to create RID server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), "database has not been bootstrapped with Schema Manager") {
datastore.Pool.Close()
return nil, nil, stacktrace.PropagateWithCode(err, codeRetryable, "Failed to connect to datastore server for remote ID store")
}
return nil, nil, stacktrace.Propagate(err, "Failed to create remote ID store")
return nil, nil, err
}

_, err = ridStore.Interact(ctx)
if err != nil {
return nil, nil, stacktrace.Propagate(err, "Unable to interact with store")
}

// schedule period tasks for RID Server
ridCron := cron.New()
// schedule printing of DB connection stats every minute for the underlying storage for RID Server
if _, err := ridCron.AddFunc("@every 1m", func() { getDBStats(ctx, datastore, connectParameters.DBName) }); err != nil {
return nil, nil, stacktrace.Propagate(err, "Failed to schedule periodic db stat check to %s", connectParameters.DBName)
}
ridCron.Start()

app := application.NewFromTransactor(ridStore, logger)
return &rid_v1.Server{
App: app,
Locality: locality,
AllowHTTPBaseUrls: *allowHTTPBaseUrls,
Cron: ridCron,
}, &rid_v2.Server{
App: app,
Locality: locality,
AllowHTTPBaseUrls: *allowHTTPBaseUrls,
Cron: ridCron,
}, nil
}

func createSCDServer(ctx context.Context, logger *zap.Logger) (*scd.Server, error) {
connectParameters := flags.ConnectParameters()
connectParameters.DBName = scdc.DatabaseName
datastore, err := datastore.Dial(ctx, connectParameters)
if err != nil {
return nil, stacktrace.Propagate(err, "Failed to connect to strategic conflict detection database; verify your database configuration is current with https://github.com/interuss/dss/tree/master/build#upgrading-database-schemas")
}

scdStore, err := scdc.NewStore(ctx, datastore, *scdGlobalLock)
scdStore, err := scdc.Dial(ctx, logger, *scdGlobalLock)
if err != nil {
// TODO: More robustly detect failure to create SCD server is due to a problem that may be temporary
if strings.Contains(err.Error(), "connect: connection refused") || strings.Contains(err.Error(), "database \"scd\" does not exist") {
datastore.Pool.Close()
return nil, stacktrace.PropagateWithCode(err, codeRetryable, "Failed to connect to datastore server for strategic conflict detection store")
}
return nil, stacktrace.Propagate(err, "Failed to create strategic conflict detection store")
return nil, err
}

// schedule period tasks for SCD Server
scdCron := cron.New()
// schedule printing of DB connection stats every minute for the underlying storage for RID Server
if _, err := scdCron.AddFunc("@every 1m", func() { getDBStats(ctx, datastore, scdc.DatabaseName) }); err != nil {
return nil, stacktrace.Propagate(err, "Failed to schedule periodic db stat check to %s", scdc.DatabaseName)
}

scdCron.Start()

return &scd.Server{
Store: scdStore,
DSSReportHandler: &scd.JSONLoggingReceivedReportHandler{ReportLogger: logger},
Expand Down Expand Up @@ -299,8 +212,6 @@ func RunHTTPServer(ctx context.Context, ctxCanceler func(), address, locality st
if *enableSCD {
scdV1Server, err = createSCDServer(ctx, logger)
if err != nil {
ridV1Server.Cron.Stop()
ridV2Server.Cron.Stop()
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'Hidden' changes: crons that print stats are not stopped anymore if scd fails, but I didn't find a real purpose for that (since it's going to stop anyways)

return stacktrace.Propagate(err, "Failed to create strategic conflict detection server")
}

Expand Down Expand Up @@ -436,7 +347,7 @@ func main() {
backoff := 0
for {
if err := RunHTTPServer(ctx, cancel, *address, *locality); err != nil {
if stacktrace.GetCode(err) == codeRetryable {
if stacktrace.GetCode(err) == datastoreutils.CodeRetryable {
logger.Info(fmt.Sprintf("Prerequisites not yet satisfied; waiting %.fs to retry...", backoffs[backoff].Seconds()), zap.Error(err))
time.Sleep(backoffs[backoff])
if backoff < len(backoffs)-1 {
Expand Down
1 change: 0 additions & 1 deletion cmds/db-manager/cleanup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ Flags:
Global Flags:
--datastore_application_name string application name for tagging the connection to the database (default "dss")
--datastore_db_name string database name to connect to
--datastore_host string database host to connect to
--datastore_max_conn_idle_secs int maximum amount of time in seconds a connection may be idle, default is 30 seconds (default 30)
--datastore_max_open_conns int maximum number of open connections to the database, default is 4 (default 4)
Expand Down
52 changes: 8 additions & 44 deletions cmds/db-manager/cleanup/evict.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import (
"log"
"time"

"github.com/interuss/dss/pkg/datastore"
datastoreflags "github.com/interuss/dss/pkg/datastore/flags"
"github.com/interuss/dss/pkg/datastoreutils"
"github.com/interuss/dss/pkg/logging"
dssmodels "github.com/interuss/dss/pkg/models"
ridmodels "github.com/interuss/dss/pkg/rid/models"
Expand Down Expand Up @@ -54,12 +53,16 @@ func evict(cmd *cobra.Command, _ []string) error {
ctx, cancel := context.WithTimeout(ctx, *timeout)
defer cancel()

scdStore, err := getSCDStore(ctx)
logger := logging.WithValuesFromContext(ctx, logging.Logger)

datastoreutils.ApplicationName = "db-manager"

scdStore, err := scdc.Dial(ctx, logger, false)
if err != nil {
return err
}

ridStore, err := getRIDStore(ctx)
ridStore, err := ridc.Dial(ctx, logger)
if err != nil {
return err
}
Expand Down Expand Up @@ -104,7 +107,7 @@ func evict(cmd *cobra.Command, _ []string) error {
return fmt.Errorf("failed to execute SCD transaction: %w", err)
}

ridAction := func(r ridrepos.Repository) (err error) {
ridAction := func(ctx context.Context, r ridrepos.Repository) (err error) {
if *checkRidISAs {

expiredISAs, err = r.ListExpiredISAs(ctx, *locality, ridThreshold)
Expand Down Expand Up @@ -169,45 +172,6 @@ func evict(cmd *cobra.Command, _ []string) error {
return nil
}

func getSCDStore(ctx context.Context) (*scdc.Store, error) {
connectParameters := datastoreflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = scdc.DatabaseName
datastore, err := datastore.Dial(ctx, connectParameters)
if err != nil {
logParams := connectParameters
logParams.Credentials.Password = "[REDACTED]"
return nil, fmt.Errorf("failed to connect to SCD database with %+v: %w", logParams, err)
}

scdStore, err := scdc.NewStore(ctx, datastore, false)
if err != nil {
return nil, fmt.Errorf("failed to create strategic conflict detection store with %+v: %w", connectParameters, err)
}
return scdStore, nil
}

func getRIDStore(ctx context.Context) (*ridc.Store, error) {

logger := logging.WithValuesFromContext(ctx, logging.Logger)

connectParameters := datastoreflags.ConnectParameters()
connectParameters.ApplicationName = "db-manager"
connectParameters.DBName = "rid"
datastore, err := datastore.Dial(ctx, connectParameters)
if err != nil {
logParams := connectParameters
logParams.Credentials.Password = "[REDACTED]"
return nil, fmt.Errorf("failed to connect to remote ID database with %+v: %w", logParams, err)
}

ridStore, err := ridc.NewStore(ctx, datastore, connectParameters.DBName, logger)
if err != nil {
return nil, fmt.Errorf("failed to create remote ID store with %+v: %w", connectParameters, err)
}
return ridStore, nil
}

func logExpiredEntity(entity string, entityID dssmodels.ID, threshold time.Time, deleted, hasEndTime bool) {
logMsg := "found"
if deleted {
Expand Down
Loading
Loading