Skip to content

Commit ae67b63

Browse files
committed
refactor(metadata): replace PostgreSQL DLQ with Watermill Kafka DLQ
- Remove PostgreSQL DLQ implementation (repository/dlq, migrations, archive.go)
- Enable Watermill built-in Kafka DLQ (WATERMILL_DLQ_ENABLED=true)
- Simplify subscribe.go: remove custom DLQ logic, rely on Watermill retry/DLQ
- Remove STORE_POSTGRES_URI secret from metadata Helm values
- Update wire.go: remove NewDLQStore and rawEvents parameter
- Remove ADR-0042 (PostgreSQL DLQ is no longer used)
1 parent 2e9eddc commit ae67b63

File tree

9 files changed

+68
-105
lines changed

9 files changed

+68
-105
lines changed

boundaries/common/ops/common/templates/store/configmap.yaml

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,42 @@ data:
99
-- link database
1010
\c link;
1111
CREATE SCHEMA AUTHORIZATION link;
12-
GRANT ALL PRIVILEGES ON DATABASE link TO link;
1312
ALTER DATABASE link OWNER TO link;
1413
1514
GRANT CONNECT ON DATABASE link TO grafana;
1615
GRANT USAGE ON SCHEMA link TO grafana;
17-
ALTER DEFAULT PRIVILEGES IN SCHEMA link GRANT SELECT ON TABLES TO grafana;
16+
ALTER DEFAULT PRIVILEGES FOR ROLE link IN SCHEMA link GRANT SELECT ON TABLES TO grafana;
17+
ALTER DEFAULT PRIVILEGES FOR ROLE link IN SCHEMA link GRANT USAGE ON SEQUENCES TO grafana;
1818
19+
-- statement_timeout becomes the default for every new session on this database (including Grafana and maintenance)
1920
ALTER DATABASE link SET statement_timeout = '180s';
2021
2122
-- proxy database
2223
\c proxy;
2324
CREATE SCHEMA AUTHORIZATION proxy;
24-
GRANT ALL PRIVILEGES ON DATABASE proxy TO proxy;
2525
ALTER DATABASE proxy OWNER TO proxy;
2626
2727
GRANT CONNECT ON DATABASE proxy TO grafana;
2828
GRANT USAGE ON SCHEMA proxy TO grafana;
29-
ALTER DEFAULT PRIVILEGES IN SCHEMA proxy GRANT SELECT ON TABLES TO grafana;
29+
ALTER DEFAULT PRIVILEGES FOR ROLE proxy IN SCHEMA proxy GRANT SELECT ON TABLES TO grafana;
30+
ALTER DEFAULT PRIVILEGES FOR ROLE proxy IN SCHEMA proxy GRANT USAGE ON SEQUENCES TO grafana;
3031
32+
-- statement_timeout becomes the default for every new session on this database (including Grafana and maintenance)
3133
ALTER DATABASE proxy SET statement_timeout = '180s';
3234
35+
-- metadata database
36+
\c metadata;
37+
CREATE SCHEMA AUTHORIZATION metadata;
38+
ALTER DATABASE metadata OWNER TO metadata;
39+
40+
GRANT CONNECT ON DATABASE metadata TO grafana;
41+
GRANT USAGE ON SCHEMA metadata TO grafana;
42+
ALTER DEFAULT PRIVILEGES FOR ROLE metadata IN SCHEMA metadata GRANT SELECT ON TABLES TO grafana;
43+
ALTER DEFAULT PRIVILEGES FOR ROLE metadata IN SCHEMA metadata GRANT USAGE ON SEQUENCES TO grafana;
44+
45+
-- statement_timeout becomes the default for every new session on this database (including Grafana and maintenance)
46+
ALTER DATABASE metadata SET statement_timeout = '180s';
47+
3348
-- Grant grafana access to pg_statio_user_tables via pg_read_all_stats role
3449
GRANT pg_read_all_stats TO grafana;
3550
GRANT pg_read_all_settings TO grafana;

boundaries/common/ops/common/templates/store/postgres.yaml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,12 @@ spec:
9898
max_parallel_workers: 8
9999
max_parallel_maintenance_workers: 4
100100
# Synchronous Replication
101-
synchronous_commit: 'on'
101+
synchronous_commit: "on"
102102
# Partitioning
103-
enable_partition_pruning: 'on'
103+
enable_partition_pruning: "on"
104104
# TODO: enable pgrouting,timescaledb,postgis,pgmonitor_bgw
105105
shared_preload_libraries: pg_stat_statements,auto_explain,pgaudit,pg_prewarm,uuid-ossp,pg_partman_bgw
106-
pgmonitor_bgw.dbname: postgres,link
106+
pgmonitor_bgw.dbname: postgres,link,metadata
107107
pgmonitor_bgw.role: "postgres"
108108
syncPeriodSeconds: 60
109109
leaderLeaseDurationSeconds: 300
@@ -130,3 +130,7 @@ spec:
130130
- name: proxy
131131
databases:
132132
- proxy
133+
- name: metadata
134+
databases:
135+
- metadata
136+
options: "SUPERUSER"

boundaries/link/ops/link/values.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ deploy:
7070
WATERMILL_KAFKA_BROKERS: shortlink-kafka-bootstrap.kafka.svc.cluster.local:9092
7171
WATERMILL_KAFKA_CLIENT_ID: shortlink-link
7272
WATERMILL_KAFKA_CONSUMER_GROUP: shortlink-link
73-
WATERMILL_DLQ_ENABLED: false
73+
WATERMILL_DLQ_ENABLED: true
7474
WATERMILL_DLQ_MAX_RETRIES: 5
7575
TRACER_URI: grafana-tempo.grafana:4317
7676
GRPC_CLIENT_HOST: istio-ingress.istio-ingress

boundaries/metadata/internal/di/wire.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@ import (
2222
"github.com/shortlink-org/go-sdk/flight_trace"
2323
rpc "github.com/shortlink-org/go-sdk/grpc"
2424
"github.com/shortlink-org/go-sdk/logger"
25-
"github.com/shortlink-org/go-sdk/watermill"
26-
watermill_kafka "github.com/shortlink-org/go-sdk/watermill/backends/kafka"
2725
"github.com/shortlink-org/go-sdk/observability/metrics"
2826
"github.com/shortlink-org/go-sdk/observability/profiling"
2927
"github.com/shortlink-org/go-sdk/observability/tracing"
3028
"github.com/shortlink-org/go-sdk/s3"
29+
"github.com/shortlink-org/go-sdk/watermill"
30+
watermill_kafka "github.com/shortlink-org/go-sdk/watermill/backends/kafka"
3131
"go.opentelemetry.io/otel/metric"
3232
api "go.opentelemetry.io/otel/sdk/metric"
3333
"go.opentelemetry.io/otel/trace"

boundaries/metadata/internal/infrastructure/mq/mq.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,11 @@ import (
1313
type Event struct {
1414
subscriber message.Subscriber
1515
metadataUC *metadata_uc.UC
16-
rawEvents RawEventsStore
1716
}
1817

1918
func New(subscriber message.Subscriber, metadataUC *metadata_uc.UC) (*Event, error) {
2019
return &Event{
2120
subscriber: subscriber,
2221
metadataUC: metadataUC,
23-
rawEvents: newInMemoryRawEventsStore(),
2422
}, nil
2523
}

boundaries/metadata/internal/infrastructure/mq/raw_events_store.go

Lines changed: 0 additions & 69 deletions
This file was deleted.

boundaries/metadata/internal/infrastructure/mq/subscribe.go

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"log/slog"
88

99
linkpb "buf.build/gen/go/shortlink-org/shortlink-link-link/protocolbuffers/go/domain/link/v1"
10-
"github.com/ThreeDotsLabs/watermill/message"
1110
"github.com/shortlink-org/go-sdk/cqrs/bus"
1211
cqrsmessage "github.com/shortlink-org/go-sdk/cqrs/message"
1312
"github.com/shortlink-org/go-sdk/logger"
@@ -39,30 +38,51 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
3938
}
4039

4140
go func(ctx context.Context) {
41+
defer func() {
42+
if r := recover(); r != nil {
43+
log.ErrorWithContext(ctx, "panic in LinkCreated subscriber",
44+
slog.Any("recover", r),
45+
slog.String("topic", linkCreatedEvent),
46+
)
47+
}
48+
}()
49+
4250
for msg := range messages {
4351
msgCtx := msg.Context() //nolint:contextcheck // inherit context from Watermill message
4452
if msgCtx == nil {
4553
msgCtx = ctx
4654
}
4755

56+
// Validate payload before unmarshaling
57+
if len(msg.Payload) == 0 {
58+
log.ErrorWithContext(msgCtx, "Received empty payload for link created event - nacking for Kafka DLQ",
59+
slog.String("topic", linkCreatedEvent),
60+
slog.String("message_uuid", msg.UUID),
61+
)
62+
63+
msg.Nack()
64+
continue
65+
}
66+
4867
// Create typed event instance directly - no reflect.New needed
4968
// We know the type is *linkpb.LinkCreated from the subscription
5069
event := &linkpb.LinkCreated{}
5170

5271
// Unmarshal using ProtoMarshaler (handles metadata extraction)
53-
watermillMsg := message.NewMessage(msg.UUID, msg.Payload)
54-
watermillMsg.Metadata = msg.Metadata
55-
watermillMsg.SetContext(msgCtx) //nolint:contextcheck // watermill message already carries derived context
72+
// msg is already a *message.Message from Watermill; attach the resolved context before unmarshaling
73+
msg.SetContext(msgCtx) //nolint:contextcheck // update context for unmarshaling
5674

57-
unmarshalErr := marshaler.Unmarshal(watermillMsg, event)
75+
unmarshalErr := marshaler.Unmarshal(msg, event)
5876
if unmarshalErr != nil {
59-
log.ErrorWithContext(msgCtx, "Failed to unmarshal event using marshaler", //nolint:contextcheck // logging must use message context
77+
log.ErrorWithContext(msgCtx, "Failed to unmarshal event using marshaler - nacking for Kafka DLQ",
6078
slog.String("error", unmarshalErr.Error()),
6179
slog.String("topic", linkCreatedEvent),
80+
slog.Int("payload_size", len(msg.Payload)),
81+
slog.Int("metadata_count", len(msg.Metadata)),
82+
slog.String("message_uuid", msg.UUID),
6283
)
63-
e.archiveRawEvent(msgCtx, msg, linkCreatedEvent, fmt.Sprintf("unmarshal: %v", unmarshalErr))
64-
msg.Nack()
6584

85+
msg.Nack()
6686
continue
6787
}
6888

@@ -72,19 +92,18 @@ func (e *Event) SubscribeLinkCreated(ctx context.Context, log logger.Logger, reg
7292
var domainErr *domainerrors.Error
7393
if errors.As(handleErr, &domainErr) {
7494
dto := infraerrors.FromDomainError("metadata.mq.link_created", domainErr)
75-
log.ErrorWithContext(msgCtx, "Failed to handle link created event",
95+
log.ErrorWithContext(msgCtx, "Failed to handle link created event - nacking for Kafka DLQ",
7696
slog.String("error_code", dto.Code),
7797
slog.String("topic", linkCreatedEvent),
7898
slog.Bool("retryable", dto.Retryable),
7999
slog.String("message", dto.Message),
80100
)
81-
e.archiveRawEvent(msgCtx, msg, linkCreatedEvent, fmt.Sprintf("domain: %s", dto.Message))
82101
} else {
83-
log.ErrorWithContext(msgCtx, "Failed to handle link created event",
102+
log.ErrorWithContext(msgCtx, "Failed to handle link created event - nacking for Kafka DLQ",
84103
slog.String("error", handleErr.Error()),
85104
slog.String("topic", linkCreatedEvent),
105+
slog.Bool("retryable", true),
86106
)
87-
e.archiveRawEvent(msgCtx, msg, linkCreatedEvent, fmt.Sprintf("handler: %v", handleErr))
88107
}
89108

90109
msg.Nack()
@@ -145,15 +164,3 @@ func (e *Event) handleLinkCreated(ctx context.Context, event *linkpb.LinkCreated
145164
return nil
146165
}
147166

148-
func (e *Event) archiveRawEvent(ctx context.Context, msg *message.Message, topic, reason string) {
149-
if e == nil || e.rawEvents == nil || msg == nil {
150-
return
151-
}
152-
153-
_ = e.rawEvents.Save(ctx, RawEventRecord{
154-
Topic: topic,
155-
Payload: cloneBytes(msg.Payload),
156-
Metadata: cloneMetadata(msg.Metadata),
157-
Error: reason,
158-
})
159-
}

boundaries/metadata/ops/metadata/values.schema.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@
3737
},
3838
"WATERMILL_KAFKA_CONSUMER_GROUP": {
3939
"type": "string"
40+
},
41+
"WATERMILL_DLQ_ENABLED": {
42+
"type": "boolean"
43+
},
44+
"WATERMILL_DLQ_MAX_RETRIES": {
45+
"type": "integer"
4046
}
4147
},
4248
"type": "object"

boundaries/metadata/ops/metadata/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ deploy:
5151
WATERMILL_KAFKA_BROKERS: "shortlink-kafka-bootstrap.kafka:9092"
5252
WATERMILL_KAFKA_CLIENT_ID: "shortlink-metadata"
5353
WATERMILL_KAFKA_CONSUMER_GROUP: "shortlink-metadata"
54+
WATERMILL_DLQ_ENABLED: true
55+
WATERMILL_DLQ_MAX_RETRIES: 3
5456
TRACER_URI: grafana-tempo.grafana:4317
5557
GRPC_CLIENT_HOST: istio-ingress.istio-ingress
5658

0 commit comments

Comments
 (0)