Skip to content

Commit 5552634

Browse files
committed
feat: handle firehose events
1 parent 26bdd3e commit 5552634

File tree

21 files changed

+1121
-79
lines changed

21 files changed

+1121
-79
lines changed

api/http/bluesky/service.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@ package bluesky
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"github.com/bluesky-social/indigo/api/bsky"
78
"github.com/bytedance/sonic"
89
"io"
910
"net/http"
10-
"net/http/httptest"
1111
)
1212

1313
type Service interface {
@@ -55,7 +55,7 @@ func (s service) Login(ctx context.Context, id, password string) (did, token str
5555
Identifier: id,
5656
Password: password,
5757
})
58-
req := httptest.NewRequestWithContext(
58+
req, _ := http.NewRequestWithContext(
5959
ctx,
6060
http.MethodPost,
6161
"https://bsky.social/xrpc/com.atproto.server.createSession",
@@ -68,15 +68,18 @@ func (s service) Login(ctx context.Context, id, password string) (did, token str
6868
var respData []byte
6969
if err == nil {
7070
defer resp.Body.Close()
71-
respData, err = io.ReadAll(io.LimitReader(req.Body, limitBodyLen))
71+
respData, err = io.ReadAll(io.LimitReader(resp.Body, limitBodyLen))
7272
}
7373
var lr loginResp
7474
if err == nil {
7575
err = sonic.Unmarshal(respData, &lr)
7676
}
77-
if err == nil {
77+
switch err {
78+
case nil:
7879
did = lr.Did
7980
token = lr.AccessJwt
81+
default:
82+
err = fmt.Errorf("response: %d, %+v, %s", resp.StatusCode, resp.Header, string(respData))
8083
}
8184
return
8285
}
@@ -87,7 +90,7 @@ func (s service) CreatePost(ctx context.Context, post *bsky.FeedPost, did, token
8790
Collection: coll,
8891
Record: post,
8992
})
90-
req := httptest.NewRequestWithContext(
93+
req, _ := http.NewRequestWithContext(
9194
ctx,
9295
http.MethodPost,
9396
"https://bsky.social/xrpc/com.atproto.repo.createRecord",

api/http/pub/event.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package pub
2+
3+
import (
4+
"encoding/base64"
5+
"fmt"
6+
"github.com/bytedance/sonic"
7+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
8+
"time"
9+
)
10+
11+
type event struct {
12+
Id string `json:"id"`
13+
SpecVersion string `json:"specVersion,omitempty"`
14+
Source string `json:"source"`
15+
Type string `json:"type"`
16+
Attributes map[string]attribute `json:"attributes"`
17+
TextData string `json:"textData,omitempty"`
18+
}
19+
20+
type attribute struct {
21+
CeBoolean *bool `json:"ceBoolean,omitempty"`
22+
CeBytes *string `json:"ceBytes,omitempty"`
23+
CeInteger *int32 `json:"ceInteger,omitempty"`
24+
CeString *string `json:"ceString,omitempty"`
25+
CeTimestamp *time.Time `json:"ceTimestamp,omitempty"`
26+
CeUri *string `json:"ceUri,omitempty"`
27+
CeUriRef *string `json:"ceUriRef,omitempty"`
28+
}
29+
30+
func MarshalEvent(src *pb.CloudEvent) (data []byte, err error) {
31+
32+
evt := event{
33+
Id: src.Id,
34+
SpecVersion: src.SpecVersion,
35+
Source: src.Source,
36+
Type: src.Type,
37+
Attributes: make(map[string]attribute),
38+
TextData: src.GetTextData(),
39+
}
40+
41+
for k, v := range src.GetAttributes() {
42+
switch vt := v.Attr.(type) {
43+
case *pb.CloudEventAttributeValue_CeBoolean:
44+
evt.Attributes[k] = attribute{
45+
CeBoolean: &vt.CeBoolean,
46+
}
47+
case *pb.CloudEventAttributeValue_CeBytes:
48+
b64s := base64.StdEncoding.EncodeToString(vt.CeBytes)
49+
evt.Attributes[k] = attribute{
50+
CeBytes: &b64s,
51+
}
52+
case *pb.CloudEventAttributeValue_CeInteger:
53+
evt.Attributes[k] = attribute{
54+
CeInteger: &vt.CeInteger,
55+
}
56+
case *pb.CloudEventAttributeValue_CeString:
57+
evt.Attributes[k] = attribute{
58+
CeString: &vt.CeString,
59+
}
60+
case *pb.CloudEventAttributeValue_CeTimestamp:
61+
ts := vt.CeTimestamp.AsTime().UTC()
62+
evt.Attributes[k] = attribute{
63+
CeTimestamp: &ts,
64+
}
65+
case *pb.CloudEventAttributeValue_CeUri:
66+
evt.Attributes[k] = attribute{
67+
CeUri: &vt.CeUri,
68+
}
69+
case *pb.CloudEventAttributeValue_CeUriRef:
70+
evt.Attributes[k] = attribute{
71+
CeUriRef: &vt.CeUriRef,
72+
}
73+
default:
74+
err = fmt.Errorf("failed to marshal event %s, unknown attribute type: %T", src.Id, vt)
75+
}
76+
}
77+
78+
if err == nil {
79+
data, err = sonic.Marshal(evt)
80+
}
81+
82+
return
83+
}

api/http/pub/event_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package pub
2+
3+
import (
4+
"fmt"
5+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"github.com/stretchr/testify/require"
7+
"google.golang.org/protobuf/types/known/timestamppb"
8+
"testing"
9+
"time"
10+
)
11+
12+
func TestMarshalEvent(t *testing.T) {
13+
in := &pb.CloudEvent{
14+
Id: "id1",
15+
Source: "src1",
16+
SpecVersion: "1.0",
17+
Type: "type1",
18+
Attributes: map[string]*pb.CloudEventAttributeValue{
19+
"boolean1": {
20+
Attr: &pb.CloudEventAttributeValue_CeBoolean{
21+
CeBoolean: true,
22+
},
23+
},
24+
"bytes1": {
25+
Attr: &pb.CloudEventAttributeValue_CeBytes{
26+
CeBytes: []byte("some bytes"),
27+
},
28+
},
29+
"integer1": {
30+
Attr: &pb.CloudEventAttributeValue_CeInteger{
31+
CeInteger: -42,
32+
},
33+
},
34+
"string1": {
35+
Attr: &pb.CloudEventAttributeValue_CeString{
36+
CeString: "string1",
37+
},
38+
},
39+
"timestamp1": {
40+
Attr: &pb.CloudEventAttributeValue_CeTimestamp{
41+
CeTimestamp: timestamppb.New(time.Date(2024, 12, 20, 10, 40, 15, 0, time.UTC)),
42+
},
43+
},
44+
"uri1": {
45+
Attr: &pb.CloudEventAttributeValue_CeUri{
46+
CeUri: "uri1",
47+
},
48+
},
49+
},
50+
Data: &pb.CloudEvent_TextData{
51+
TextData: "text1",
52+
},
53+
}
54+
out, err := MarshalEvent(in)
55+
require.NoError(t, err)
56+
fmt.Println(string(out))
57+
}

api/http/pub/logging.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package pub
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/awakari/int-bluesky/util"
7+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
8+
"log/slog"
9+
)
10+
11+
type logging struct {
12+
svc Service
13+
log *slog.Logger
14+
}
15+
16+
func NewLogging(svc Service, log *slog.Logger) Service {
17+
return logging{
18+
svc: svc,
19+
log: log,
20+
}
21+
}
22+
23+
func (l logging) Publish(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) {
24+
err = l.svc.Publish(ctx, evt, groupId, userId)
25+
l.log.Log(ctx, util.LogLevel(err), fmt.Sprintf("pub.Publish(%s, %s, %s): err=%s", evt.Id, groupId, userId, err))
26+
return
27+
}

api/http/pub/mock.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package pub
2+
3+
import (
4+
"context"
5+
"errors"
6+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
7+
)
8+
9+
type mock struct {
10+
}
11+
12+
func NewMock() Service {
13+
return mock{}
14+
}
15+
16+
func (m mock) Publish(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) {
17+
switch userId {
18+
case "fail":
19+
err = errors.New("fail")
20+
case "noack":
21+
err = ErrNoAck
22+
}
23+
return
24+
}

api/http/pub/service.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package pub
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"github.com/awakari/int-bluesky/model"
9+
"github.com/bytedance/sonic"
10+
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
11+
"io"
12+
"net/http"
13+
"time"
14+
)
15+
16+
type Service interface {
17+
Publish(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error)
18+
}
19+
20+
type service struct {
21+
clientHttp *http.Client
22+
url string
23+
token string
24+
timeout time.Duration
25+
}
26+
27+
type payloadResp struct {
28+
AckCount uint32 `json:"ackCount"`
29+
}
30+
31+
const valContentTypeJson = "application/json"
32+
33+
var ErrNoAck = errors.New("publishing is not acknowledged")
34+
var ErrNoAuth = errors.New("unauthenticated request")
35+
var ErrInvalid = errors.New("invalid request")
36+
var ErrLimitReached = errors.New("publishing limit reached")
37+
38+
func NewService(clientHttp *http.Client, url, token string, timeout time.Duration) Service {
39+
return service{
40+
clientHttp: clientHttp,
41+
url: url,
42+
token: token,
43+
timeout: timeout,
44+
}
45+
}
46+
47+
func (svc service) Publish(ctx context.Context, evt *pb.CloudEvent, groupId, userId string) (err error) {
48+
49+
var reqData []byte
50+
reqData, err = MarshalEvent(evt)
51+
52+
var req *http.Request
53+
if err == nil {
54+
ctxTimeout, cancel := context.WithTimeout(ctx, svc.timeout)
55+
defer cancel()
56+
req, err = http.NewRequestWithContext(ctxTimeout, http.MethodPost, svc.url, bytes.NewReader(reqData))
57+
}
58+
59+
var resp *http.Response
60+
if err == nil {
61+
req.Header.Add("Accept", valContentTypeJson)
62+
req.Header.Add("Authorization", "Bearer "+svc.token)
63+
req.Header.Add("Content-Type", valContentTypeJson)
64+
req.Header.Add(model.KeyGroupId, groupId)
65+
req.Header.Add(model.KeyUserId, userId)
66+
resp, err = svc.clientHttp.Do(req)
67+
}
68+
69+
if err == nil {
70+
switch resp.StatusCode {
71+
case http.StatusServiceUnavailable:
72+
err = fmt.Errorf("%w: %s", ErrNoAck, evt.Id)
73+
case http.StatusUnauthorized:
74+
err = ErrNoAuth
75+
case http.StatusRequestTimeout:
76+
err = fmt.Errorf("%w: %s", ErrNoAck, evt.Id)
77+
case http.StatusBadRequest:
78+
err = fmt.Errorf("%w: %s", ErrInvalid, evt.Id)
79+
case http.StatusTooManyRequests:
80+
err = fmt.Errorf("%w: %s", ErrLimitReached, evt.Id)
81+
}
82+
}
83+
84+
var respData []byte
85+
if err == nil {
86+
defer resp.Body.Close()
87+
respData, err = io.ReadAll(resp.Body)
88+
}
89+
90+
var p payloadResp
91+
if err == nil {
92+
err = sonic.Unmarshal(respData, &p)
93+
}
94+
95+
if err == nil && p.AckCount < 1 {
96+
err = fmt.Errorf("%w: %s", ErrNoAck, evt.Id)
97+
}
98+
99+
return
100+
}

config/config.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"github.com/kelseyhightower/envconfig"
5+
"time"
56
)
67

78
type Config struct {
@@ -14,7 +15,7 @@ type Config struct {
1415
type ApiConfig struct {
1516
Bluesky struct {
1617
App struct {
17-
Id string `required:"true" envconfig:"API_BLUESKY_APP_ID" default:"awakari"`
18+
Id string `required:"true" envconfig:"API_BLUESKY_APP_ID" default:"bluesky.awakari.com"`
1819
Password string `required:"true" envconfig:"API_BLUESKY_APP_PASSWORD"`
1920
}
2021
}
@@ -30,12 +31,18 @@ type ApiConfig struct {
3031
Uri string `envconfig:"API_INTERESTS_URI" required:"true" default:"http://interests-api:8080/v1"`
3132
DetailsUriPrefix string `envconfig:"API_INTERESTS_DETAILS_URI_PREFIX" required:"true" default:"https://awakari.com/sub-details.html?id="`
3233
}
34+
Writer struct {
35+
Backoff time.Duration `envconfig:"API_WRITER_BACKOFF" default:"10s" required:"true"`
36+
Timeout time.Duration `envconfig:"API_WRITER_TIMEOUT" default:"10s" required:"true"`
37+
Uri string `envconfig:"API_WRITER_URI" default:"http://pub:8080/v1" required:"true"`
38+
}
3339
Reader ReaderConfig
3440
Prometheus PrometheusConfig
3541
Queue QueueConfig
3642
Token struct {
3743
Internal string `envconfig:"API_TOKEN_INTERNAL" required:"true"`
3844
}
45+
GroupId string `envconfig:"API_GROUP_ID" default:"default" required:"true"`
3946
}
4047

4148
type PrometheusConfig struct {
@@ -69,6 +76,11 @@ type QueueConfig struct {
6976
Name string `envconfig:"API_QUEUE_INTERESTS_UPDATED_NAME" default:"int-bluesky" required:"true"`
7077
Subj string `envconfig:"API_QUEUE_INTERESTS_UPDATED_SUBJ" default:"interests-updated" required:"true"`
7178
}
79+
SourceWebsocket struct {
80+
BatchSize uint32 `envconfig:"API_QUEUE_SRC_WEBSOCKET_BATCH_SIZE" default:"100" required:"true"`
81+
Name string `envconfig:"API_QUEUE_SRC_WEBSOCKET_NAME" default:"int-bluesky" required:"true"`
82+
Subj string `envconfig:"API_QUEUE_SRC_WEBSOCKET_SUBJ" default:"source-websocket-bluesky" required:"true"`
83+
}
7284
}
7385

7486
func NewConfigFromEnv() (cfg Config, err error) {

0 commit comments

Comments
 (0)