Skip to content

Commit bb7b693

Browse files
authored
fix: Hardening 2 (#79)
* fix: added additional logging * fix: added additional logging * fix: added additional bad request error handling --------- Co-authored-by: Dimy Jeannot <>
1 parent f7468db commit bb7b693

28 files changed

+739
-42
lines changed

go/oeco-sdk/v2beta/CHANGELOG.md

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,21 @@
22

33
### 🩹 Fixes
44

5-
- added fail early if proto transcoding fails ([#78](https://github.com/openecosystems/ecosystem/pull/78))
5+
- added fail early if proto transcoding fails ([#78](https://github.com/openecosystems/ecosystem/pull/78))
66

77
### ❤️ Thank You
88

9-
- Dimy Jeannot @dimyjeannot
9+
- Dimy Jeannot @dimyjeannot
10+
11+
## 0.20.2 (2025-09-11)
12+
13+
### 🩹 Fixes
14+
15+
- added fail early if proto transcoding fails ([#78](https://github.com/openecosystems/ecosystem/pull/78))
16+
17+
### ❤️ Thank You
18+
19+
- Dimy Jeannot @dimyjeannot
1020

1121
## 0.20.1 (2025-09-09)
1222

go/oeco-sdk/v2beta/bindings/nats/listener.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
1919
"go.opentelemetry.io/otel/trace"
2020
"google.golang.org/protobuf/types/known/anypb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
2122
)
2223

2324
// SpecEventListener is an interface for handling event streaming, listening, and processing for specific configurations.
@@ -126,6 +127,7 @@ func ListenForMultiplexedRequests(_ context.Context, listener SpecEventListener)
126127

127128
// RespondToMultiplexedRequest processes an inbound message, modifies the provided message, and sends a response through NATS.
128129
func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
130+
log := *zaploggerv1.Bound.Logger
129131
js := *Bound.JetStream
130132
nm := *message.NatsMessage
131133

@@ -135,6 +137,9 @@ func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
135137
return
136138
}
137139

140+
fields := receivedFields(message.Spec, nm.Subject)
141+
log.Info("Received multiplexed request", fields...)
142+
138143
if message.Spec.SpecData != nil && message.Spec.SpecData.Data != nil {
139144
fmutils.Filter(message.Spec.SpecData.Data, message.Spec.SpecData.FieldMask.GetPaths())
140145
}
@@ -201,6 +206,7 @@ func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
201206

202207
// respond reply to a NATS message
203208
func respond(msg *nats.Msg, spec *specv2pb.Spec) {
209+
log := *zaploggerv1.Bound.Logger
204210
if msg == nil {
205211
apexlog.Warn("Received nil message, ignoring")
206212
return
@@ -213,6 +219,15 @@ func respond(msg *nats.Msg, spec *specv2pb.Spec) {
213219
}
214220
}
215221

222+
spec.CompletedAt = timestamppb.Now()
223+
224+
fields := completedFields(spec, msg.Subject)
225+
var milliseconds int64
226+
if spec.CompletedAt != nil && spec.ReceivedAt != nil {
227+
milliseconds = spec.CompletedAt.AsTime().Sub(spec.ReceivedAt.AsTime()).Milliseconds()
228+
}
229+
log.Info(fmt.Sprintf("Completed multiplexed request in %d ms\n", milliseconds), fields...)
230+
216231
marshal, err := protopb.Marshal(spec)
217232
if err != nil {
218233
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())

go/oeco-sdk/v2beta/bindings/nats/producer.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package natsnodev1
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
"connectrpc.com/connect"
@@ -13,18 +14,23 @@ import (
1314
sdkv2betalib "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta"
1415
zaploggerv1 "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/bindings/zap"
1516
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
17+
"google.golang.org/protobuf/types/known/timestamppb"
1618
)
1719

1820
// MultiplexCommandSync sends a command synchronously by publishing it to a NATS stream and awaiting a reply.
1921
// Uses Nats Publish and Subscribe Pattern
20-
func (b *Binding) MultiplexCommandSync(ctx context.Context, s *specv2pb.Spec, command *SpecCommand) (*nats.Msg, error) {
22+
func (b *Binding) MultiplexCommandSync(ctx context.Context, spec *specv2pb.Spec, command *SpecCommand) (*nats.Msg, error) {
2123
if command == nil {
2224
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecCommand object is required"))
2325
}
2426

27+
if spec == nil {
28+
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a Spec object is required"))
29+
}
30+
2531
log := *zaploggerv1.Bound.Logger
26-
s.SpecEvent = command.CommandName
27-
s.SpecType = command.EntityTypeName
32+
spec.SpecEvent = command.CommandName
33+
spec.SpecType = command.EntityTypeName
2834

2935
// Encrypt here
3036
//fmt.Println(command.Request.ProtoReflect())
@@ -41,77 +47,92 @@ func (b *Binding) MultiplexCommandSync(ctx context.Context, s *specv2pb.Spec, co
4147
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("internal error"))
4248
}
4349

44-
s.Data = data
50+
spec.Data = data
4551

46-
specBytes, err := proto.Marshal(s)
52+
specBytes, err := proto.Marshal(spec)
4753
if err != nil {
4854
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
4955
}
5056

5157
subject := GetMultiplexedRequestSubjectName(command.Stream.StreamPrefix(), command.CommandTopic, command.Procedure)
5258

53-
log.Debug("Issuing a multiplex command: " + command.Procedure + ", on channel: " + subject)
59+
fields := receivedFields(spec, subject)
60+
log.Info("Issuing a multiplex command: "+command.Procedure, fields...)
5461

55-
return publish(b.Nats, subject, s, specBytes)
62+
return publish(b.Nats, subject, spec, specBytes)
5663
}
5764

5865
// MultiplexEventSync sends an event to a multiplexed stream and waits for the response or error within the specified timeout.
5966
// Uses Nats Publish and Subscribe Pattern
60-
func (b *Binding) MultiplexEventSync(_ context.Context, s *specv2pb.Spec, event *SpecEvent) (*nats.Msg, error) {
67+
func (b *Binding) MultiplexEventSync(_ context.Context, spec *specv2pb.Spec, event *SpecEvent) (*nats.Msg, error) {
6168
if event == nil {
6269
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
6370
}
6471

72+
if spec == nil {
73+
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a Spec object is required"))
74+
}
75+
6576
log := *zaploggerv1.Bound.Logger
66-
s.SpecEvent = event.EventName
67-
s.SpecType = event.EntityTypeName
77+
spec.SpecEvent = event.EventName
78+
spec.SpecType = event.EntityTypeName
6879

6980
data, err := anypb.New(event.Request)
7081
if err != nil {
7182
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("internal error"))
7283
}
7384

74-
s.Data = data
85+
spec.Data = data
7586

76-
specBytes, err := proto.Marshal(s)
87+
specBytes, err := proto.Marshal(spec)
7788
if err != nil {
7889
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
7990
}
8091

8192
subject := GetMultiplexedRequestSubjectName(event.Stream.StreamPrefix(), event.EventTopic, event.Procedure)
8293

83-
log.Debug("Issuing a multiplex event: " + event.Procedure + ", on channel: " + subject)
94+
fields := receivedFields(spec, subject)
95+
log.Info("Issuing a multiplex event: "+event.Procedure, fields...)
8496

85-
return publish(b.Nats, subject, s, specBytes)
97+
return publish(b.Nats, subject, spec, specBytes)
8698
}
8799

88-
func publish(n *nats.Conn, subject string, s *specv2pb.Spec, specBytes []byte) (*nats.Msg, error) {
100+
func publish(n *nats.Conn, subject string, spec *specv2pb.Spec, specBytes []byte) (*nats.Msg, error) {
101+
log := *zaploggerv1.Bound.Logger
89102
reply, err := n.RequestMsg(&nats.Msg{
90103
Subject: subject,
91104
Data: specBytes,
92105
}, 10*time.Second)
93106
if err != nil {
94107
switch {
95108
case errors.Is(err, nats.ErrTimeout):
96-
return nil, ErrTimeout.WithSpecDetail(s)
109+
return nil, ErrTimeout.WithSpecDetail(spec).WithInternalErrorDetail(err)
97110
case errors.Is(err, nats.ErrNoResponders):
98111
// no responders
99-
return nil, ErrNoResponders.WithSpecDetail(s)
112+
return nil, ErrNoResponders.WithSpecDetail(spec).WithInternalErrorDetail(err)
100113
case errors.Is(err, nats.ErrConnectionClosed):
101114
// NATS connection was closed
102-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
115+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(err)
103116
case errors.Is(err, nats.ErrBadSubscription):
104117
// something wrong with the sub
105-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
118+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(err)
106119
default:
107120
// unknown or generic error
108-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
121+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
109122
}
110123
}
111124

112125
if reply == nil {
113-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
126+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
127+
}
128+
129+
spec.CompletedAt = timestamppb.Now()
130+
fields := completedFields(spec, subject)
131+
var milliseconds int64
132+
if spec.CompletedAt != nil && spec.ReceivedAt != nil {
133+
milliseconds = spec.CompletedAt.AsTime().Sub(spec.ReceivedAt.AsTime()).Milliseconds()
114134
}
135+
log.Info(fmt.Sprintf("Completed multiplexed request in %d ms\n", milliseconds), fields...)
115136

116137
return reply, err
117138
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package natsnodev1
2+
3+
import (
4+
"strings"
5+
6+
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
7+
"go.uber.org/zap/zapcore"
8+
)
9+
10+
func baseFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
11+
var fields []zapcore.Field
12+
13+
if spec.Context != nil {
14+
if subject != "" {
15+
fields = append(fields, zapcore.Field{Key: "subject", Type: zapcore.StringType, String: subject})
16+
}
17+
18+
if spec.SpanContext != nil && spec.SpanContext.TraceId != "" {
19+
fields = append(fields, zapcore.Field{Key: "trace_id", Type: zapcore.StringType, String: spec.SpanContext.TraceId})
20+
}
21+
22+
if spec.Context.Validation != nil && spec.Context.Validation.ValidateOnly {
23+
fields = append(fields, zapcore.Field{Key: "validate_only", Type: zapcore.StringType, String: "true"})
24+
}
25+
26+
if spec.Context.OrganizationId != "" {
27+
fields = append(fields, zapcore.Field{Key: "organization_id", Type: zapcore.StringType, String: spec.Context.OrganizationId})
28+
}
29+
30+
if spec.Context.EcosystemId != "" {
31+
fields = append(fields, zapcore.Field{Key: "ecosystem_id", Type: zapcore.StringType, String: spec.Context.EcosystemId})
32+
}
33+
34+
if spec.SpecData != nil && spec.SpecData.FieldMask != nil && len(spec.SpecData.FieldMask.GetPaths()) > 0 {
35+
fields = append(fields, zapcore.Field{Key: "field_mask", Type: zapcore.StringType, String: strings.Join(spec.SpecData.FieldMask.GetPaths(), ",")})
36+
}
37+
}
38+
39+
return fields
40+
}
41+
42+
func receivedFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
43+
fields := baseFields(spec, subject)
44+
45+
if spec != nil {
46+
if spec.ReceivedAt != nil {
47+
fields = append(fields, zapcore.Field{Key: "received_at", Type: zapcore.StringType, String: spec.ReceivedAt.AsTime().String()})
48+
}
49+
}
50+
51+
return fields
52+
}
53+
54+
func completedFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
55+
fields := baseFields(spec, subject)
56+
57+
if spec != nil {
58+
if spec.CompletedAt != nil {
59+
fields = append(fields, zapcore.Field{Key: "completed_at", Type: zapcore.StringType, String: spec.CompletedAt.AsTime().String()})
60+
}
61+
}
62+
63+
return fields
64+
}

go/oeco-sdk/v2beta/error.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,21 @@ func (se *SpecError) WithBadRequest(request *errdetails.BadRequest) SpecErrorabl
186186
}
187187
}
188188
if len(msgs) > 0 {
189-
apexlog.WithField("bad_request_errors", strings.Join(msgs, "; ")).Error("captured bad request error details")
189+
apexlog.WithField("bad_request_errors", strings.Join(msgs, "; ")).Info("captured bad request error details")
190190
}
191191
}
192192

193-
se.ConnectErr.AddDetail(d)
194-
return se
193+
newErr := *se
194+
t := connect.NewError(se.ConnectErr.Code(), fmt.Errorf(se.ConnectErr.Message()))
195+
newErr.ConnectErr = *t
196+
197+
for _, detail := range se.ConnectErr.Details() {
198+
newErr.ConnectErr.AddDetail(detail)
199+
}
200+
201+
newErr.ConnectErr.AddDetail(d)
202+
203+
return &newErr
195204
}
196205

197206
func (se *SpecError) WithHelp(help *errdetails.Help) SpecErrorable {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package sdkv2betalib
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
8+
"buf.build/go/protovalidate"
9+
"google.golang.org/genproto/googleapis/rpc/errdetails"
10+
)
11+
12+
func ConvertValidationErrorToFieldValidations(validationErr error) ([]*errdetails.BadRequest_FieldViolation, SpecErrorable) {
13+
var err *protovalidate.ValidationError
14+
if !errors.As(validationErr, &err) {
15+
return nil, ErrServerInternal.WithInternalErrorDetail(
16+
fmt.Errorf("expected ValidationError, got: %T", validationErr),
17+
)
18+
}
19+
20+
var fv []*errdetails.BadRequest_FieldViolation
21+
for _, violation := range err.Violations {
22+
fieldBuilder := &strings.Builder{}
23+
descriptionBuilder := &strings.Builder{}
24+
25+
if fieldPath := protovalidate.FieldPathString(violation.Proto.GetField()); fieldPath != "" {
26+
fieldBuilder.WriteString(fieldPath)
27+
}
28+
29+
_, _ = fmt.Fprintf(descriptionBuilder, "%s [%s]",
30+
violation.Proto.GetMessage(),
31+
violation.Proto.GetRuleId())
32+
33+
fv = append(fv, &errdetails.BadRequest_FieldViolation{
34+
Field: fieldBuilder.String(),
35+
Description: descriptionBuilder.String(),
36+
Reason: "WIRE_PROTOCOL_VALIDATION_ERROR",
37+
LocalizedMessage: &errdetails.LocalizedMessage{
38+
Locale: LocaleEnglishUS,
39+
Message: "Validation failed for this request",
40+
},
41+
})
42+
}
43+
44+
return fv, nil
45+
}

libs/plugins/protoc-gen-platform/languages/go/plugins/v2beta/multiplexer/templates/file.go.tmpl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"connectrpc.com/connect"
99
"google.golang.org/protobuf/types/known/anypb"
10+
"google.golang.org/genproto/googleapis/rpc/errdetails"
1011
{{ if $sm }}"github.com/nats-io/nats.go"{{end}}
1112
"github.com/nats-io/nats.go/jetstream"
1213
"github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/bindings/nats"

libs/plugins/protoc-gen-platform/languages/go/plugins/v2beta/multiplexer/templates/mutation_implemented.go.tmpl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@ func (s *{{ $s.Name }}Handler) {{ .Name }}(ctx context.Context, req *connect.Req
2929
}
3030

3131
// Executes top level validation, no business domain validation
32-
validationCtx, validationSpan := tracer.Start(ctx, "{{ $methodDashCaseName }}-request-validation", trace.WithSpanKind(trace.SpanKindInternal))
33-
v := *protovalidatev0.Bound.Validator
34-
if err := v.Validate(req.Msg); err != nil {
35-
return nil, sdkv2betalib.ErrServerPreconditionFailed.WithInternalErrorDetail(err)
32+
_, validationSpan := tracer.Start(ctx, "{{ $methodDashCaseName }}-request-wire-validation", trace.WithSpanKind(trace.SpanKindInternal))
33+
v := *protovalidatev0.Bound.Validator
34+
if err := v.Validate(req.Msg); err != nil {
35+
fv, serr := sdkv2betalib.ConvertValidationErrorToFieldValidations(err)
36+
if serr != nil {
37+
return nil, serr
38+
}
39+
return nil, sdkv2betalib.ErrServerPreconditionFailed.WithBadRequest(&errdetails.BadRequest{FieldViolations: fv})
3640
}
3741
validationSpan.End()
3842

@@ -45,7 +49,7 @@ func (s *{{ $s.Name }}Handler) {{ .Name }}(ctx context.Context, req *connect.Req
4549
}
4650

4751
// Distributed Domain Handler
48-
handlerCtx, handlerSpan := tracer.Start(validationCtx, "{{ $methodDashCaseName }}-event-generation", trace.WithSpanKind(trace.SpanKindInternal))
52+
handlerCtx, handlerSpan := tracer.Start(ctx, "{{ $methodDashCaseName }}-event-multiplex", trace.WithSpanKind(trace.SpanKindInternal))
4953

5054
config := s.Get{{ .Name }}Configuration()
5155
reply, serr := natsnodev1.Bound.MultiplexCommandSync(handlerCtx, spec, &natsnodev1.SpecCommand{

0 commit comments

Comments
 (0)