Skip to content

Commit f0c8f99

Browse files
authored
feat(publisher): seperate generate ad and publish (#66)
seperate a parallizable step (generating ads) from one that is not parallizable (publishing). - create a generate ad function that just generates the add that will be published for a given set of CIDs - create an advertisement publisher that can batch several ads together with only one head publish step - port the generic queuepoller into go-libstoracha, now supporting handlers that batch all jobs at once - modify the queue interface to specifically seperate the job from the ID on read - rename queues to be called `publishing` rather than `publisher` (consistency) - modify extended queue to match queue interface - add advertisement publishing SQS queue - generate new handlers and interfaces for double step queue publishing
1 parent 5dabefb commit f0c8f99

File tree

11 files changed

+1257
-237
lines changed

11 files changed

+1257
-237
lines changed

awsutils/extendedqueue.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/aws/aws-sdk-go-v2/service/s3"
1313
"github.com/aws/aws-sdk-go-v2/service/sqs"
1414
"github.com/google/uuid"
15+
"github.com/storacha/go-libstoracha/queuepoller"
1516
)
1617

1718
// queueMessage is the struct that is serialized onto an SQS message queue in JSON
@@ -22,7 +23,6 @@ type queueMessage[Message any] struct {
2223

2324
// SerializedJob represents a job that has been serialized for transport over SQS + S3
2425
type SerializedJob[Message any] struct {
25-
ID string
2626
GroupID *string
2727
Message Message
2828
Extended io.Reader
@@ -31,7 +31,6 @@ type SerializedJob[Message any] struct {
3131
type JobMarshaller[Job any, Message any] interface {
3232
Marshall(job Job) (SerializedJob[Message], error)
3333
Unmarshall(SerializedJob[Message]) (Job, error)
34-
Empty() Job
3534
}
3635

3736
// SQSExtendedQueue implements a queue interface using SQS that can store extended data to an S3 bucket
@@ -115,7 +114,7 @@ func (s *SQSExtendedQueue[Job, Message]) sendMessage(ctx context.Context, groupI
115114
// Read reads a batch of jobs from the SQS queue.
116115
// Returns an empty slice if no jobs are available.
117116
// The caller must process jobs and delete them from the queue when done.
118-
func (s *SQSExtendedQueue[Job, Message]) Read(ctx context.Context, maxJobs int) ([]Job, error) {
117+
func (s *SQSExtendedQueue[Job, Message]) Read(ctx context.Context, maxJobs int) ([]queuepoller.WithID[Job], error) {
119118
receiveOutput, err := s.sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
120119
QueueUrl: aws.String(s.queueID),
121120
MaxNumberOfMessages: int32(maxJobs),
@@ -126,10 +125,10 @@ func (s *SQSExtendedQueue[Job, Message]) Read(ctx context.Context, maxJobs int)
126125
}
127126

128127
if len(receiveOutput.Messages) == 0 {
129-
return []Job{}, nil
128+
return []queuepoller.WithID[Job]{}, nil
130129
}
131130

132-
jobs := make([]Job, 0, len(receiveOutput.Messages))
131+
jobs := make([]queuepoller.WithID[Job], 0, len(receiveOutput.Messages))
133132
for _, msg := range receiveOutput.Messages {
134133
job, err := s.decoder.DecodeMessage(ctx, aws.ToString(msg.ReceiptHandle), aws.ToString(msg.Body))
135134
if err != nil {
@@ -179,23 +178,26 @@ func NewSQSDecoder[Job any, Message any](cfg aws.Config, bucket string, marshall
179178
}
180179

181180
// DecodeMessage decodes a provider caching job from the SQS message body, reading the stored index from S3
182-
func (s *SQSDecoder[Job, Message]) DecodeMessage(ctx context.Context, receiptHandle string, messageBody string) (Job, error) {
181+
func (s *SQSDecoder[Job, Message]) DecodeMessage(ctx context.Context, receiptHandle string, messageBody string) (queuepoller.WithID[Job], error) {
183182
var msg queueMessage[Message]
184183
err := json.Unmarshal([]byte(messageBody), &msg)
185184
if err != nil {
186-
return s.marshaller.Empty(), fmt.Errorf("deserializing message: %w", err)
185+
return queuepoller.WithID[Job]{}, fmt.Errorf("deserializing message: %w", err)
187186
}
188187
received, err := s.s3Client.GetObject(ctx, &s3.GetObjectInput{
189188
Bucket: aws.String(s.bucket),
190189
Key: aws.String(msg.JobID.String()),
191190
})
192191
if err != nil {
193-
return s.marshaller.Empty(), fmt.Errorf("reading stored index CAR: %w", err)
192+
return queuepoller.WithID[Job]{}, fmt.Errorf("reading stored index CAR: %w", err)
194193
}
195194
defer received.Body.Close()
196-
return s.marshaller.Unmarshall(SerializedJob[Message]{
197-
ID: receiptHandle,
195+
job, err := s.marshaller.Unmarshall(SerializedJob[Message]{
198196
Message: msg.Message,
199197
Extended: received.Body,
200198
})
199+
if err != nil {
200+
return queuepoller.WithID[Job]{}, fmt.Errorf("unmarshalling job: %w", err)
201+
}
202+
return queuepoller.WithID[Job]{ID: receiptHandle, Job: job}, nil
201203
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/storacha/go-libstoracha
22

3-
go 1.24.4
3+
go 1.25.3
44

55
require (
66
github.com/aws/aws-sdk-go v1.55.7
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package publisher
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
8+
"github.com/ipni/go-libipni/announce"
9+
"github.com/ipni/go-libipni/announce/httpsender"
10+
"github.com/ipni/go-libipni/dagsync/ipnisync/head"
11+
"github.com/ipni/go-libipni/ingest/schema"
12+
"github.com/libp2p/go-libp2p/core/crypto"
13+
"github.com/libp2p/go-libp2p/core/peer"
14+
"github.com/storacha/go-libstoracha/ipnipublisher/store"
15+
"github.com/storacha/go-ucanto/core/ipld"
16+
)
17+
18+
type AdvertisementPublisher struct {
19+
*options
20+
pendingAds []schema.Advertisement
21+
sender announce.Sender
22+
key crypto.PrivKey
23+
store store.PublisherStore
24+
}
25+
26+
func NewAdvertisementPublisher(id crypto.PrivKey, store store.PublisherStore, opts ...Option) (*AdvertisementPublisher, error) {
27+
o := &options{
28+
topic: "/indexer/ingest/mainnet",
29+
}
30+
for _, opt := range opts {
31+
err := opt(o)
32+
if err != nil {
33+
return nil, err
34+
}
35+
}
36+
peer, err := peer.IDFromPrivateKey(id)
37+
if err != nil {
38+
return nil, fmt.Errorf("cannot get peer ID from private key: %w", err)
39+
}
40+
batchPublisher := &AdvertisementPublisher{
41+
options: o,
42+
key: id,
43+
store: store,
44+
}
45+
if len(o.announceURLs) > 0 {
46+
sender, err := httpsender.New(o.announceURLs, peer)
47+
if err != nil {
48+
return nil, fmt.Errorf("cannot create http announce sender: %w", err)
49+
}
50+
log.Info("HTTP announcements enabled")
51+
batchPublisher.sender = sender
52+
}
53+
return batchPublisher, nil
54+
}
55+
56+
func (p *AdvertisementPublisher) AddToBatch(adv schema.Advertisement) error {
57+
p.pendingAds = append(p.pendingAds, adv)
58+
return nil
59+
}
60+
61+
func (p *AdvertisementPublisher) Commit(ctx context.Context) (ipld.Link, error) {
62+
pendingAds := p.pendingAds
63+
p.pendingAds = nil
64+
lnk, err := p.commit(ctx, pendingAds)
65+
if err != nil {
66+
for _, adv := range pendingAds {
67+
if !adv.IsRm {
68+
peer, err := peer.Decode(adv.Provider)
69+
if err == nil {
70+
_ = p.store.DeleteChunkLinkForProviderAndContextID(ctx, peer, adv.ContextID)
71+
}
72+
}
73+
}
74+
return nil, err
75+
}
76+
return lnk, nil
77+
}
78+
func (p *AdvertisementPublisher) commit(ctx context.Context, pendingAds []schema.Advertisement) (ipld.Link, error) {
79+
80+
// Get the previous advertisement that was generated.
81+
prevHead, err := p.store.Head(ctx)
82+
if err != nil {
83+
if !store.IsNotFound(err) {
84+
return nil, fmt.Errorf("could not get latest advertisement: %s", err)
85+
}
86+
}
87+
var prevLink ipld.Link
88+
// Check for cid.Undef for the previous link. If this is the case, then
89+
// this means there are no previous advertisements.
90+
if prevHead == nil {
91+
log.Info("Latest advertisement CID was undefined - no previous advertisement")
92+
} else {
93+
prevLink = prevHead.Head
94+
}
95+
96+
if len(pendingAds) == 0 {
97+
log.Info("No pending advertisements to commit")
98+
return prevLink, nil
99+
}
100+
101+
// Store all pending advertisements in order, linking each to the previous.
102+
for _, adv := range pendingAds {
103+
adv.PreviousID = prevLink
104+
105+
// Sign the advertisement.
106+
if err = adv.Sign(p.key); err != nil {
107+
return nil, err
108+
}
109+
110+
if err := adv.Validate(); err != nil {
111+
return nil, err
112+
}
113+
114+
lnk, err := p.store.PutAdvert(ctx, adv)
115+
if err != nil {
116+
return nil, err
117+
}
118+
log.Info("Stored ad in local link system")
119+
prevLink = lnk
120+
}
121+
122+
lnk := prevLink
123+
head, err := head.NewSignedHead(lnk.(cidlink.Link).Cid, p.topic, p.key)
124+
if err != nil {
125+
log.Errorw("Failed to generate signed head for the latest advertisement", "err", err)
126+
return nil, fmt.Errorf("failed to generate signed head for the latest advertisement: %w", err)
127+
}
128+
if _, err := p.store.ReplaceHead(ctx, prevHead, head); err != nil {
129+
log.Errorw("Failed to update reference to the latest advertisement", "err", err)
130+
return nil, fmt.Errorf("failed to update reference to latest advertisement: %w", err)
131+
}
132+
log.Info("Updated reference to the latest advertisement successfully")
133+
134+
if p.sender != nil {
135+
err = announce.Send(ctx, lnk.(cidlink.Link).Cid, p.pubHTTPAnnounceAddrs, p.sender)
136+
if err != nil {
137+
log.Warnw("Failed to announce advertisement", "err", err)
138+
}
139+
}
140+
141+
return lnk, nil
142+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package publisher
2+
3+
import (
4+
"context"
5+
"encoding/base64"
6+
"fmt"
7+
"iter"
8+
9+
"github.com/ipni/go-libipni/ingest/schema"
10+
"github.com/ipni/go-libipni/metadata"
11+
"github.com/libp2p/go-libp2p/core/peer"
12+
"github.com/multiformats/go-multiaddr"
13+
mh "github.com/multiformats/go-multihash"
14+
"github.com/storacha/go-libstoracha/ipnipublisher/store"
15+
)
16+
17+
// GenerateAd generates an advertisement for the given parameters.
18+
func GenerateAd(ctx context.Context, publisherStore store.PublisherStore, peer peer.ID, addrs []multiaddr.Multiaddr, contextID []byte, md metadata.Metadata, isRm bool, mhs iter.Seq[mh.Multihash]) (schema.Advertisement, error) {
19+
var err error
20+
21+
log := log.With("providerID", peer).With("contextID", base64.StdEncoding.EncodeToString(contextID))
22+
23+
chunkLink, err := publisherStore.ChunkLinkForProviderAndContextID(ctx, peer, contextID)
24+
if err != nil {
25+
if !store.IsNotFound(err) {
26+
return schema.Advertisement{}, fmt.Errorf("could not get entries cid by provider + context id: %s", err)
27+
}
28+
}
29+
30+
// If not removing, then generate the link for the list of CIDs from the
31+
// contextID using the multihash lister, and store the relationship.
32+
if !isRm {
33+
log.Info("Creating advertisement")
34+
35+
// If no previously-published ad for this context ID.
36+
if chunkLink == nil {
37+
log.Info("Generating entries linked list for advertisement")
38+
39+
// Generate the linked list ipld.Link that is added to the
40+
// advertisement and used for ingestion.
41+
chunkLink, err = publisherStore.PutEntries(ctx, mhs)
42+
if err != nil {
43+
return schema.Advertisement{}, fmt.Errorf("could not generate entries list: %s", err)
44+
}
45+
if chunkLink == nil {
46+
log.Warnw("chunking for context ID resulted in no link", "contextID", contextID)
47+
chunkLink = schema.NoEntries
48+
}
49+
50+
// Store the relationship between providerID, contextID and CID of the
51+
// advertised list of Cids.
52+
err = publisherStore.PutChunkLinkForProviderAndContextID(ctx, peer, contextID, chunkLink)
53+
if err != nil {
54+
return schema.Advertisement{}, fmt.Errorf("failed to write provider + context id to entries cid mapping: %s", err)
55+
}
56+
} else {
57+
// Lookup metadata for this providerID and contextID.
58+
prevMetadata, err := publisherStore.MetadataForProviderAndContextID(ctx, peer, contextID)
59+
if err != nil {
60+
if !store.IsNotFound(err) {
61+
return schema.Advertisement{}, fmt.Errorf("could not get metadata for provider + context id: %s", err)
62+
}
63+
log.Warn("No metadata for existing provider + context ID, generating new advertisement")
64+
}
65+
66+
if md.Equal(prevMetadata) {
67+
// Metadata is the same; no change, no need for new
68+
// advertisement.
69+
return schema.Advertisement{}, ErrAlreadyAdvertised
70+
}
71+
72+
// Linked list is the same, but metadata is different, so generate
73+
// new advertisement with same linked list, but new metadata.
74+
}
75+
76+
if err = publisherStore.PutMetadataForProviderAndContextID(ctx, peer, contextID, md); err != nil {
77+
return schema.Advertisement{}, fmt.Errorf("failed to write provider + context id to metadata mapping: %s", err)
78+
}
79+
} else {
80+
log.Info("Creating removal advertisement")
81+
82+
if chunkLink == nil {
83+
return schema.Advertisement{}, ErrContextIDNotFound
84+
}
85+
86+
// If removing by context ID, it means the list of CIDs is not needed
87+
// anymore, so we can remove the entry from the datastore.
88+
err = publisherStore.DeleteChunkLinkForProviderAndContextID(ctx, peer, contextID)
89+
if err != nil {
90+
return schema.Advertisement{}, fmt.Errorf("failed to delete provider + context id to entries cid mapping: %s", err)
91+
}
92+
err = publisherStore.DeleteMetadataForProviderAndContextID(ctx, peer, contextID)
93+
if err != nil {
94+
return schema.Advertisement{}, fmt.Errorf("failed to delete provider + context id to metadata mapping: %s", err)
95+
}
96+
97+
// Create an advertisement to delete content by contextID by specifying
98+
// that advertisement has no entries.
99+
chunkLink = schema.NoEntries
100+
101+
// The advertisement still requires a valid metadata even though
102+
// metadata is not used for removal. Create a valid empty metadata.
103+
md = metadata.Default.New()
104+
}
105+
106+
mdBytes, err := md.MarshalBinary()
107+
if err != nil {
108+
return schema.Advertisement{}, err
109+
}
110+
111+
var stringAddrs []string
112+
for _, addr := range addrs {
113+
stringAddrs = append(stringAddrs, addr.String())
114+
}
115+
116+
return schema.Advertisement{
117+
Provider: peer.String(),
118+
Addresses: stringAddrs,
119+
Entries: chunkLink,
120+
ContextID: contextID,
121+
Metadata: mdBytes,
122+
IsRm: isRm,
123+
}, nil
124+
125+
}

0 commit comments

Comments
 (0)