Skip to content

Commit 8473046

Browse files
committed
feat(event_producer): implement pubsub adapters
Introduces the `gcppubsubadapters` package to connect the `EventProducer` domain logic to Google Cloud Pub/Sub. This includes: - `EventProducerSubscriberAdapter`: A high-level adapter that routes incoming Pub/Sub messages (RefreshSearch, BatchRefresh, ConfigurationChanged) to the appropriate `EventProducer` methods. - `EventProducerPublisherAdapter`: An adapter for publishing `FeatureDiffEvent` notifications back to Pub/Sub. - `RunGroup`: A concurrency utility for managing the lifecycle of multiple blocking subscribers.
1 parent a9bebda commit 8473046

File tree

4 files changed

+664
-0
lines changed

4 files changed

+664
-0
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package gcppubsubadapters
16+
17+
import (
18+
"context"
19+
"log/slog"
20+
21+
"github.com/GoogleChrome/webstatus.dev/lib/event"
22+
batchrefreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/batchrefreshtrigger/v1"
23+
featurediffv1 "github.com/GoogleChrome/webstatus.dev/lib/event/featurediff/v1"
24+
refreshv1 "github.com/GoogleChrome/webstatus.dev/lib/event/refreshsearchcommand/v1"
25+
searchconfigv1 "github.com/GoogleChrome/webstatus.dev/lib/event/searchconfigurationchanged/v1"
26+
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
27+
)
28+
29+
type EventProducerSearchMessageHandler interface {
30+
ProcessSearch(ctx context.Context, searchID string, query string,
31+
frequency workertypes.JobFrequency, triggerID string) error
32+
}
33+
34+
type EventProducerBatchUpdateHandler interface {
35+
ProcessBatchUpdate(ctx context.Context, triggerID string, frequency workertypes.JobFrequency) error
36+
}
37+
38+
type EventSubscriber interface {
39+
Subscribe(ctx context.Context, subID string,
40+
handler func(ctx context.Context, msgID string, data []byte) error) error
41+
}
42+
43+
type SubscriberConfig struct {
44+
SearchSubscriptionID string
45+
BatchUpdateSubscriptionID string
46+
}
47+
48+
type EventProducerSubscriberAdapter struct {
49+
searchEventHandler EventProducerSearchMessageHandler
50+
batchUpdateHandler EventProducerBatchUpdateHandler
51+
eventSubscriber EventSubscriber
52+
config SubscriberConfig
53+
searchEventRouter *event.Router
54+
batchUpdateRouter *event.Router
55+
}
56+
57+
func NewEventProducerSubscriberAdapter(
58+
searchMessageHandler EventProducerSearchMessageHandler,
59+
batchUpdateHandler EventProducerBatchUpdateHandler,
60+
eventSubscriber EventSubscriber,
61+
config SubscriberConfig,
62+
) *EventProducerSubscriberAdapter {
63+
searchEventRouter := event.NewRouter()
64+
65+
batchUpdateRouter := event.NewRouter()
66+
67+
ret := &EventProducerSubscriberAdapter{
68+
searchEventHandler: searchMessageHandler,
69+
batchUpdateHandler: batchUpdateHandler,
70+
eventSubscriber: eventSubscriber,
71+
config: config,
72+
searchEventRouter: searchEventRouter,
73+
batchUpdateRouter: batchUpdateRouter,
74+
}
75+
76+
event.Register(searchEventRouter, ret.processRefreshSearchCommand)
77+
event.Register(searchEventRouter, ret.processSearchConfigurationChangedEvent)
78+
79+
event.Register(batchUpdateRouter, ret.processBatchUpdateCommand)
80+
81+
return ret
82+
}
83+
84+
func (a *EventProducerSubscriberAdapter) processRefreshSearchCommand(ctx context.Context,
85+
eventID string, event refreshv1.RefreshSearchCommand) error {
86+
slog.InfoContext(ctx, "received refresh search command", "eventID", eventID, "event", event)
87+
88+
return a.searchEventHandler.ProcessSearch(ctx, event.SearchID, event.Query,
89+
event.Frequency.ToWorkerTypeJobFrequency(), eventID)
90+
}
91+
92+
func (a *EventProducerSubscriberAdapter) processSearchConfigurationChangedEvent(ctx context.Context,
93+
eventID string, event searchconfigv1.SearchConfigurationChangedEvent) error {
94+
slog.InfoContext(ctx, "received search configuration changed event", "eventID", eventID, "event", event)
95+
96+
return a.searchEventHandler.ProcessSearch(ctx, event.SearchID, event.Query,
97+
event.Frequency.ToWorkerTypeJobFrequency(), eventID)
98+
}
99+
100+
func (a *EventProducerSubscriberAdapter) Subscribe(ctx context.Context) error {
101+
return RunGroup(ctx,
102+
// Handler 1: Search
103+
func(ctx context.Context) error {
104+
return a.eventSubscriber.Subscribe(ctx, a.config.SearchSubscriptionID,
105+
func(ctx context.Context, msgID string, data []byte) error {
106+
return a.searchEventRouter.HandleMessage(ctx, msgID, data)
107+
})
108+
},
109+
// Handler 2: Batch Update
110+
func(ctx context.Context) error {
111+
return a.eventSubscriber.Subscribe(ctx, a.config.BatchUpdateSubscriptionID,
112+
func(ctx context.Context, msgID string, data []byte) error {
113+
return a.batchUpdateRouter.HandleMessage(ctx, msgID, data)
114+
})
115+
},
116+
)
117+
}
118+
119+
func (a *EventProducerSubscriberAdapter) processBatchUpdateCommand(ctx context.Context,
120+
eventID string, event batchrefreshv1.BatchRefreshTrigger) error {
121+
slog.InfoContext(ctx, "received batch update command", "eventID", eventID, "event", event)
122+
123+
return a.batchUpdateHandler.ProcessBatchUpdate(ctx, eventID,
124+
event.Frequency.ToWorkerTypeJobFrequency())
125+
}
126+
127+
type EventPublisher interface {
128+
Publish(ctx context.Context, topicID string, data []byte) (string, error)
129+
}
130+
131+
type EventProducerPublisherAdapter struct {
132+
eventPublisher EventPublisher
133+
topicID string
134+
}
135+
136+
func NewEventProducerPublisherAdapter(eventPublisher EventPublisher, topicID string) *EventProducerPublisherAdapter {
137+
return &EventProducerPublisherAdapter{
138+
eventPublisher: eventPublisher,
139+
topicID: topicID,
140+
}
141+
}
142+
143+
func (a *EventProducerPublisherAdapter) Publish(ctx context.Context,
144+
req workertypes.PublishEventRequest) (string, error) {
145+
b, err := event.New(featurediffv1.FeatureDiffEvent{
146+
EventID: req.EventID,
147+
SearchID: req.SearchID,
148+
Query: req.Query,
149+
Summary: req.Summary,
150+
StateID: req.StateID,
151+
StateBlobPath: req.StateBlobPath,
152+
DiffID: req.DiffID,
153+
DiffBlobPath: req.DiffBlobPath,
154+
GeneratedAt: req.GeneratedAt,
155+
Frequency: featurediffv1.ToJobFrequency(req.Frequency),
156+
Reasons: featurediffv1.ToReasons(req.Reasons),
157+
})
158+
if err != nil {
159+
return "", err
160+
}
161+
162+
return a.eventPublisher.Publish(ctx, a.topicID, b)
163+
}

0 commit comments

Comments
 (0)