Skip to content

Commit 0fa903f

Browse files
author
Dimy Jeannot
committed
fix: added additional error handling; preparing for batch handling
1 parent 69274a9 commit 0fa903f

File tree

19 files changed

+699
-130
lines changed

19 files changed

+699
-130
lines changed

go/oeco-sdk/v2beta/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
### 🩹 Fixes
44

5-
- added global panic recovery and correlation id to requests ([88c5a59](https://github.com/openecosystems/ecosystem/commit/88c5a59))
5+
- added global panic recovery and correlation id to requests ([88c5a59](https://github.com/openecosystems/ecosystem/commit/88c5a59))
66

77
### ❤️ Thank You
88

9-
- Dimy Jeannot
9+
- Dimy Jeannot
1010

1111
## 0.20.3 (2025-09-17)
1212

go/oeco-sdk/v2beta/bindings/nats/binding.go

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ import (
1919

2020
// Binding represents a structure managing NATS connections, JetStream instances, and event stream configurations.
2121
type Binding struct {
22-
Registry map[string]nats.StreamConfig
23-
SpecEventListeners []SpecEventListener
24-
Listeners map[string]*nats.Subscription
25-
Nats *nats.Conn
26-
JetStream *jetstream.JetStream
22+
Registry map[string]nats.StreamConfig
23+
SpecEventListeners []SpecEventListener
24+
SpecEventBatchListeners []SpecEventBatchListener
25+
Listeners map[string]*nats.Subscription
26+
Nats *nats.Conn
27+
JetStream *jetstream.JetStream
2728

2829
server *natsd.Server
2930
configuration *Configuration
@@ -166,6 +167,7 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
166167
bindings.Registered[b.Name()] = Bound
167168

168169
bindings = b.RegisterSpecListeners(bindings)
170+
bindings = b.RegisterSpecBatchListeners(bindings)
169171
}
170172
})
171173
} else {
@@ -229,3 +231,33 @@ func (b *Binding) RegisterSpecListeners(bindings *sdkv2betalib.Bindings) *sdkv2b
229231

230232
return bindings
231233
}
234+
235+
// RegisterSpecBatchListeners registers specification event listeners by configuring them and associating them with bindings.
236+
func (b *Binding) RegisterSpecBatchListeners(bindings *sdkv2betalib.Bindings) *sdkv2betalib.Bindings {
237+
for _, listener := range b.SpecEventBatchListeners {
238+
configuration := listener.Configure()
239+
if configuration == nil {
240+
fmt.Println("Please configure the Listener")
241+
panic("Misconfigured")
242+
}
243+
244+
name := ""
245+
if configuration.JetstreamConfiguration.Name == "" && configuration.JetstreamConfiguration.Durable == "" {
246+
fmt.Println("Either the Name or the Durable name is required")
247+
panic("Misconfigured")
248+
}
249+
250+
if configuration.JetstreamConfiguration.Name != "" {
251+
name = configuration.JetstreamConfiguration.Durable
252+
}
253+
254+
// Use the durable name if set
255+
if configuration.JetstreamConfiguration.Durable != "" {
256+
name = configuration.JetstreamConfiguration.Durable
257+
}
258+
259+
bindings.RegisteredListenableChannels[name] = listener
260+
}
261+
262+
return bindings
263+
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package natsnodev1
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/nats-io/nats.go"
8+
"github.com/nats-io/nats.go/jetstream"
9+
sdkv2betalib "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta"
10+
zaploggerv1 "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/bindings/zap"
11+
optionv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/options/v2"
12+
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
13+
"go.opentelemetry.io/otel/trace"
14+
"go.uber.org/zap"
15+
protopb "google.golang.org/protobuf/proto"
16+
)
17+
18+
type SpecEventBatchListener interface {
19+
Configure() *ListenerBatchConfiguration
20+
Listen(ctx context.Context, listenerErr chan sdkv2betalib.SpecListenableErr)
21+
BatchProcess(ctx context.Context, messages []*ListenerBatchMessage)
22+
}
23+
24+
// ListenerBatchConfiguration defines the configuration for a listener, including stream type, entity, subject, queue, and Jetstream settings.
25+
type ListenerBatchConfiguration struct {
26+
StreamType Stream
27+
Entity sdkv2betalib.Entity
28+
BatchSize int
29+
TypeName string
30+
Procedure string
31+
Topic string
32+
CQRS optionv2pb.CQRSType
33+
JetstreamConfiguration *jetstream.ConsumerConfig
34+
}
35+
36+
type ListenerBatchMessage struct {
37+
Context context.Context
38+
SpecKey *specv2pb.SpecKey
39+
Spec *specv2pb.Spec
40+
Subscription *jetstream.Consumer
41+
Message *jetstream.Msg
42+
NatsMessage *nats.Msg
43+
ListenerBatchConfiguration *ListenerBatchConfiguration
44+
EventResponseChannel string
45+
Request protopb.Message
46+
Response protopb.Message
47+
SpecError sdkv2betalib.SpecErrorable
48+
}
49+
50+
// ListenForJetStreamEvents subscribes to a Jetstream subject.
51+
func ListenForJetStreamEvents(ctx context.Context, env string, listener SpecEventBatchListener) {
52+
log := *zaploggerv1.Bound.Logger
53+
configuration := listener.Configure()
54+
55+
if configuration == nil || configuration.Entity == nil || configuration.StreamType == nil {
56+
log.Error("Configuration is missing. Entity, StreamType are required when configuring a SpecListener")
57+
panic("Configuration is missing")
58+
}
59+
60+
js := *Bound.JetStream
61+
streamName := GetStreamName(env, configuration.StreamType.StreamPrefix(), configuration.Entity.TypeName())
62+
63+
stream, err := js.Stream(ctx, streamName)
64+
if err != nil {
65+
log.Error("Could not find stream: " + streamName + "; " + err.Error())
66+
return
67+
}
68+
69+
c, err := stream.CreateOrUpdateConsumer(ctx, *configuration.JetstreamConfiguration)
70+
if err != nil {
71+
log.Error("SpecError creating consumer", zap.Error(err))
72+
panic("Cannot start consumer")
73+
}
74+
75+
log.Info("Listening for stream spec events on subject: " + streamName)
76+
77+
for {
78+
messages, err := c.Fetch(configuration.BatchSize)
79+
if err != nil && !errors.Is(err, nats.ErrTimeout) {
80+
log.Error("Fetch error:", zap.Error(err))
81+
continue
82+
}
83+
84+
var batch []*ListenerBatchMessage
85+
for msg := range messages.Messages() {
86+
message, _ := convertJetstreamToListenerMessage(configuration, &msg)
87+
batch = append(batch, message)
88+
}
89+
90+
listener.BatchProcess(ctx, batch)
91+
}
92+
93+
//
94+
//_, err = c.Fetch(configuration.BatchSize, func(msg jetstream.Msg) {
95+
// messageCtx, message, _ := convertJetstreamToListenerMessage(configuration, &msg)
96+
//
97+
// // The Processor is responsible for replying to the Reply subject and responding with any errors
98+
// listener.BatchProcess(messageCtx, message)
99+
// // RespondToJetstreamEvent(messageCtx, message)
100+
//}, jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
101+
// fmt.Println(err)
102+
//}))
103+
//if err != nil {
104+
// log.Fatal("consume error", zap.Error(err))
105+
//}
106+
107+
//
108+
}
109+
110+
// RespondToJetstreamEvent processes an inbound message, modifies the provided message, and sends a response through NATS.
111+
func RespondToJetstreamEvent(_ context.Context, message *ListenerMessage) {
112+
log := *zaploggerv1.Bound.Logger
113+
jm := *message.Message
114+
115+
err4 := jm.Ack()
116+
if err4 != nil {
117+
log.Error("SpecError acknowledging message", zap.Error(err4))
118+
return
119+
}
120+
}
121+
122+
func convertJetstreamToListenerMessage(config *ListenerBatchConfiguration, msg *jetstream.Msg) (*ListenerBatchMessage, sdkv2betalib.SpecErrorable) {
123+
// Start with a new ctx here because it must remain transaction safe
124+
ctx := context.Background()
125+
s := &specv2pb.Spec{}
126+
m := *msg
127+
err := protopb.Unmarshal(m.Data(), s)
128+
if err != nil {
129+
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not unmarshall spec"), err)
130+
}
131+
132+
parentSpanCtx := convertSpecSpanContextToContext(s)
133+
ctx = trace.ContextWithRemoteSpanContext(ctx, parentSpanCtx)
134+
135+
return &ListenerBatchMessage{
136+
Context: ctx,
137+
Spec: s,
138+
Subscription: nil,
139+
Message: &m,
140+
ListenerBatchConfiguration: config,
141+
}, nil
142+
}

go/oeco-sdk/v2beta/bindings/nats/listener.go

Lines changed: 4 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ import (
77

88
"github.com/nats-io/nats.go"
99
"github.com/nats-io/nats.go/jetstream"
10-
"go.uber.org/zap"
1110
protopb "google.golang.org/protobuf/proto"
1211

13-
apexlog "github.com/apex/log"
1412
"github.com/mennanov/fmutils"
1513
sdkv2betalib "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta"
1614
zaploggerv1 "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/bindings/zap"
@@ -214,7 +212,7 @@ func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
214212
func respond(msg *nats.Msg, spec *specv2pb.Spec) {
215213
log := *zaploggerv1.Bound.Logger
216214
if msg == nil {
217-
apexlog.Warn("Received nil message, ignoring")
215+
log.Warn("Received nil message, ignoring")
218216
return
219217
}
220218

@@ -236,72 +234,16 @@ func respond(msg *nats.Msg, spec *specv2pb.Spec) {
236234

237235
marshal, err := protopb.Marshal(spec)
238236
if err != nil {
239-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())
237+
log.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())
240238
err = msg.Respond(nil)
241239
if err != nil {
242-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
240+
log.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
243241
}
244242
}
245243

246244
err = msg.Respond(marshal)
247245
if err != nil {
248-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
249-
}
250-
}
251-
252-
// ListenForJetStreamEvents subscribes to a Jetstream subject.
253-
func ListenForJetStreamEvents(ctx context.Context, env string, listener SpecEventListener) {
254-
configuration := listener.Configure()
255-
256-
if configuration == nil || configuration.Procedure == "" || configuration.StreamType == nil {
257-
apexlog.Error("Configuration is missing. Procedure, StreamType are required when configuring a SpecListener")
258-
panic("Configuration is missing")
259-
}
260-
261-
log := *zaploggerv1.Bound.Logger
262-
js := *Bound.JetStream
263-
streamName := GetStreamName(env, configuration.StreamType.StreamPrefix(), configuration.TypeName)
264-
265-
stream, err := js.Stream(ctx, streamName)
266-
if err != nil {
267-
apexlog.Error("Could not find stream: " + streamName + err.Error())
268-
return
269-
}
270-
271-
c, err := stream.CreateOrUpdateConsumer(ctx, *configuration.JetstreamConfiguration)
272-
if err != nil {
273-
log.Error("SpecError creating consumer", zap.Error(err))
274-
panic("Cannot start consumer")
275-
}
276-
277-
// TODO: This must be closed
278-
_, err = c.Consume(func(msg jetstream.Msg) {
279-
messageCtx, message, _ := convertJetstreamToListenerMessage(configuration, &msg)
280-
281-
// The Processor is responsible for replying to the Reply subject and responding with any errors
282-
listener.Process(messageCtx, message)
283-
RespondToJetstreamEvent(messageCtx, message)
284-
}, jetstream.ConsumeErrHandler(func(_ jetstream.ConsumeContext, err error) {
285-
fmt.Println(err)
286-
}))
287-
if err != nil {
288-
log.Fatal("consume error", zap.Error(err))
289-
}
290-
291-
apexlog.Info("Listening for stream spec events on subject: " + streamName)
292-
293-
//
294-
}
295-
296-
// RespondToJetstreamEvent processes an inbound message, modifies the provided message, and sends a response through NATS.
297-
func RespondToJetstreamEvent(_ context.Context, message *ListenerMessage) {
298-
log := *zaploggerv1.Bound.Logger
299-
jm := *message.Message
300-
301-
err4 := jm.Ack()
302-
if err4 != nil {
303-
log.Error("SpecError acknowledging message", zap.Error(err4))
304-
return
246+
log.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
305247
}
306248
}
307249

@@ -368,27 +310,6 @@ func convertNatsToListenerMessage(config *ListenerConfiguration, msg *nats.Msg)
368310
}, nil
369311
}
370312

371-
func convertJetstreamToListenerMessage(config *ListenerConfiguration, msg *jetstream.Msg) (context.Context, *ListenerMessage, sdkv2betalib.SpecErrorable) {
372-
// Start with a new ctx here because it must remain transaction safe
373-
ctx := context.Background()
374-
s := &specv2pb.Spec{}
375-
m := *msg
376-
err := protopb.Unmarshal(m.Data(), s)
377-
if err != nil {
378-
return ctx, nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not unmarshall spec"), err)
379-
}
380-
381-
parentSpanCtx := convertSpecSpanContextToContext(s)
382-
ctx = trace.ContextWithRemoteSpanContext(ctx, parentSpanCtx)
383-
384-
return ctx, &ListenerMessage{
385-
Spec: s,
386-
Subscription: nil,
387-
Message: &m,
388-
ListenerConfiguration: config,
389-
}, nil
390-
}
391-
392313
func convertSpecSpanContextToContext(spec *specv2pb.Spec) trace.SpanContext {
393314
if spec == nil || spec.SpanContext == nil {
394315
return trace.SpanContext{}

go/oeco-sdk/v2beta/bindings/tink/binding.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ func (b *Binding) Bind(ctx context.Context, bindings *sdkv2betalib.Bindings) *sd
5353
fmt.Println(err)
5454
panic(err)
5555
}
56+
registry.RegisterKMSClient(client)
57+
5658
kekAEAD, err := client.GetAEAD(keyURI)
5759
if err != nil {
5860
fmt.Println(err)

go/oeco-sdk/v2beta/error.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
// Reason is the stable logical identity for a SpecError.
2525
type Reason string
2626

27-
// HasReason check if the Error has a reson
27+
// HasReason check if the Error has a reason
2828
type HasReason interface {
2929
error
3030
SpecReason() Reason
@@ -49,6 +49,7 @@ type (
4949
WithLocalizedMessage(message *errdetails.LocalizedMessage) SpecErrorable
5050
WithInternalErrorDetail(errs ...error) SpecErrorable
5151
// WithDebugDetail(ctx context.Context, spec *specv2pb.Spec, errs ...error) SpecErrorable
52+
SpecReason() Reason
5253
ToStatus() *status.Status
5354
ToConnectError() *connect.Error
5455
error
@@ -123,6 +124,10 @@ func (se *SpecError) WithErrorInfo(info *errdetails.ErrorInfo) SpecErrorable {
123124
return se
124125
}
125126

127+
if info.Reason != "" {
128+
se.reason = Reason(info.Reason)
129+
}
130+
126131
se.ConnectErr.AddDetail(d)
127132
return se
128133
}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package sdkv2betalib
22

33
const (
4-
ReasonRequiredField = "REQUIRED_FIELD"
5-
ReasonTooShort = "TOO_SHORT"
6-
ReasonInvalidFormat = "INVALID_FORMAT"
7-
ReasonNotAllowed = "NOT_ALLOWED"
8-
ReasonDuplicate = "DUPLICATE"
9-
ReasonMissingField = "MISSING_FIELD"
4+
ReasonRequiredField = "required_field"
5+
ReasonTooShort = "too_short"
6+
ReasonInvalidFormat = "invalid_format"
7+
ReasonNotAllowed = "not_allowed"
8+
ReasonDuplicate = "duplicate"
9+
ReasonMissingField = "missing_field"
1010
)

0 commit comments

Comments
 (0)