Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 91 additions & 13 deletions lib/gcpspanner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
ErrSyncAtomicWriteFailed = errors.New("sync atomic write failed")
ErrSyncBatchWriteFailed = errors.New("sync batch write failed")
ErrSyncFailedToGetChildMutations = errors.New("sync failed to get child mutations")
ErrSyncFailedToGetPreDeleteHooks = errors.New("sync failed to get pre delete hooks")
)

// Client is the client for interacting with GCP Spanner.
Expand Down Expand Up @@ -360,6 +361,10 @@ type readAllMapper interface {
SelectAll() spanner.Statement
}

// readAllByKeysMapper provides the SELECT statement used to read every row
// that matches the given container of key value(s). KeysContainer is the
// mapper-specific key argument type (e.g. a single ID string).
type readAllByKeysMapper[KeysContainer comparable] interface {
	SelectAllByKeys(KeysContainer) spanner.Statement
}

// mergeMapper handles the logic for updating an existing entity.
type mergeMapper[ExternalStruct any, SpannerStruct any] interface {
Merge(ExternalStruct, SpannerStruct) SpannerStruct
Expand Down Expand Up @@ -435,9 +440,10 @@ type syncableEntityMapper[ExternalStruct any, SpannerStruct any, Key comparable]
mergeAndCheckChangedMapper[ExternalStruct, SpannerStruct]
childDeleterMapper[SpannerStruct]
deleteByStructMapper[SpannerStruct]
preDeleteHookMapper[SpannerStruct]
}

type ChildDeleteKeyMutations struct {
// ExtraMutationsGroup bundles a set of mutations with the name of the table
// they apply to, so callers can apply and log each group per table.
type ExtraMutationsGroup struct {
	// tableName is the table the mutations target; used for logging and
	// when applying the group.
	tableName string
	// mutations are the Spanner mutations to apply for that table.
	mutations []*spanner.Mutation
}
Expand All @@ -449,7 +455,11 @@ type childDeleterMapper[SpannerStruct any] interface {
ctx context.Context,
client *Client,
parentsToDelete []SpannerStruct,
) ([]ChildDeleteKeyMutations, error)
) ([]ExtraMutationsGroup, error)
}

// preDeleteHookMapper lets a mapper emit extra mutation groups that are
// applied before the given rows are deleted during a sync. Implementations
// with no pre-delete work return (nil, nil).
type preDeleteHookMapper[SpannerStruct any] interface {
	PreDeleteHook(ctx context.Context, client *Client, rowsToDelete []SpannerStruct) ([]ExtraMutationsGroup, error)
}

// --- Generic Entity Components ---
Expand Down Expand Up @@ -860,6 +870,57 @@ func (c *entityRemover[M, ExternalStruct, SpannerStruct, Key]) removeWithTransac
return nil
}

// allByKeysEntityReader handles the reading of a Spanner table with a set of key(s).
// M supplies the SELECT statement, KeysContainer is the key argument type, and
// SpannerStruct is the row type each result is decoded into.
type allByKeysEntityReader[
	M readAllByKeysMapper[KeysContainer],
	KeysContainer comparable,
	SpannerStruct any] struct {
	*Client
}

// newAllByKeysEntityReader creates a new reader that embeds the given client.
// The mapper M carries no state; readAllByKeys instantiates its zero value to
// build the statement.
func newAllByKeysEntityReader[
	M readAllByKeysMapper[KeysContainer],
	KeysContainer comparable,
	SpannerStruct any,
](
	c *Client,
) *allByKeysEntityReader[M, KeysContainer, SpannerStruct] {
	return &allByKeysEntityReader[M, KeysContainer, SpannerStruct]{
		Client: c,
	}
}

// readAllByKeys runs the mapper's SelectAllByKeys statement inside a
// single-use read-only transaction and decodes every returned row into a
// SpannerStruct. Iteration errors are joined with ErrInternalQueryFailure;
// row-decoding errors are returned as-is.
func (r *allByKeysEntityReader[M, KeysContainer, SpannerStruct]) readAllByKeys(
	ctx context.Context,
	keys KeysContainer,
) ([]SpannerStruct, error) {
	var m M
	stmt := m.SelectAllByKeys(keys)

	txn := r.Single()
	defer txn.Close()
	rows := txn.Query(ctx, stmt)
	defer rows.Stop()

	var results []SpannerStruct
	for {
		row, err := rows.Next()
		if err != nil {
			if errors.Is(err, iterator.Done) {
				// All rows consumed; results may be nil when nothing matched.
				return results, nil
			}

			return nil, errors.Join(ErrInternalQueryFailure, err)
		}

		var decoded SpannerStruct
		if err := row.ToStruct(&decoded); err != nil {
			return nil, err
		}
		results = append(results, decoded)
	}
}

// entitySynchronizer handles the synchronization of a Spanner table with a
// desired state provided as a slice of entities. It determines whether to
// use a single atomic transaction or a high-throughput batch write based on
Expand All @@ -874,6 +935,9 @@ type entitySynchronizer[
// The number of mutations at which the synchronizer will switch from a
// single atomic transaction to the non-atomic batch writer.
batchWriteThreshold int
// Mapper is the entity mapper that provides the necessary database operation logic.
// This field should be configured before calling the Sync method.
Mapper M
}

// newEntitySynchronizer creates a new synchronizer with a default threshold.
Expand All @@ -882,12 +946,13 @@ func newEntitySynchronizer[
ExternalStruct any,
SpannerStruct any,
Key comparable,
](
c *Client,
) *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key] {
](c *Client) *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key] {
var m M

return &entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]{
Client: c,
batchWriteThreshold: defaultBatchSize,
Mapper: m,
}
}

Expand All @@ -897,7 +962,7 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) Sync(
ctx context.Context,
desiredState []ExternalStruct,
) error {
var mapper M
mapper := s.Mapper
tableName := mapper.Table()

// 1. READ: Fetch all existing entities from the database.
Expand Down Expand Up @@ -974,22 +1039,21 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) Sync(
"updates", updates,
"deletes", deletes)

// 3. APPLY DELETES: Handle child and parent deletions first.
if err := s.applyDeletes(ctx, entitiesToDelete, deleteMutations, mapper); err != nil {
return err
}

// 4. APPLY UPSERTS: Apply all inserts and updates together.
// 3. APPLY UPSERTS: Apply all inserts and updates together.
if len(upsertMutations) < s.batchWriteThreshold {
err = s.applyAtomic(ctx, upsertMutations, tableName)
} else {
err = s.applyNonAtomic(ctx, upsertMutations, tableName)
}

if err != nil {
return err
}

// 4. APPLY DELETES: Handle child and parent deletions.
if err := s.applyDeletes(ctx, entitiesToDelete, deleteMutations, mapper); err != nil {
return err
}

slog.InfoContext(ctx, "Sync successful", "table", tableName)

return nil
Expand All @@ -1009,6 +1073,20 @@ func (s *entitySynchronizer[M, ExternalStruct, SpannerStruct, Key]) applyDeletes
}
tableName := mapper.Table()

// Handle pre delete hooks first.
mutationGroups, err := mapper.PreDeleteHook(ctx, s.Client, entitiesToDelete)
if err != nil {
return errors.Join(ErrSyncFailedToGetPreDeleteHooks, err)
}
for _, group := range mutationGroups {
slog.InfoContext(ctx, "Applying pre delete mutations via batch writer",
"count", len(group.mutations), "table", group.tableName)
err := s.applyNonAtomic(ctx, group.mutations, group.tableName)
if err != nil {
return err
}
}

// Handle manual child deletions first.
// The `ON DELETE CASCADE` constraint should be the default, but it can fail
// if a cascade exceeds Spanner's 80k mutation limit.
Expand Down
35 changes: 34 additions & 1 deletion lib/gcpspanner/daily_chromium_histogram_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,34 @@ func (m latestDailyChromiumHistogramMetricMapper) GetChildDeleteKeyMutations(
_ context.Context,
_ *Client,
_ []SpannerLatestDailyChromiumHistogramMetric,
) ([]ChildDeleteKeyMutations, error) {
) ([]ExtraMutationsGroup, error) {
return nil, nil
}

// PreDeleteHook is a no-op for this table.
// It satisfies preDeleteHookMapper without emitting any extra mutation
// groups before rows are deleted.
func (m latestDailyChromiumHistogramMetricMapper) PreDeleteHook(
	_ context.Context,
	_ *Client,
	_ []SpannerLatestDailyChromiumHistogramMetric,
) ([]ExtraMutationsGroup, error) {
	return nil, nil
}

type latestDailyChromiumHistogramMetricByWebFeatureIDMapper struct{}

// SelectAllByKeys returns a statement selecting every
// LatestDailyChromiumHistogramMetrics row whose WebFeatureID equals the
// given webFeatureID.
func (m latestDailyChromiumHistogramMetricByWebFeatureIDMapper) SelectAllByKeys(webFeatureID string) spanner.Statement {
	query := spanner.NewStatement(`
	SELECT
		*
	FROM LatestDailyChromiumHistogramMetrics
	WHERE WebFeatureID = @webFeatureID`)
	query.Params = map[string]interface{}{"webFeatureID": webFeatureID}

	return query
}

// DeleteMutation creates a Spanner delete mutation.
func (m latestDailyChromiumHistogramMetricMapper) DeleteMutation(
in SpannerLatestDailyChromiumHistogramMetric) *spanner.Mutation {
Expand Down Expand Up @@ -203,3 +227,12 @@ func (c *Client) getDesiredLatestDailyChromiumHistogramMetrics(

return desiredState, nil
}

// getAllLatestDailyChromiumHistogramMetricsByFeatureID returns every
// LatestDailyChromiumHistogramMetrics row whose WebFeatureID matches
// featureID.
func (c *Client) getAllLatestDailyChromiumHistogramMetricsByFeatureID(
	ctx context.Context, featureID string) ([]SpannerLatestDailyChromiumHistogramMetric, error) {
	reader := newAllByKeysEntityReader[
		latestDailyChromiumHistogramMetricByWebFeatureIDMapper,
		string,
		SpannerLatestDailyChromiumHistogramMetric,
	](c)

	return reader.readAllByKeys(ctx, featureID)
}
15 changes: 13 additions & 2 deletions lib/gcpspanner/spanneradapters/web_features_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (

// WebFeatureSpannerClient expects a subset of the functionality from lib/gcpspanner that only apply to WebFeatures.
type WebFeatureSpannerClient interface {
SyncWebFeatures(ctx context.Context, features []gcpspanner.WebFeature) error
SyncWebFeatures(ctx context.Context, features []gcpspanner.WebFeature,
options ...gcpspanner.SyncWebFeaturesOption) error
FetchAllWebFeatureIDsAndKeys(ctx context.Context) ([]gcpspanner.SpannerFeatureIDAndKey, error)
UpsertFeatureBaselineStatus(ctx context.Context, featureID string, status gcpspanner.FeatureBaselineStatus) error
UpsertBrowserFeatureAvailability(
Expand Down Expand Up @@ -68,9 +69,19 @@ func (c *WebFeaturesConsumer) InsertWebFeatures(
allFeatures = append(allFeatures, webFeature)
}

redirectMap := map[string]string{}
for sourceKey, targetData := range data.Features.Moved {
redirectMap[sourceKey] = targetData.RedirectTarget
}

options := []gcpspanner.SyncWebFeaturesOption{}
if len(redirectMap) > 0 {
options = append(options, gcpspanner.WithRedirectTargets(redirectMap))
}

// 2. Sync all features at once. This will insert, update, and delete features
// to make the database match the desired state.
if err := c.client.SyncWebFeatures(ctx, allFeatures); err != nil {
if err := c.client.SyncWebFeatures(ctx, allFeatures, options...); err != nil {
slog.ErrorContext(ctx, "failed to sync web features", "error", err)

return nil, err
Expand Down
55 changes: 35 additions & 20 deletions lib/gcpspanner/spanneradapters/web_features_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ func TestGetBaselineStatusEnum(t *testing.T) {
}

type mockSyncWebFeaturesConfig struct {
expectedInput []gcpspanner.WebFeature
err error
expectedCount int
expectedInput []gcpspanner.WebFeature
expectedOptions []gcpspanner.SyncWebFeaturesOption
err error
expectedCount int
}

type mockFetchIDsAndKeysConfig struct {
Expand Down Expand Up @@ -294,7 +295,7 @@ func (c *mockWebFeatureSpannerClient) SyncSplitWebFeatures(
}

func (c *mockWebFeatureSpannerClient) SyncWebFeatures(
_ context.Context, features []gcpspanner.WebFeature) error {
_ context.Context, features []gcpspanner.WebFeature, opts ...gcpspanner.SyncWebFeaturesOption) error {
// Sort both slices for stable comparison
sort.Slice(features, func(i, j int) bool {
return features[i].FeatureKey < features[j].FeatureKey
Expand All @@ -306,6 +307,12 @@ func (c *mockWebFeatureSpannerClient) SyncWebFeatures(
if diff := cmp.Diff(c.mockSyncWebFeaturesCfg.expectedInput, features); diff != "" {
c.t.Errorf("SyncWebFeatures unexpected input (-want +got):\n%s", diff)
}

if !reflect.DeepEqual(c.mockSyncWebFeaturesCfg.expectedOptions, opts) {
c.t.Errorf("SyncWebFeatures unexpected options expected %v received %v",
c.mockSyncWebFeaturesCfg.expectedOptions, opts)
}

c.syncWebFeaturesCount++

return c.mockSyncWebFeaturesCfg.err
Expand Down Expand Up @@ -532,8 +539,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: []gcpspanner.SpannerFeatureIDAndKey{
Expand Down Expand Up @@ -730,8 +738,9 @@ func TestInsertWebFeatures(t *testing.T) {
expectedInput: []gcpspanner.WebFeature{
{FeatureKey: "feature1", Name: "Feature 1", Description: "text", DescriptionHTML: "<html>"},
},
err: ErrSyncWebFeaturesTest,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: ErrSyncWebFeaturesTest,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -815,8 +824,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -908,8 +918,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -1012,8 +1023,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -1158,8 +1170,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -1356,8 +1369,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down Expand Up @@ -1556,8 +1570,9 @@ func TestInsertWebFeatures(t *testing.T) {
DescriptionHTML: "<html>",
},
},
err: nil,
expectedCount: 1,
expectedOptions: []gcpspanner.SyncWebFeaturesOption{},
err: nil,
expectedCount: 1,
},
mockFetchIDsAndKeysCfg: mockFetchIDsAndKeysConfig{
output: nil,
Expand Down
Loading