Are you sure you want to unsubscribe?
++ Select how and when you want to get updates for + ${this._subscription?.saved_search_id ?? + this._savedSearch?.name}. +
+ +Get an update when a feature...
+ ${ManageSubscriptionsDialog._TRIGGER_CONFIG.map( + trigger => html` +No subscriptions found.
`; + } + + return html` +Hello
") + if !errors.Is(err, tc.expectedError) { + t.Errorf("Expected error wrapping %v, but got %v (raw: %v)", tc.expectedError, err, errors.Unwrap(err)) + } + }) + } +} diff --git a/lib/gcppubsub/gcppubsubadapters/backend.go b/lib/gcppubsub/gcppubsubadapters/backend.go new file mode 100644 index 000000000..8b75feb43 --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/backend.go @@ -0,0 +1,67 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "fmt" + "log/slog" + + "github.com/GoogleChrome/webstatus.dev/lib/event" + searchconfigv1 "github.com/GoogleChrome/webstatus.dev/lib/event/searchconfigurationchanged/v1" + "github.com/GoogleChrome/webstatus.dev/lib/gen/openapi/backend" +) + +type BackendAdapter struct { + client EventPublisher + topicID string +} + +func NewBackendAdapter(client EventPublisher, topicID string) *BackendAdapter { + return &BackendAdapter{client: client, topicID: topicID} +} + +func (p *BackendAdapter) PublishSearchConfigurationChanged( + ctx context.Context, + resp *backend.SavedSearchResponse, + userID string, + isCreation bool) error { + + evt := searchconfigv1.SearchConfigurationChangedEvent{ + SearchID: resp.Id, + Query: resp.Query, + UserID: userID, + Timestamp: resp.UpdatedAt, + IsCreation: isCreation, + Frequency: searchconfigv1.FrequencyImmediate, + } + + msg, err := event.New(evt) + if err != nil { + return fmt.Errorf("failed to create event: %w", err) + } + + id, err := p.client.Publish(ctx, p.topicID, msg) + if err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + slog.InfoContext(ctx, "published search configuration changed event", + "msgID", id, + "searchID", evt.SearchID, + "isCreation", evt.IsCreation) + + return nil +} diff --git a/lib/gcppubsub/gcppubsubadapters/backend_test.go b/lib/gcppubsub/gcppubsubadapters/backend_test.go new file mode 100644 index 000000000..061d5a462 --- /dev/null +++ b/lib/gcppubsub/gcppubsubadapters/backend_test.go @@ -0,0 +1,136 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gcppubsubadapters + +import ( + "context" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/GoogleChrome/webstatus.dev/lib/gen/openapi/backend" + "github.com/google/go-cmp/cmp" +) + +func testSavedSearchResponse(id string, query string, updatedAt time.Time) *backend.SavedSearchResponse { + var resp backend.SavedSearchResponse + resp.Id = id + resp.Query = query + resp.UpdatedAt = updatedAt + + return &resp +} +func TestSearchConfigurationPublisherAdapter_Publish(t *testing.T) { + fixedTime := time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC) + + tests := []struct { + name string + resp *backend.SavedSearchResponse + userID string + isCreation bool + publishErr error + wantErr bool + expectedJSON string + }{ + { + name: "success creation", + resp: testSavedSearchResponse("search-123", "group:css", fixedTime), + userID: "user-1", + isCreation: true, + publishErr: nil, + wantErr: false, + expectedJSON: `{ + "apiVersion": "v1", + "kind": "SearchConfigurationChangedEvent", + "data": { + "search_id": "search-123", + "query": "group:css", + "user_id": "user-1", + "timestamp": "2025-01-01T00:00:00Z", + "is_creation": true, + "frequency": "IMMEDIATE" + } + }`, + }, + { + name: "success update", + resp: testSavedSearchResponse("search-456", "group:html", fixedTime.Add(24*time.Hour)), + userID: "user-1", + isCreation: false, + publishErr: nil, + wantErr: false, + expectedJSON: `{ + "apiVersion": "v1", + "kind": "SearchConfigurationChangedEvent", + "data": { + "search_id": "search-456", + "query": "group:html", + "user_id": "user-1", + "timestamp": "2025-01-02T00:00:00Z", + "is_creation": false, + "frequency": "IMMEDIATE" + } + }`, + }, + { + name: "publish error", + resp: testSavedSearchResponse("search-err", "group:html", fixedTime), + userID: "user-1", + isCreation: false, + publishErr: errors.New("pubsub error"), + wantErr: true, + expectedJSON: "", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + publisher := new(mockPublisher) + publisher.err = tc.publishErr + adapter := NewBackendAdapter(publisher, "test-topic") + + err := adapter.PublishSearchConfigurationChanged(context.Background(), tc.resp, tc.userID, tc.isCreation) + + if (err != nil) != tc.wantErr { + t.Errorf("PublishSearchConfigurationChanged() error = %v, wantErr %v", err, tc.wantErr) + } + + if tc.wantErr { + return + } + + if publisher.publishedTopic != "test-topic" { + t.Errorf("Topic mismatch: got %s, want test-topic", publisher.publishedTopic) + } + + // Unmarshal actual data + var actual interface{} + if err := json.Unmarshal(publisher.publishedData, &actual); err != nil { + t.Fatalf("failed to unmarshal published data: %v", err) + } + + // Unmarshal expected data + var expected interface{} + if err := json.Unmarshal([]byte(tc.expectedJSON), &expected); err != nil { + t.Fatalf("failed to unmarshal expected data: %v", err) + } + + if diff := cmp.Diff(expected, actual); diff != "" { + t.Errorf("Payload mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/lib/gcpspanner/feature_group_lookups.go b/lib/gcpspanner/feature_group_lookups.go index 120c15926..6fb81e239 100644 --- a/lib/gcpspanner/feature_group_lookups.go +++ b/lib/gcpspanner/feature_group_lookups.go @@ -124,3 +124,30 @@ func calculateAllFeatureGroupLookups( } } } + +// AddFeatureToGroup adds a feature to a group. +func (c *Client) AddFeatureToGroup(ctx context.Context, featureKey, groupKey string) error { + _, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + featureID, err := c.GetIDFromFeatureKey(ctx, NewFeatureKeyFilter(featureKey)) + if err != nil { + return err + } + groupID, err := c.GetGroupIDFromGroupKey(ctx, groupKey) + if err != nil { + return err + } + m, err := spanner.InsertStruct(featureGroupKeysLookupTable, spannerFeatureGroupKeysLookup{ + GroupID: *groupID, + GroupKeyLowercase: groupKey, + WebFeatureID: *featureID, + Depth: 0, + }) + if err != nil { + return err + } + + return txn.BufferWrite([]*spanner.Mutation{m}) + }) + + return err +} diff --git a/lib/gcpspanner/saved_search_subscription.go b/lib/gcpspanner/saved_search_subscription.go index 4dcd93eff..a69ab756d 100644 --- a/lib/gcpspanner/saved_search_subscription.go +++ b/lib/gcpspanner/saved_search_subscription.go @@ -412,3 +412,27 @@ func (c *Client) FindAllActivePushSubscriptions( return results, nil } + +// DeleteUserSubscriptions deletes all saved search subscriptions for a given list of user IDs. +// Used for E2E tests. +func (c *Client) DeleteUserSubscriptions(ctx context.Context, userIDs []string) error { + _, err := c.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + _, err := txn.Update(ctx, spanner.Statement{ + SQL: `DELETE FROM SavedSearchSubscriptions WHERE ChannelID IN + (SELECT ID FROM NotificationChannels WHERE UserID IN UNNEST(@userIDs))`, + Params: map[string]interface{}{ + "userIDs": userIDs, + }, + }) + if err != nil { + return errors.Join(ErrInternalQueryFailure, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("failed to delete user subscriptions: %w", err) + } + + return nil +} diff --git a/lib/gcpspanner/web_features.go b/lib/gcpspanner/web_features.go index 292499b9e..4aa005392 100644 --- a/lib/gcpspanner/web_features.go +++ b/lib/gcpspanner/web_features.go @@ -513,6 +513,22 @@ func (c *Client) FetchAllFeatureKeys(ctx context.Context) ([]string, error) { return fetchSingleColumnValuesWithTransaction[string](ctx, txn, webFeaturesTable, "FeatureKey") } +// UpdateFeatureDescription updates the description of a web feature. +// Useful for e2e tests. +func (c *Client) UpdateFeatureDescription( + ctx context.Context, featureKey, newDescription string) error { + _, err := c.ReadWriteTransaction(ctx, func(_ context.Context, txn *spanner.ReadWriteTransaction) error { + return txn.BufferWrite([]*spanner.Mutation{ + spanner.Update(webFeaturesTable, + []string{"FeatureKey", "Description"}, + []any{featureKey, newDescription}, + ), + }) + }) + + return err +} + type SpannerFeatureIDAndKey struct { ID string `spanner:"ID"` FeatureKey string `spanner:"FeatureKey"` diff --git a/skaffold.yaml b/skaffold.yaml index be8cdd607..8dff3e4f4 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -19,3 +19,4 @@ metadata: requires: - path: ./backend - path: ./frontend + - path: ./workers diff --git a/util/cmd/load_fake_data/main.go b/util/cmd/load_fake_data/main.go index 53186d190..4f2ca617d 100644 --- a/util/cmd/load_fake_data/main.go +++ b/util/cmd/load_fake_data/main.go @@ -248,6 +248,13 @@ func resetTestData(ctx context.Context, spannerClient *gcpspanner.Client, authCl return nil } + // Delete all subscriptions for the test users. + err := spannerClient.DeleteUserSubscriptions(ctx, userIDs) + if err != nil { + return fmt.Errorf("failed to delete test user subscriptions: %w", err) + } + slog.InfoContext(ctx, "Deleted subscriptions for test users") + for _, userID := range userIDs { page, err := spannerClient.ListUserSavedSearches(ctx, userID, 1000, nil) if err != nil { @@ -740,6 +747,59 @@ func generateSavedSearchBookmarks(ctx context.Context, spannerClient *gcpspanner return len(bookmarksToInsert), nil } +func generateSubscriptions(ctx context.Context, spannerClient *gcpspanner.Client, + authClient *auth.Client) (int, error) { + // Get the channel ID for test.user.1@example.com's primary email. + userID, err := findUserIDByEmail(ctx, "test.user.1@example.com", authClient) + if err != nil { + return 0, fmt.Errorf("could not find userID for test.user.1@example.com: %w", err) + } + channels, _, err := spannerClient.ListNotificationChannels(ctx, gcpspanner.ListNotificationChannelsRequest{ + UserID: userID, + PageSize: 100, + PageToken: nil, + }) + if err != nil { + return 0, fmt.Errorf("could not list notification channels for user %s: %w", userID, err) + } + if len(channels) == 0 { + return 0, fmt.Errorf("no notification channels found for user %s", userID) + } + primaryChannelID := channels[0].ID + + subscriptionsToInsert := []struct { + SavedSearchUUID string + ChannelID string + Frequency gcpspanner.SavedSearchSnapshotType + Triggers []gcpspanner.SubscriptionTrigger + }{ + { + // Subscription for "my first project query" + SavedSearchUUID: "74bdb85f-59d3-43b0-8061-20d5818e8c97", + ChannelID: primaryChannelID, + Frequency: gcpspanner.SavedSearchSnapshotTypeWeekly, + Triggers: []gcpspanner.SubscriptionTrigger{ + gcpspanner.SubscriptionTriggerFeatureBaselinePromoteToWidely, + }, + }, + } + + for _, sub := range subscriptionsToInsert { + _, err := spannerClient.CreateSavedSearchSubscription(ctx, gcpspanner.CreateSavedSearchSubscriptionRequest{ + UserID: userID, + ChannelID: sub.ChannelID, + SavedSearchID: sub.SavedSearchUUID, + Triggers: sub.Triggers, + Frequency: sub.Frequency, + }) + if err != nil { + return 0, fmt.Errorf("failed to create subscription for saved search %s: %w", sub.SavedSearchUUID, err) + } + } + + return len(subscriptionsToInsert), nil +} + func generateUserData(ctx context.Context, spannerClient *gcpspanner.Client, authClient *auth.Client) error { savedSearchesCount, err := generateSavedSearches(ctx, spannerClient, authClient) @@ -757,6 +817,13 @@ func generateUserData(ctx context.Context, spannerClient *gcpspanner.Client, slog.InfoContext(ctx, "saved search bookmarks generated", "amount of bookmarks created", bookmarkCount) + subscriptionsCount, err := generateSubscriptions(ctx, spannerClient, authClient) + if err != nil { + return fmt.Errorf("subscriptions generation failed %w", err) + } + slog.InfoContext(ctx, "subscriptions generated", + "amount of subscriptions created", subscriptionsCount) + return nil } func generateData(ctx context.Context, spannerClient *gcpspanner.Client, datastoreClient *gds.Client) error { @@ -1354,6 +1421,7 @@ func main() { datastoreDatabase = flag.String("datastore_database", "", "Datastore Database") scope = flag.String("scope", "all", "Scope of data generation: all, user") resetFlag = flag.Bool("reset", false, "Reset test user data before loading") + triggerScenario = flag.String("trigger-scenario", "", "Trigger a specific data change scenario for E2E tests") ) flag.Parse() @@ -1387,6 +1455,16 @@ func main() { gofakeit.GlobalFaker = gofakeit.New(seedValue) ctx := context.Background() + if *triggerScenario != "" { + err := triggerDataChange(ctx, spannerClient, *triggerScenario) + if err != nil { + slog.ErrorContext(ctx, "Failed to trigger data change", "scenario", *triggerScenario, "error", err) + os.Exit(1) + } + slog.InfoContext(ctx, "Data change triggered successfully", "scenario", *triggerScenario) + + return // Exit immediately after triggering the change + } var finalErr error @@ -1425,5 +1503,36 @@ func main() { slog.ErrorContext(ctx, "Data generation failed", "scope", *scope, "reset", *resetFlag, "error", finalErr) os.Exit(1) } + slog.InfoContext(ctx, "loading fake data successful") } + +func triggerDataChange(ctx context.Context, spannerClient *gcpspanner.Client, scenario string) error { + slog.InfoContext(ctx, "Triggering data change", "scenario", scenario) + // These feature keys are used in the E2E tests. + const nonMatchingFeatureKey = "popover" + const matchingFeatureKey = "popover" + const batchChangeFeatureKey = "dialog" + const batchChangeGroupKey = "css" + + switch scenario { + case "non-matching": + // Change a property that the test subscription is not listening for. + return spannerClient.UpdateFeatureDescription(ctx, nonMatchingFeatureKey, "A non-matching change") + case "matching": + // Change the BaselineStatus to 'widely' to match the test subscription's trigger. + status := gcpspanner.BaselineStatusHigh + + return spannerClient.UpsertFeatureBaselineStatus(ctx, matchingFeatureKey, gcpspanner.FeatureBaselineStatus{ + Status: &status, + // In reality, these would be set, but we are strictly testing the transition of baseline status. + LowDate: nil, + HighDate: nil, + }) + case "batch-change": + // Change a feature to match the 'group:css' search criteria. + return spannerClient.AddFeatureToGroup(ctx, batchChangeFeatureKey, batchChangeGroupKey) + default: + return fmt.Errorf("unknown trigger scenario: %s", scenario) + } +} diff --git a/workers/email/cmd/job/main.go b/workers/email/cmd/job/main.go index 91bf8ec39..6ad983f4d 100644 --- a/workers/email/cmd/job/main.go +++ b/workers/email/cmd/job/main.go @@ -19,8 +19,13 @@ import ( "log/slog" "net/url" "os" + "strconv" + "strings" + "github.com/GoogleChrome/webstatus.dev/lib/email/chime" "github.com/GoogleChrome/webstatus.dev/lib/email/chime/chimeadapters" + "github.com/GoogleChrome/webstatus.dev/lib/email/smtpsender" + "github.com/GoogleChrome/webstatus.dev/lib/email/smtpsender/smtpsenderadapters" "github.com/GoogleChrome/webstatus.dev/lib/gcppubsub" "github.com/GoogleChrome/webstatus.dev/lib/gcppubsub/gcppubsubadapters" "github.com/GoogleChrome/webstatus.dev/lib/gcpspanner" @@ -29,6 +34,33 @@ import ( "github.com/GoogleChrome/webstatus.dev/workers/email/pkg/sender" ) +func getSMTPSender(ctx context.Context, smtpHost string) *smtpsenderadapters.EmailWorkerSMTPAdapter { + slog.InfoContext(ctx, "using smtp email sender") + smtpPortStr := os.Getenv("SMTP_PORT") + smtpPort, err := strconv.Atoi(smtpPortStr) + if err != nil { + slog.ErrorContext(ctx, "invalid SMTP_PORT", "error", err) + os.Exit(1) + } + smtpUsername := os.Getenv("SMTP_USERNAME") + smtpPassword := os.Getenv("SMTP_PASSWORD") + fromAddress := os.Getenv("FROM_ADDRESS") + + smtpCfg := smtpsender.SMTPClientConfig{ + Host: smtpHost, + Port: smtpPort, + Username: smtpUsername, + Password: smtpPassword, + } + smtpClient, err := smtpsender.NewClient(smtpCfg, fromAddress) + if err != nil { + slog.ErrorContext(ctx, "failed to create smtp client", "error", err) + os.Exit(1) + } + + return smtpsenderadapters.NewEmailWorkerSMTPAdapter(smtpClient) +} + func main() { ctx := context.Background() @@ -86,8 +118,33 @@ func main() { os.Exit(1) } + var emailSender sender.EmailSender + smtpHost := os.Getenv("SMTP_HOST") + if smtpHost != "" { + emailSender = getSMTPSender(ctx, smtpHost) + } else { + slog.InfoContext(ctx, "using chime email sender") + chimeEnvStr := os.Getenv("CHIME_ENV") + chimeEnv := chime.EnvProd + if chimeEnvStr == "autopush" { + chimeEnv = chime.EnvAutopush + } + chimeBCC := os.Getenv("CHIME_BCC") + bccList := []string{} + if chimeBCC != "" { + bccList = strings.Split(chimeBCC, ",") + } + fromAddress := os.Getenv("FROM_ADDRESS") + chimeSender, err := chime.NewChimeSender(ctx, chimeEnv, bccList, fromAddress, nil) + if err != nil { + slog.ErrorContext(ctx, "failed to create chime sender", "error", err) + os.Exit(1) + } + emailSender = chimeadapters.NewEmailWorkerChimeAdapter(chimeSender) + } + listener := gcppubsubadapters.NewEmailWorkerSubscriberAdapter(sender.NewSender( - chimeadapters.NewEmailWorkerChimeAdapter(nil), + emailSender, spanneradapters.NewEmailWorkerChannelStateManager(spannerClient), renderer, ), queueClient, emailSubID) diff --git a/workers/email/manifests/pod.yaml b/workers/email/manifests/pod.yaml index d16c68011..b5a1e80f7 100644 --- a/workers/email/manifests/pod.yaml +++ b/workers/email/manifests/pod.yaml @@ -38,6 +38,12 @@ spec: value: 'chime-delivery-sub-id' - name: FRONTEND_BASE_URL value: 'http://localhost:5555' + - name: SMTP_HOST + value: 'mailpit' + - name: SMTP_PORT + value: '1025' + - name: FROM_ADDRESS + value: 'test@webstatus.dev' resources: limits: cpu: 250m diff --git a/workers/email/skaffold.yaml b/workers/email/skaffold.yaml index 1e4c3c6ad..4650b090b 100644 --- a/workers/email/skaffold.yaml +++ b/workers/email/skaffold.yaml @@ -19,6 +19,7 @@ metadata: requires: - path: ../../.dev/pubsub - path: ../../.dev/spanner + - path: ../../.dev/mailpit profiles: - name: local build: diff --git a/workers/skaffold.yaml b/workers/skaffold.yaml new file mode 100644 index 000000000..13045a967 --- /dev/null +++ b/workers/skaffold.yaml @@ -0,0 +1,22 @@ +# Copyright 2026 Google LLC + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# https://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: skaffold/v4beta9 +kind: Config +metadata: + name: workers +requires: + - path: ./email + - path: ./event_producer + - path: ./push_delivery