@@ -12,7 +12,7 @@ import (
12
12
"time"
13
13
14
14
"cloud.google.com/go/datastore"
15
- "cloud.google.com/go/pubsub"
15
+ "cloud.google.com/go/pubsub/v2 "
16
16
"cloud.google.com/go/storage"
17
17
"github.com/google/osv.dev/go/logger"
18
18
"github.com/google/osv.dev/go/models"
@@ -118,7 +118,7 @@ func run(ctx context.Context, env *appEnv) error {
118
118
go func () {
119
119
defer resultsWg .Done ()
120
120
for result := range resultsChan {
121
- if handleResult (ctx , env .topic , result ) {
121
+ if handleResult (ctx , env .publisher , result ) {
122
122
newInvalids = append (newInvalids , result .id )
123
123
}
124
124
}
@@ -182,7 +182,7 @@ func run(ctx context.Context, env *appEnv) error {
182
182
// handleResult handles logging and sending pub/sub message to the recoverer.
183
183
// Returns true if a pub/sub message was sent to the recoverer,
184
184
// to indicate that we need to verify that the recoverer fixes the problem on the next run.
185
- func handleResult (ctx context.Context , topic * pubsub.Topic , result checkRecordResult ) bool {
185
+ func handleResult (ctx context.Context , publisher * pubsub.Publisher , result checkRecordResult ) bool {
186
186
if result .err != nil {
187
187
logger .Error ("failed to process record" , slog .String ("id" , result .id ), slog .Any ("err" , result .err ))
188
188
}
@@ -191,7 +191,7 @@ func handleResult(ctx context.Context, topic *pubsub.Topic, result checkRecordRe
191
191
Attributes : map [string ]string {"type" : "gcs_missing" , "id" : result .id },
192
192
}
193
193
logger .Info ("publishing gcs_missing message" , slog .String ("id" , result .id ))
194
- _ , err := topic .Publish (ctx , & msg ).Get (ctx )
194
+ _ , err := publisher .Publish (ctx , & msg ).Get (ctx )
195
195
if err != nil {
196
196
logger .Error ("failed publishing message" , slog .String ("id" , result .id ), slog .Any ("err" , err ))
197
197
}
@@ -204,7 +204,7 @@ func handleResult(ctx context.Context, topic *pubsub.Topic, result checkRecordRe
204
204
type appEnv struct {
205
205
bucket * storage.BucketHandle
206
206
ds * datastore.Client
207
- topic * pubsub.Topic
207
+ publisher * pubsub.Publisher
208
208
numWorkers int
209
209
}
210
210
@@ -239,7 +239,7 @@ func setup(ctx context.Context) (*appEnv, error) {
239
239
err = fmt .Errorf ("failed to create pubsub client: %w" , err )
240
240
return nil , err
241
241
}
242
- topic := pubsubClient .Topic (pubsubTopic )
242
+ publisher := pubsubClient .Publisher (pubsubTopic )
243
243
244
244
numWorkers := defaultNumWorkers
245
245
if numWorkersStr , ok := os .LookupEnv ("NUM_WORKERS" ); ok {
@@ -253,7 +253,7 @@ func setup(ctx context.Context) (*appEnv, error) {
253
253
return & appEnv {
254
254
bucket : bucket ,
255
255
ds : dsClient ,
256
- topic : topic ,
256
+ publisher : publisher ,
257
257
numWorkers : numWorkers ,
258
258
}, nil
259
259
}
0 commit comments