Skip to content

Commit e32f29a

Browse files
committed
feat: utilize subscriptions v2 API
1 parent 463e491 commit e32f29a

File tree

11 files changed

+147
-103
lines changed

11 files changed

+147
-103
lines changed

api/http/reader/callback.go

Lines changed: 0 additions & 12 deletions
This file was deleted.

api/http/reader/logging.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/awakari/int-bluesky/util"
77
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
88
"log/slog"
9+
"time"
910
)
1011

1112
type serviceLogging struct {
@@ -20,29 +21,29 @@ func NewServiceLogging(svc Service, log *slog.Logger) Service {
2021
}
2122
}
2223

23-
func (sl serviceLogging) CreateCallback(ctx context.Context, subId, url string) (err error) {
24-
err = sl.svc.CreateCallback(ctx, subId, url)
24+
func (sl serviceLogging) Subscribe(ctx context.Context, interestId, groupId, userId, url string, interval time.Duration) (err error) {
25+
err = sl.svc.Subscribe(ctx, interestId, groupId, userId, url, interval)
2526
ll := util.LogLevel(err)
26-
sl.log.Log(ctx, ll, fmt.Sprintf("reader.CreateCallback(%s, %s): err=%s", subId, url, err))
27+
sl.log.Log(ctx, ll, fmt.Sprintf("reader.Subscribe(%s, %s): err=%s", interestId, url, err))
2728
return
2829
}
2930

30-
func (sl serviceLogging) GetCallback(ctx context.Context, subId, url string) (cb Callback, err error) {
31-
cb, err = sl.svc.GetCallback(ctx, subId, url)
31+
func (sl serviceLogging) Subscription(ctx context.Context, interestId, groupId, userId, url string) (cb Subscription, err error) {
32+
cb, err = sl.svc.Subscription(ctx, interestId, groupId, userId, url)
3233
ll := util.LogLevel(err)
33-
sl.log.Log(ctx, ll, fmt.Sprintf("reader.GetCallback(%s, %s): %+v, err=%s", subId, url, cb, err))
34+
sl.log.Log(ctx, ll, fmt.Sprintf("reader.Subscription(%s, %s): %+v, err=%s", interestId, url, cb, err))
3435
return
3536
}
3637

37-
func (sl serviceLogging) DeleteCallback(ctx context.Context, subId, url string) (err error) {
38-
err = sl.svc.DeleteCallback(ctx, subId, url)
38+
func (sl serviceLogging) Unsubscribe(ctx context.Context, interestId, groupId, userId, url string) (err error) {
39+
err = sl.svc.Unsubscribe(ctx, interestId, groupId, userId, url)
3940
ll := util.LogLevel(err)
40-
sl.log.Log(ctx, ll, fmt.Sprintf("reader.DeleteCallback(%s, %s): err=%s", subId, url, err))
41+
sl.log.Log(ctx, ll, fmt.Sprintf("reader.Unsubscribe(%s, %s): err=%s", interestId, url, err))
4142
return
4243
}
4344

44-
func (sl serviceLogging) CountByInterest(ctx context.Context, interestId string) (count int64, err error) {
45-
count, err = sl.svc.CountByInterest(ctx, interestId)
45+
func (sl serviceLogging) CountByInterest(ctx context.Context, interestId, groupId, userId string) (count int64, err error) {
46+
count, err = sl.svc.CountByInterest(ctx, interestId, groupId, userId)
4647
ll := util.LogLevel(err)
4748
sl.log.Log(ctx, ll, fmt.Sprintf("reader.CountByInterest(%s): %d, err=%s", interestId, count, err))
4849
return

api/http/reader/mock.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,36 +3,37 @@ package reader
33
import (
44
"context"
55
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
6+
"time"
67
)
78

89
type mock struct {
910
}
1011

11-
func (m mock) Read(ctx context.Context, interestId string, limit int) (last []*pb.CloudEvent, err error) {
12-
//TODO implement me
13-
panic("implement me")
14-
}
15-
1612
func NewServiceMock() Service {
1713
return mock{}
1814
}
1915

20-
func (m mock) CreateCallback(ctx context.Context, subId, url string) (err error) {
16+
func (m mock) Subscribe(ctx context.Context, _, _, _, url string, interval time.Duration) (err error) {
2117
//TODO implement me
2218
panic("implement me")
2319
}
2420

25-
func (m mock) GetCallback(ctx context.Context, subId, url string) (cb Callback, err error) {
21+
func (m mock) Subscription(ctx context.Context, _, _, _, url string) (cb Subscription, err error) {
2622
//TODO implement me
2723
panic("implement me")
2824
}
2925

30-
func (m mock) DeleteCallback(ctx context.Context, subId, url string) (err error) {
26+
func (m mock) Unsubscribe(ctx context.Context, _, _, _, url string) (err error) {
3127
//TODO implement me
3228
panic("implement me")
3329
}
3430

35-
func (m mock) CountByInterest(ctx context.Context, interestId string) (count int64, err error) {
31+
func (m mock) CountByInterest(ctx context.Context, interestId, _, _ string) (count int64, err error) {
3632
count = 42
3733
return
3834
}
35+
36+
func (m mock) Read(ctx context.Context, interestId string, limit int) (last []*pb.CloudEvent, err error) {
37+
//TODO implement me
38+
panic("implement me")
39+
}

api/http/reader/service.go

Lines changed: 84 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,88 +5,123 @@ import (
55
"encoding/base64"
66
"errors"
77
"fmt"
8+
"github.com/awakari/int-bluesky/model"
89
"github.com/bytedance/sonic"
910
ceProto "github.com/cloudevents/sdk-go/binding/format/protobuf/v2"
1011
"github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
1112
ce "github.com/cloudevents/sdk-go/v2/event"
1213
"io"
1314
"net/http"
15+
"net/url"
16+
"strings"
17+
"time"
1418
)
1519

1620
type Service interface {
17-
CreateCallback(ctx context.Context, subId, url string) (err error)
18-
GetCallback(ctx context.Context, subId, url string) (cb Callback, err error)
19-
DeleteCallback(ctx context.Context, subId, url string) (err error)
20-
CountByInterest(ctx context.Context, interestId string) (count int64, err error)
21+
Subscribe(ctx context.Context, interestId, userId, groupId, url string, interval time.Duration) (err error)
22+
Unsubscribe(ctx context.Context, interestId, userId, groupId, url string) (err error)
23+
Subscription(ctx context.Context, interestId, userId, groupId, url string) (sub Subscription, err error)
24+
CountByInterest(ctx context.Context, interestId, userId, groupId string) (count int64, err error)
2125
Read(ctx context.Context, interestId string, limit int) (last []*pb.CloudEvent, err error)
2226
}
2327

2428
type service struct {
25-
clientHttp *http.Client
26-
uriBase string
29+
clientHttp *http.Client
30+
uriBase string
31+
tokenInternal string
2732
}
2833

2934
const keyHubCallback = "hub.callback"
3035
const KeyHubMode = "hub.mode"
3136
const KeyHubTopic = "hub.topic"
3237
const modeSubscribe = "subscribe"
3338
const modeUnsubscribe = "unsubscribe"
34-
const fmtTopicUri = "%s/sub/%s/%s"
39+
const fmtTopicUri = "%s/v1/sub/%s/%s"
3540
const FmtJson = "json"
36-
const fmtReadUri = "%s/sub/%s/%s?limit=%d"
41+
const fmtReadUri = "%s/v1/sub/%s/%s?limit=%d"
3742

3843
var ErrInternal = errors.New("internal failure")
3944
var ErrConflict = errors.New("conflict")
4045
var ErrNotFound = errors.New("not found")
4146

42-
func NewService(clientHttp *http.Client, uriBase string) Service {
47+
func NewService(clientHttp *http.Client, uriBase, tokenInternal string) Service {
4348
return service{
44-
clientHttp: clientHttp,
45-
uriBase: uriBase,
49+
clientHttp: clientHttp,
50+
uriBase: uriBase,
51+
tokenInternal: tokenInternal,
4652
}
4753
}
4854

49-
func (svc service) CreateCallback(ctx context.Context, subId, callbackUrl string) (err error) {
50-
err = svc.updateCallback(ctx, subId, callbackUrl, modeSubscribe)
55+
func (svc service) Subscribe(ctx context.Context, interestId, userId, groupId, urlCallback string, interval time.Duration) (err error) {
56+
err = svc.updateCallback(ctx, interestId, userId, groupId, urlCallback, modeSubscribe, interval)
5157
return
5258
}
5359

54-
func (svc service) CountByInterest(ctx context.Context, interestId string) (count int64, err error) {
55-
var req *http.Request
56-
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/callbacks/list/%s", svc.uriBase, interestId), http.NoBody)
60+
func (svc service) Unsubscribe(ctx context.Context, interestId, userId, groupId, urlCallback string) (err error) {
61+
err = svc.updateCallback(ctx, interestId, userId, groupId, urlCallback, modeUnsubscribe, 0)
62+
return
63+
}
64+
65+
func (svc service) updateCallback(ctx context.Context, interestId, userId, groupId, urlCallback, mode string, interval time.Duration) (err error) {
66+
topicUri := fmt.Sprintf(fmtTopicUri, svc.uriBase, FmtJson, interestId)
67+
data := url.Values{
68+
keyHubCallback: {
69+
urlCallback,
70+
},
71+
KeyHubMode: {
72+
mode,
73+
},
74+
KeyHubTopic: {
75+
topicUri,
76+
},
77+
}
78+
reqUri := fmt.Sprintf("%s/v2?format=%s&interestId=%s", svc.uriBase, FmtJson, interestId)
79+
if interval > 0 && mode == modeSubscribe {
80+
reqUri += "&interval=" + interval.String()
81+
}
82+
83+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, reqUri, strings.NewReader(data.Encode()))
5784
var resp *http.Response
5885
if err == nil {
86+
req.Header.Set("Authorization", "Bearer "+svc.tokenInternal)
87+
req.Header.Set(model.KeyGroupId, groupId)
88+
req.Header.Set(model.KeyUserId, userId)
89+
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
5990
resp, err = svc.clientHttp.Do(req)
6091
}
92+
6193
switch err {
6294
case nil:
63-
defer resp.Body.Close()
6495
switch resp.StatusCode {
65-
case http.StatusOK:
66-
var cbl CallbackList
67-
err = sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&cbl)
68-
switch err {
69-
case nil:
70-
count = cbl.Count
71-
default:
72-
err = fmt.Errorf("%w: %s", ErrInternal, err)
73-
}
96+
case http.StatusAccepted, http.StatusNoContent:
7497
case http.StatusNotFound:
75-
err = ErrNotFound
98+
err = fmt.Errorf("%w: callback not found for the subscription %s", ErrConflict, interestId)
99+
case http.StatusConflict:
100+
err = fmt.Errorf("%w: callback already registered for the subscription %s", ErrConflict, interestId)
76101
default:
77-
err = fmt.Errorf("%w: response status %d", ErrInternal, resp.StatusCode)
102+
defer resp.Body.Close()
103+
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 0x1000))
104+
err = fmt.Errorf("%w: unexpected create callback response %d, %s", ErrInternal, resp.StatusCode, string(respBody))
78105
}
79106
default:
80107
err = fmt.Errorf("%w: %s", ErrInternal, err)
81108
}
82109
return
83110
}
84111

85-
func (svc service) GetCallback(ctx context.Context, subId, url string) (cb Callback, err error) {
112+
func (svc service) Subscription(ctx context.Context, interestId, userId, groupId, url string) (cb Subscription, err error) {
86113
var req *http.Request
87-
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/callbacks/%s/%s", svc.uriBase, subId, base64.URLEncoding.EncodeToString([]byte(url))), http.NoBody)
114+
req, err = http.NewRequestWithContext(
115+
ctx,
116+
http.MethodGet,
117+
fmt.Sprintf("%s/v2?interestId=%s&url=%s", svc.uriBase, interestId, base64.URLEncoding.EncodeToString([]byte(url))),
118+
http.NoBody,
119+
)
88120
var resp *http.Response
89121
if err == nil {
122+
req.Header.Set("Authorization", "Bearer "+svc.tokenInternal)
123+
req.Header.Set(model.KeyGroupId, groupId)
124+
req.Header.Set(model.KeyUserId, userId)
90125
resp, err = svc.clientHttp.Do(req)
91126
}
92127
switch err {
@@ -101,46 +136,38 @@ func (svc service) GetCallback(ctx context.Context, subId, url string) (cb Callb
101136
case http.StatusNotFound:
102137
err = ErrNotFound
103138
default:
104-
err = fmt.Errorf("%w: response status %d", ErrInternal, resp.StatusCode)
139+
body, _ := io.ReadAll(io.LimitReader(resp.Body, 0x1000))
140+
err = fmt.Errorf("%w: response %d, %s", ErrInternal, resp.StatusCode, string(body))
105141
}
106142
default:
107143
err = fmt.Errorf("%w: %s", ErrInternal, err)
108144
}
109145
return
110146
}
111147

112-
func (svc service) DeleteCallback(ctx context.Context, subId, callbackUrl string) (err error) {
113-
err = svc.updateCallback(ctx, subId, callbackUrl, modeUnsubscribe)
114-
return
115-
}
116-
117-
func (svc service) updateCallback(_ context.Context, subId, url, mode string) (err error) {
118-
topicUri := fmt.Sprintf(fmtTopicUri, svc.uriBase, FmtJson, subId)
119-
data := map[string][]string{
120-
keyHubCallback: {
121-
url,
122-
},
123-
KeyHubMode: {
124-
mode,
125-
},
126-
KeyHubTopic: {
127-
topicUri,
128-
},
129-
}
148+
func (svc service) CountByInterest(ctx context.Context, interestId, userId, groupId string) (count int64, err error) {
149+
var req *http.Request
150+
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/v2?interestId=%s", svc.uriBase, interestId), http.NoBody)
130151
var resp *http.Response
131-
resp, err = svc.clientHttp.PostForm(topicUri, data)
152+
if err == nil {
153+
req.Header.Set("Authorization", "Bearer "+svc.tokenInternal)
154+
req.Header.Set(model.KeyGroupId, groupId)
155+
req.Header.Set(model.KeyUserId, userId)
156+
resp, err = svc.clientHttp.Do(req)
157+
}
132158
switch err {
133159
case nil:
160+
defer resp.Body.Close()
134161
switch resp.StatusCode {
135-
case http.StatusAccepted, http.StatusNoContent:
162+
case http.StatusOK:
163+
err = sonic.ConfigDefault.NewDecoder(resp.Body).Decode(&count)
164+
if err != nil {
165+
err = fmt.Errorf("%w: %s", ErrInternal, err)
166+
}
136167
case http.StatusNotFound:
137-
err = fmt.Errorf("%w: callback not found for the subscription %s", ErrConflict, subId)
138-
case http.StatusConflict:
139-
err = fmt.Errorf("%w: callback already registered for the subscription %s", ErrConflict, subId)
168+
err = ErrNotFound
140169
default:
141-
defer resp.Body.Close()
142-
respBody, _ := io.ReadAll(resp.Body)
143-
err = fmt.Errorf("%w: unexpected create callback response %d, %s", ErrInternal, resp.StatusCode, string(respBody))
170+
err = fmt.Errorf("%w: response status %d", ErrInternal, resp.StatusCode)
144171
}
145172
default:
146173
err = fmt.Errorf("%w: %s", ErrInternal, err)

api/http/reader/subscription.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package reader
2+
3+
type Subscription struct {
4+
Url string `json:"url"`
5+
Format string `json:"fmt"`
6+
}

config/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ type PrometheusConfig struct {
6060
}
6161

6262
type ReaderConfig struct {
63-
Uri string `envconfig:"API_READER_URI" default:"http://reader:8080/v1" required:"true"`
63+
Uri string `envconfig:"API_READER_URI" default:"http://reader:8080" required:"true"`
6464
UriEventBase string `envconfig:"API_READER_URI_EVT_BASE" default:"https://awakari.com/pub-msg.html?id=" required:"true"`
6565
CallBack struct {
6666
Protocol string `envconfig:"API_READER_CALLBACK_PROTOCOL" default:"http" required:"true"`

go.mod

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ module github.com/awakari/int-bluesky
33
go 1.24
44

55
require (
6-
github.com/bluesky-social/indigo v0.0.0-20250506174012-7075cf22f63e
6+
github.com/bluesky-social/indigo v0.0.0-20250520232546-236dd575c91e
77
github.com/bytedance/sonic v1.13.2
88
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2
99
github.com/cloudevents/sdk-go/v2 v2.16.0
1010
github.com/fxamacker/cbor/v2 v2.8.0
11-
github.com/gin-gonic/gin v1.10.0
11+
github.com/gin-gonic/gin v1.10.1
1212
github.com/ipfs/go-block-format v0.2.1
1313
github.com/ipfs/go-cid v0.5.0
1414
github.com/ipld/go-car v0.6.2
@@ -19,7 +19,7 @@ require (
1919
github.com/processout/grpc-go-pool v1.2.1
2020
github.com/reiver/go-bsky v0.0.0-20240906205655-8c7fadb4f3bb
2121
github.com/stretchr/testify v1.10.0
22-
google.golang.org/grpc v1.72.0
22+
google.golang.org/grpc v1.72.1
2323
google.golang.org/protobuf v1.36.6
2424
)
2525

@@ -85,10 +85,10 @@ require (
8585
github.com/whyrusleeping/cbor-gen v0.3.1 // indirect
8686
github.com/x448/float16 v0.8.4 // indirect
8787
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
88-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
89-
go.opentelemetry.io/otel v1.35.0 // indirect
90-
go.opentelemetry.io/otel/metric v1.35.0 // indirect
91-
go.opentelemetry.io/otel/trace v1.35.0 // indirect
88+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
89+
go.opentelemetry.io/otel v1.36.0 // indirect
90+
go.opentelemetry.io/otel/metric v1.36.0 // indirect
91+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
9292
go.uber.org/atomic v1.11.0 // indirect
9393
go.uber.org/multierr v1.11.0 // indirect
9494
go.uber.org/zap v1.27.0 // indirect
@@ -98,7 +98,7 @@ require (
9898
golang.org/x/sys v0.33.0 // indirect
9999
golang.org/x/text v0.25.0 // indirect
100100
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
101-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
101+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250519155744-55703ea1f237 // indirect
102102
gopkg.in/yaml.v3 v3.0.1 // indirect
103-
lukechampine.com/blake3 v1.4.0 // indirect
103+
lukechampine.com/blake3 v1.4.1 // indirect
104104
)

0 commit comments

Comments
 (0)