Skip to content

Commit b3df6e2

Browse files
committed
Upgrade go-sdk to 05d50a0 and migrate metadata from mq.MQ to watermill
- Upgrade all go-sdk dependencies to commit 05d50a0 - Add wire.Value for empty watermill.Option slice in link module - Replace mq.MQ with watermill Publisher/Subscriber in metadata - Update metadata MQ infrastructure to use watermill directly - Regenerate wire_gen.go files for both modules
1 parent 446cb83 commit b3df6e2

File tree

8 files changed

+104
-81
lines changed

8 files changed

+104
-81
lines changed

boundaries/link/internal/di/wire.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ var LinkSet = wire.NewSet(
104104
// Delivery
105105
wire.Bind(new(watermill.Backend), new(*watermill_kafka.Backend)),
106106
watermill_kafka.New,
107+
wire.Value([]watermill.Option{}),
107108
watermill.New,
108109
wire.FieldsOf(new(*watermill.Client), "Publisher"),
109110
rpc.InitServer,

boundaries/link/internal/di/wire_gen.go

Lines changed: 14 additions & 33 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

boundaries/metadata/go.mod

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.25.4
55
require (
66
buf.build/gen/go/shortlink-org/shortlink-link-link/protocolbuffers/go v1.36.10-20251123201407-1a9472e15af6.1
77
github.com/PuerkitoBio/goquery v1.11.0
8+
github.com/ThreeDotsLabs/watermill v1.5.1
89
github.com/chromedp/chromedp v0.14.2
910
github.com/google/wire v0.7.0
1011
github.com/minio/minio-go/v7 v7.0.97
@@ -25,8 +26,11 @@ require (
2526
github.com/shortlink-org/go-sdk/observability v0.0.0-20251125180156-05d50a07cca8
2627
github.com/shortlink-org/go-sdk/s3 v0.0.0-20251125180156-05d50a07cca8
2728
github.com/shortlink-org/go-sdk/saga v0.0.0-20251125180156-05d50a07cca8
29+
github.com/shortlink-org/go-sdk/watermill v0.0.0-20251125180156-05d50a07cca8
2830
github.com/spf13/viper v1.21.0
2931
github.com/stretchr/testify v1.11.1
32+
go.opentelemetry.io/otel/metric v1.38.0
33+
go.opentelemetry.io/otel/sdk/metric v1.38.0
3034
go.opentelemetry.io/otel/trace v1.38.0
3135
google.golang.org/grpc v1.77.0
3236
google.golang.org/protobuf v1.36.10
@@ -103,6 +107,7 @@ require (
103107
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
104108
github.com/klauspost/crc32 v1.3.0 // indirect
105109
github.com/lib/pq v1.10.9 // indirect
110+
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
106111
github.com/mattn/go-sqlite3 v1.14.32 // indirect
107112
github.com/minio/crc64nvme v1.1.1 // indirect
108113
github.com/minio/md5-simd v1.1.2 // indirect
@@ -112,6 +117,7 @@ require (
112117
github.com/nats-io/nkeys v0.4.11 // indirect
113118
github.com/nats-io/nuid v1.0.1 // indirect
114119
github.com/neo4j/neo4j-go-driver/v5 v5.28.4 // indirect
120+
github.com/oklog/ulid v1.3.1 // indirect
115121
github.com/onsi/ginkgo/v2 v2.23.4 // indirect
116122
github.com/onsi/gomega v1.38.0 // indirect
117123
github.com/ory/client-go v1.22.11 // indirect
@@ -136,6 +142,7 @@ require (
136142
github.com/sagikazarmark/locafero v0.12.0 // indirect
137143
github.com/samber/lo v1.52.0 // indirect
138144
github.com/segmentio/asm v1.2.0 // indirect
145+
github.com/sony/gobreaker v1.0.0 // indirect
139146
github.com/spf13/afero v1.15.0 // indirect
140147
github.com/spf13/cast v1.10.0 // indirect
141148
github.com/spf13/cobra v1.10.1 // indirect
@@ -162,9 +169,7 @@ require (
162169
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
163170
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 // indirect
164171
go.opentelemetry.io/otel/exporters/prometheus v0.60.0 // indirect
165-
go.opentelemetry.io/otel/metric v1.38.0 // indirect
166172
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
167-
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
168173
go.opentelemetry.io/proto/otlp v1.7.1 // indirect
169174
go.uber.org/atomic v1.11.0 // indirect
170175
go.yaml.in/yaml/v2 v2.4.3 // indirect

boundaries/metadata/go.sum

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

boundaries/metadata/internal/di/wire.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ package metadata_di
1111
import (
1212
"context"
1313

14+
"github.com/ThreeDotsLabs/watermill/message"
1415
"github.com/google/wire"
1516
"github.com/shortlink-org/go-sdk/auth/permission"
1617
"github.com/shortlink-org/go-sdk/cache"
@@ -21,12 +22,15 @@ import (
2122
"github.com/shortlink-org/go-sdk/flight_trace"
2223
rpc "github.com/shortlink-org/go-sdk/grpc"
2324
"github.com/shortlink-org/go-sdk/logger"
24-
"github.com/shortlink-org/go-sdk/mq"
2525
"github.com/shortlink-org/go-sdk/notify"
26+
"github.com/shortlink-org/go-sdk/watermill"
27+
watermill_kafka "github.com/shortlink-org/go-sdk/watermill/backends/kafka"
2628
"github.com/shortlink-org/go-sdk/observability/metrics"
2729
"github.com/shortlink-org/go-sdk/observability/profiling"
2830
"github.com/shortlink-org/go-sdk/observability/tracing"
2931
"github.com/shortlink-org/go-sdk/s3"
32+
"go.opentelemetry.io/otel/metric"
33+
api "go.opentelemetry.io/otel/sdk/metric"
3034
"go.opentelemetry.io/otel/trace"
3135

3236
metadata_domain "github.com/shortlink-org/shortlink/boundaries/metadata/internal/domain/metadata/v1"
@@ -78,11 +82,16 @@ var DefaultSet = wire.NewSet(
7882
var MetaDataSet = wire.NewSet(
7983
DefaultSet,
8084
permission.New,
81-
mq.New,
85+
wire.FieldsOf(new(*metrics.Monitoring), "Prometheus", "Metrics"),
86+
wire.Bind(new(metric.MeterProvider), new(*api.MeterProvider)),
8287
db.New,
88+
wire.Bind(new(watermill.Backend), new(*watermill_kafka.Backend)),
89+
watermill_kafka.New,
90+
wire.Value([]watermill.Option{}),
91+
watermill.New,
92+
wire.FieldsOf(new(*watermill.Client), "Publisher", "Subscriber"),
8393
rpc.InitServer,
8494
s3.New,
85-
wire.FieldsOf(new(*metrics.Monitoring), "Prometheus", "Metrics"),
8695

8796
// Delivery
8897
InitMetadataMQ,
@@ -100,8 +109,8 @@ var MetaDataSet = wire.NewSet(
100109
NewMetaDataService,
101110
)
102111

103-
func InitMetadataMQ(ctx context.Context, log logger.Logger, dataBus mq.MQ, metadataUC *metadata.UC) (*metadata_mq.Event, error) {
104-
metadataMQ, err := metadata_mq.New(dataBus, metadataUC)
112+
func InitMetadataMQ(ctx context.Context, log logger.Logger, publisher message.Publisher, subscriber message.Subscriber, metadataUC *metadata.UC) (*metadata_mq.Event, error) {
113+
metadataMQ, err := metadata_mq.New(publisher, subscriber, metadataUC)
105114
if err != nil {
106115
return nil, err
107116
}
@@ -110,7 +119,7 @@ func InitMetadataMQ(ctx context.Context, log logger.Logger, dataBus mq.MQ, metad
110119
notify.Subscribe(metadata_domain.METHOD_ADD, metadataMQ)
111120

112121
// Subscribe to link creation events from Kafka
113-
if err := metadataMQ.SubscribeLinkCreated(log); err != nil {
122+
if err := metadataMQ.SubscribeLinkCreated(ctx, log); err != nil {
114123
return nil, err
115124
}
116125

0 commit comments

Comments
 (0)