Skip to content

Commit c770af2

Browse files
authored
Add support for Spanner as a data repository (#220)
feat: add support for Spanner as a data repository
1 parent aee27ea commit c770af2

10 files changed

+193
-6
lines changed

assets.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ package achgateway
22

33
import "embed"
44

5-
//go:embed migrations/*.sql
6-
var MigrationFS embed.FS
5+
//go:embed migrations/*.up.mysql.sql
6+
var MySqlMigrationFS embed.FS
7+
8+
//go:embed migrations/*.up.spanner.sql
9+
var SpannerMigrationFS embed.FS
710

811
//go:embed configs/config.default.yml
912
var ConfigFS embed.FS

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ module github.com/moov-io/achgateway
55
go 1.22.0
66

77
require (
8+
cloud.google.com/go/spanner v1.55.0
89
github.com/PagerDuty/go-pagerduty v1.7.0
910
github.com/ProtonMail/go-crypto v1.0.0
1011
github.com/Shopify/sarama v1.38.1
@@ -45,7 +46,6 @@ require (
4546
cloud.google.com/go/compute/metadata v0.2.3 // indirect
4647
cloud.google.com/go/iam v1.1.5 // indirect
4748
cloud.google.com/go/longrunning v0.5.4 // indirect
48-
cloud.google.com/go/spanner v1.55.0 // indirect
4949
cloud.google.com/go/storage v1.35.1 // indirect
5050
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.7.0 // indirect
5151
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 // indirect

internal/environment.go

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/moov-io/base/stime"
4343
"github.com/moov-io/base/telemetry"
4444

45+
"cloud.google.com/go/spanner"
4546
"github.com/gorilla/mux"
4647
"gocloud.dev/pubsub"
4748
)
@@ -52,6 +53,7 @@ type Environment struct {
5253
Config *service.Config
5354
TimeService stime.TimeService
5455
DB *sql.DB
56+
SpannerClient *spanner.Client
5557
InternalClient *http.Client
5658
Events events.Emitter
5759
Telemetry telemetry.Config
@@ -125,6 +127,25 @@ func NewEnvironment(env *Environment) (*Environment, error) {
125127
}
126128
}
127129

130+
// spanner setup
131+
if env.DB == nil && env.SpannerClient == nil && env.Config.Database.Spanner != nil {
132+
env.Logger.Info().Log("connecting to spanner database")
133+
client, close, err := initializeSpannerDatabase(env.Logger, env.Config.Database)
134+
if err != nil {
135+
close()
136+
return env, fmt.Errorf("setting up spanner database failed: %w", err)
137+
}
138+
env.SpannerClient = client
139+
140+
// Add DB closing to the Shutdown call for the Environment
141+
prev := env.Shutdown
142+
env.Shutdown = func() {
143+
prev()
144+
cancelFunc()
145+
close()
146+
}
147+
}
148+
128149
if env.InternalClient == nil {
129150
env.InternalClient = service.NewInternalClient(env.Logger, env.Config.Clients, "internal-client")
130151
}
@@ -161,8 +182,18 @@ func NewEnvironment(env *Environment) (*Environment, error) {
161182
return env, fmt.Errorf("unable to create http files subscription: %v", err)
162183
}
163184

164-
fileRepository := files.NewRepository(env.DB)
165-
shardRepository := shards.NewRepository(env.DB, env.Config.Sharding.Mappings)
185+
var fileRepository files.Repository
186+
var shardRepository shards.Repository
187+
188+
if env.DB != nil {
189+
fileRepository = files.NewRepository(env.DB)
190+
shardRepository = shards.NewRepository(env.DB, env.Config.Sharding.Mappings)
191+
}
192+
193+
if env.SpannerClient != nil {
194+
fileRepository = files.NewSpannerRepository(env.SpannerClient)
195+
shardRepository = shards.NewSpannerRepository(env.SpannerClient, env.Config.Sharding.Mappings)
196+
}
166197

167198
fileReceiver, err := pipeline.Start(ctx, env.Logger, env.Config, shardRepository, fileRepository, httpSub)
168199
if err != nil {
@@ -235,6 +266,33 @@ func LoadConfig(logger log.Logger) (*service.Config, error) {
235266
return cfg, nil
236267
}
237268

269+
func initializeSpannerDatabase(logger log.Logger, config database.DatabaseConfig) (*spanner.Client, func(), error) {
270+
ctx, cancelFunc := context.WithCancel(context.Background())
271+
272+
logger.Info().Log("connecting to spanner database")
273+
274+
// Create the spanner client
275+
dbString := "projects/" + config.Spanner.Project + "/instances/" + config.Spanner.Instance + "/databases/" + config.DatabaseName
276+
client, err := spanner.NewClient(ctx, dbString)
277+
if err != nil {
278+
return nil, cancelFunc, logger.Fatal().LogErrorf("Error creating spanner client: %w", err).Err()
279+
}
280+
281+
shutdown := func() {
282+
logger.Info().Log("Shutting down the spanner client")
283+
cancelFunc()
284+
client.Close()
285+
}
286+
287+
if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.SpannerMigrationFS)); err != nil {
288+
return nil, shutdown, logger.Fatal().LogErrorf("Error running migrations: %w", err).Err()
289+
}
290+
291+
logger.Info().Log("Finished initializing spanner client")
292+
293+
return client, shutdown, err
294+
}
295+
238296
func initializeDatabase(logger log.Logger, config database.DatabaseConfig) (*sql.DB, func(), error) {
239297
ctx, cancelFunc := context.WithCancel(context.Background())
240298

@@ -258,7 +316,7 @@ func initializeDatabase(logger log.Logger, config database.DatabaseConfig) (*sql
258316
}
259317

260318
// Run the migrations
261-
if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.MigrationFS)); err != nil {
319+
if err := database.RunMigrations(logger, config, database.WithEmbeddedMigrations(achgateway.MySqlMigrationFS)); err != nil {
262320
return nil, shutdown, logger.Fatal().LogErrorf("Error running migrations: %w", err).Err()
263321
}
264322

internal/files/repo_files.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"time"
88

9+
"cloud.google.com/go/spanner"
910
"github.com/moov-io/base/telemetry"
1011

1112
"go.opentelemetry.io/otel/attribute"
@@ -24,13 +25,21 @@ func NewRepository(db *sql.DB) Repository {
2425
return &sqlRepository{db: db}
2526
}
2627

28+
func NewSpannerRepository(client *spanner.Client) Repository {
29+
if client == nil {
30+
return &MockRepository{}
31+
}
32+
return &spannerRepository{client: client}
33+
}
34+
2735
type sqlRepository struct {
2836
db *sql.DB
2937
}
3038

3139
func (r *sqlRepository) Record(ctx context.Context, file AcceptedFile) error {
3240
ctx, span := telemetry.StartSpan(ctx, "files-record", trace.WithAttributes(
3341
attribute.String("achgateway.file_id", file.FileID),
42+
attribute.String("achgateway.database", "mysql"),
3443
))
3544
defer span.End()
3645

@@ -50,6 +59,7 @@ func (r *sqlRepository) Record(ctx context.Context, file AcceptedFile) error {
5059
func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error {
5160
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
5261
attribute.String("achgateway.file_id", fileID),
62+
attribute.String("achgateway.database", "mysql"),
5363
))
5464
defer span.End()
5565

@@ -65,3 +75,43 @@ func (r *sqlRepository) Cancel(ctx context.Context, fileID string) error {
6575
}
6676
return nil
6777
}
78+
79+
type spannerRepository struct {
80+
client *spanner.Client
81+
}
82+
83+
func (r *spannerRepository) Record(ctx context.Context, file AcceptedFile) error {
84+
ctx, span := telemetry.StartSpan(ctx, "files-record", trace.WithAttributes(
85+
attribute.String("achgateway.file_id", file.FileID),
86+
attribute.String("achgateway.database", "spanner"),
87+
))
88+
defer span.End()
89+
90+
m := spanner.Insert("files",
91+
[]string{"file_id", "shard_key", "hostname", "accepted_at"},
92+
[]interface{}{file.FileID, file.ShardKey, file.Hostname, file.AcceptedAt},
93+
)
94+
_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
95+
if err != nil {
96+
return fmt.Errorf("recording file failed: %w", err)
97+
}
98+
return nil
99+
}
100+
101+
func (r *spannerRepository) Cancel(ctx context.Context, fileID string) error {
102+
ctx, span := telemetry.StartSpan(ctx, "files-cancel", trace.WithAttributes(
103+
attribute.String("achgateway.file_id", fileID),
104+
attribute.String("achgateway.database", "spanner"),
105+
))
106+
defer span.End()
107+
108+
m := spanner.Update("files",
109+
[]string{"file_id", "canceled_at"},
110+
[]interface{}{fileID, time.Now().In(time.UTC)},
111+
)
112+
_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
113+
if err != nil {
114+
return fmt.Errorf("saving file cancellation failed: %w", err)
115+
}
116+
return nil
117+
}

internal/shards/repository.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
package shards
1919

2020
import (
21+
"context"
2122
"database/sql"
2223
"fmt"
2324

25+
"cloud.google.com/go/spanner"
2426
"github.com/moov-io/achgateway/internal/service"
2527
"github.com/moov-io/base/database"
28+
"google.golang.org/api/iterator"
2629

2730
"github.com/pkg/errors"
2831
)
@@ -40,6 +43,13 @@ func NewRepository(db *sql.DB, static []service.ShardMapping) Repository {
4043
return &sqlRepository{db: db}
4144
}
4245

46+
func NewSpannerRepository(client *spanner.Client, static []service.ShardMapping) Repository {
47+
if client == nil {
48+
return &InMemoryRepository{Shards: static}
49+
}
50+
return &spannerRepository{client: client}
51+
}
52+
4353
type sqlRepository struct {
4454
db *sql.DB
4555
}
@@ -168,3 +178,58 @@ func (r *sqlRepository) write(shardKey, shardName string) error {
168178
}
169179
return err
170180
}
181+
182+
type spannerRepository struct {
183+
client *spanner.Client
184+
}
185+
186+
func (r *spannerRepository) Lookup(shardKey string) (string, error) {
187+
row, err := r.client.Single().ReadRow(context.Background(), "shard_mappings", spanner.Key{shardKey}, []string{"shard_name"})
188+
if err != nil {
189+
return "", err
190+
}
191+
192+
return row.String(), nil
193+
}
194+
195+
func (r *spannerRepository) List() ([]service.ShardMapping, error) {
196+
ctx := context.Background()
197+
198+
iter := r.client.Single().Read(ctx, "shard_mappings", spanner.AllKeys(), []string{"shard_key", "shard_name"})
199+
defer iter.Stop()
200+
201+
var items []service.ShardMapping
202+
for {
203+
row, err := iter.Next()
204+
if err == iterator.Done {
205+
break
206+
}
207+
if err != nil {
208+
return nil, errors.Wrap(err, "querying")
209+
}
210+
211+
item := service.ShardMapping{}
212+
if err := row.ToStruct(&item); err != nil {
213+
return nil, err
214+
}
215+
216+
items = append(items, item)
217+
}
218+
219+
return items, nil
220+
}
221+
222+
func (r *spannerRepository) Add(create service.ShardMapping, run database.RunInTx) error {
223+
ctx := context.Background()
224+
225+
m := spanner.Insert("shard_mappings",
226+
[]string{"shard_key", "shard_name"},
227+
[]interface{}{create.ShardKey, create.ShardName},
228+
)
229+
230+
_, err := r.client.Apply(ctx, []*spanner.Mutation{m})
231+
if err != nil {
232+
return fmt.Errorf("recording file failed: %w", err)
233+
}
234+
return nil
235+
}
File renamed without changes.
File renamed without changes.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE TABLE shard_mappings (
2+
shard_key STRING(50) NOT NULL,
3+
shard_name STRING(50) NOT NULL,
4+
) PRIMARY KEY (shard_key);
File renamed without changes.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CREATE TABLE files (
2+
file_id STRING(MAX) NOT NULL,
3+
shard_key STRING(MAX) NOT NULL,
4+
hostname STRING(MAX) NOT NULL,
5+
accepted_at TIMESTAMP NOT NULL,
6+
canceled_at TIMESTAMP,
7+
) PRIMARY KEY (file_id);

0 commit comments

Comments
 (0)