Skip to content

Commit a9bebda

Browse files
committed
feat(event_producer): implement spanner adapters
Introduces the concrete Spanner adapters that connect the `EventProducer` domain logic to the underlying Spanner client. This includes: - `EventProducerDiffer`: Adapts the backend's `FeaturesSearch` and `GetFeature` logic to the interface required by the differ package. - `EventProducer`: The main adapter that handles the orchestration of locking, state retrieval, and event publishing, including the JSON serialization of event summaries.
1 parent fdf7972 commit a9bebda

File tree

2 files changed

+881
-0
lines changed

2 files changed

+881
-0
lines changed
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package spanneradapters
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"log/slog"
22+
"time"
23+
24+
"cloud.google.com/go/spanner"
25+
"github.com/GoogleChrome/webstatus.dev/lib/backendtypes"
26+
"github.com/GoogleChrome/webstatus.dev/lib/gcpspanner"
27+
"github.com/GoogleChrome/webstatus.dev/lib/gcpspanner/searchtypes"
28+
"github.com/GoogleChrome/webstatus.dev/lib/gen/openapi/backend"
29+
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
30+
)
31+
32+
type BackendAdapterForEventProducerDiffer interface {
33+
GetFeature(
34+
ctx context.Context,
35+
featureID string,
36+
wptMetricView backend.WPTMetricView,
37+
browsers []backend.BrowserPathParam,
38+
) (*backendtypes.GetFeatureResult, error)
39+
FeaturesSearch(
40+
ctx context.Context,
41+
pageToken *string,
42+
pageSize int,
43+
searchNode *searchtypes.SearchNode,
44+
sortOrder *backend.ListFeaturesParamsSort,
45+
wptMetricView backend.WPTMetricView,
46+
browsers []backend.BrowserPathParam,
47+
) (*backend.FeaturePage, error)
48+
}
49+
50+
type EventProducerDiffer struct {
51+
backendAdapter BackendAdapterForEventProducerDiffer
52+
}
53+
54+
type EventProducerDifferSpannerClient interface {
55+
BackendSpannerClient
56+
}
57+
58+
// NewEventProducerDiffer constructs an adapter for the differ in the event producer service.
59+
func NewEventProducerDiffer(adapter BackendAdapterForEventProducerDiffer) *EventProducerDiffer {
60+
return &EventProducerDiffer{backendAdapter: adapter}
61+
}
62+
63+
func (e *EventProducerDiffer) GetFeature(
64+
ctx context.Context,
65+
featureID string) (*backendtypes.GetFeatureResult, error) {
66+
return e.backendAdapter.GetFeature(ctx, featureID, backend.TestCounts,
67+
backendtypes.DefaultBrowsers())
68+
}
69+
70+
func (e *EventProducerDiffer) FetchFeatures(ctx context.Context, query string) ([]backend.Feature, error) {
71+
parser := searchtypes.FeaturesSearchQueryParser{}
72+
node, err := parser.Parse(query)
73+
if err != nil {
74+
return nil, err
75+
}
76+
var features []backend.Feature
77+
78+
s := backend.NameAsc
79+
var pageToken *string
80+
for {
81+
featurePage, err := e.backendAdapter.FeaturesSearch(
82+
ctx,
83+
pageToken,
84+
// TODO: Use helper for page size https://github.com/GoogleChrome/webstatus.dev/issues/2122
85+
100,
86+
node,
87+
&s,
88+
//TODO: Use helper for test type https://github.com/GoogleChrome/webstatus.dev/issues/2122
89+
backend.TestCounts,
90+
backendtypes.DefaultBrowsers(),
91+
)
92+
if err != nil {
93+
return nil, err
94+
}
95+
features = append(features, featurePage.Data...)
96+
if featurePage.Metadata.NextPageToken == nil {
97+
break
98+
}
99+
pageToken = featurePage.Metadata.NextPageToken
100+
}
101+
102+
return features, nil
103+
}
104+
105+
type EventProducerSpannerClient interface {
106+
TryAcquireSavedSearchStateWorkerLock(
107+
ctx context.Context,
108+
savedSearchID string,
109+
snapshotType gcpspanner.SavedSearchSnapshotType,
110+
workerID string,
111+
ttl time.Duration) (bool, error)
112+
PublishSavedSearchNotificationEvent(ctx context.Context,
113+
event gcpspanner.SavedSearchNotificationCreateRequest, newStatePath, workerID string,
114+
opts ...gcpspanner.CreateOption) (*string, error)
115+
GetLatestSavedSearchNotificationEvent(
116+
ctx context.Context,
117+
savedSearchID string,
118+
snapshotType gcpspanner.SavedSearchSnapshotType,
119+
) (*gcpspanner.SavedSearchNotificationEvent, error)
120+
ReleaseSavedSearchStateWorkerLock(
121+
ctx context.Context,
122+
savedSearchID string,
123+
snapshotType gcpspanner.SavedSearchSnapshotType,
124+
workerID string) error
125+
}
126+
127+
type EventProducer struct {
128+
client EventProducerSpannerClient
129+
}
130+
131+
func NewEventProducer(client EventProducerSpannerClient) *EventProducer {
132+
return &EventProducer{client: client}
133+
}
134+
135+
func convertFrequencyToSnapshotType(freq workertypes.JobFrequency) gcpspanner.SavedSearchSnapshotType {
136+
switch freq {
137+
// Eventually daily and unknown will be their own types.
138+
case workertypes.FrequencyImmediate, workertypes.FrequencyDaily, workertypes.FrequencyUnknown:
139+
return gcpspanner.SavedSearchSnapshotTypeImmediate
140+
case workertypes.FrequencyWeekly:
141+
return gcpspanner.SavedSearchSnapshotTypeWeekly
142+
case workertypes.FrequencyMonthly:
143+
return gcpspanner.SavedSearchSnapshotTypeMonthly
144+
}
145+
146+
return gcpspanner.SavedSearchSnapshotTypeImmediate
147+
}
148+
149+
func convertWorktypeReasonsToSpanner(reasons []workertypes.Reason) []string {
150+
if reasons == nil {
151+
return nil
152+
}
153+
spannerReasons := make([]string, 0, len(reasons))
154+
for _, r := range reasons {
155+
spannerReasons = append(spannerReasons, string(r))
156+
}
157+
158+
return spannerReasons
159+
}
160+
161+
func (e *EventProducer) AcquireLock(ctx context.Context, searchID string, frequency workertypes.JobFrequency,
162+
workerID string, lockTTL time.Duration) error {
163+
snapshotType := convertFrequencyToSnapshotType(frequency)
164+
_, err := e.client.TryAcquireSavedSearchStateWorkerLock(
165+
ctx,
166+
searchID,
167+
snapshotType,
168+
workerID,
169+
lockTTL,
170+
)
171+
172+
return err
173+
}
174+
175+
func (e *EventProducer) GetLatestEvent(ctx context.Context, frequency workertypes.JobFrequency,
176+
searchID string) (*workertypes.LatestEventInfo, error) {
177+
snapshotType := convertFrequencyToSnapshotType(frequency)
178+
179+
event, err := e.client.GetLatestSavedSearchNotificationEvent(ctx, searchID, snapshotType)
180+
if err != nil {
181+
return nil, err
182+
}
183+
184+
return &workertypes.LatestEventInfo{
185+
EventID: event.ID,
186+
StateBlobPath: event.BlobPath,
187+
}, nil
188+
}
189+
190+
func (e *EventProducer) ReleaseLock(ctx context.Context, searchID string, frequency workertypes.JobFrequency,
191+
workerID string) error {
192+
snapshotType := convertFrequencyToSnapshotType(frequency)
193+
194+
return e.client.ReleaseSavedSearchStateWorkerLock(ctx, searchID, snapshotType, workerID)
195+
}
196+
197+
func (e *EventProducer) PublishEvent(ctx context.Context, req workertypes.PublishEventRequest) error {
198+
var summaryObj interface{}
199+
if req.Summary != nil {
200+
if err := json.Unmarshal(req.Summary, &summaryObj); err != nil {
201+
return fmt.Errorf("failed to unmarshal summary JSON: %w", err)
202+
}
203+
}
204+
snapshotType := convertFrequencyToSnapshotType(req.Frequency)
205+
_, err := e.client.PublishSavedSearchNotificationEvent(ctx, gcpspanner.SavedSearchNotificationCreateRequest{
206+
SavedSearchID: req.SearchID,
207+
SnapshotType: snapshotType,
208+
Timestamp: req.GeneratedAt,
209+
EventType: "", // TODO: Set appropriate event type
210+
Reasons: convertWorktypeReasonsToSpanner(req.Reasons),
211+
BlobPath: req.StateBlobPath,
212+
DiffBlobPath: req.DiffBlobPath,
213+
Summary: spanner.NullJSON{Value: map[string]any{"summary": summaryObj}, Valid: req.Summary != nil},
214+
},
215+
req.StateBlobPath,
216+
req.EventID,
217+
gcpspanner.WithID(req.EventID),
218+
)
219+
if err != nil {
220+
slog.ErrorContext(ctx, "unable to publish notification event", "error", err, "eventID", req.EventID)
221+
222+
return err
223+
}
224+
225+
return nil
226+
}

0 commit comments

Comments
 (0)