Skip to content

Commit 4876780

Browse files
author
Dimy Jeannot
committed
fix: added additional logging
1 parent f7468db commit 4876780

File tree

22 files changed

+654
-24
lines changed

22 files changed

+654
-24
lines changed

go/oeco-sdk/v2beta/CHANGELOG.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,16 @@
88

99
- Dimy Jeannot @dimyjeannot
1010

11+
## 0.20.2 (2025-09-11)
12+
13+
### 🩹 Fixes
14+
15+
- added fail early if proto transcoding fails ([#78](https://github.com/openecosystems/ecosystem/pull/78))
16+
17+
### ❤️ Thank You
18+
19+
- Dimy Jeannot @dimyjeannot
20+
1121
## 0.20.1 (2025-09-09)
1222

1323
### 🩹 Fixes

go/oeco-sdk/v2beta/bindings/nats/listener.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
1919
"go.opentelemetry.io/otel/trace"
2020
"google.golang.org/protobuf/types/known/anypb"
21+
"google.golang.org/protobuf/types/known/timestamppb"
2122
)
2223

2324
// SpecEventListener is an interface for handling event streaming, listening, and processing for specific configurations.
@@ -126,6 +127,7 @@ func ListenForMultiplexedRequests(_ context.Context, listener SpecEventListener)
126127

127128
// RespondToMultiplexedRequest processes an inbound message, modifies the provided message, and sends a response through NATS.
128129
func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
130+
log := *zaploggerv1.Bound.Logger
129131
js := *Bound.JetStream
130132
nm := *message.NatsMessage
131133

@@ -135,6 +137,9 @@ func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
135137
return
136138
}
137139

140+
fields := receivedFields(message.Spec, nm.Subject)
141+
log.Info("Received multiplexed request", fields...)
142+
138143
if message.Spec.SpecData != nil && message.Spec.SpecData.Data != nil {
139144
fmutils.Filter(message.Spec.SpecData.Data, message.Spec.SpecData.FieldMask.GetPaths())
140145
}
@@ -201,6 +206,7 @@ func RespondToMultiplexedRequest(_ context.Context, message *ListenerMessage) {
201206

202207
// respond reply to a NATS message
203208
func respond(msg *nats.Msg, spec *specv2pb.Spec) {
209+
log := *zaploggerv1.Bound.Logger
204210
if msg == nil {
205211
apexlog.Warn("Received nil message, ignoring")
206212
return
@@ -213,6 +219,15 @@ func respond(msg *nats.Msg, spec *specv2pb.Spec) {
213219
}
214220
}
215221

222+
spec.CompletedAt = timestamppb.Now()
223+
224+
fields := completedFields(spec, msg.Subject)
225+
var milliseconds int64
226+
if spec.CompletedAt != nil && spec.ReceivedAt != nil {
227+
milliseconds = spec.CompletedAt.AsTime().Sub(spec.ReceivedAt.AsTime()).Milliseconds()
228+
}
229+
log.Info(fmt.Sprintf("Completed multiplexed request in %d ms\n", milliseconds), fields...)
230+
216231
marshal, err := protopb.Marshal(spec)
217232
if err != nil {
218233
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())

go/oeco-sdk/v2beta/bindings/nats/producer.go

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package natsnodev1
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"time"
78

89
"connectrpc.com/connect"
@@ -13,18 +14,23 @@ import (
1314
sdkv2betalib "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta"
1415
zaploggerv1 "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/bindings/zap"
1516
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
17+
"google.golang.org/protobuf/types/known/timestamppb"
1618
)
1719

1820
// MultiplexCommandSync sends a command synchronously by publishing it to a NATS stream and awaiting a reply.
1921
// Uses Nats Publish and Subscribe Pattern
20-
func (b *Binding) MultiplexCommandSync(ctx context.Context, s *specv2pb.Spec, command *SpecCommand) (*nats.Msg, error) {
22+
func (b *Binding) MultiplexCommandSync(ctx context.Context, spec *specv2pb.Spec, command *SpecCommand) (*nats.Msg, error) {
2123
if command == nil {
2224
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecCommand object is required"))
2325
}
2426

27+
if spec == nil {
28+
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a Spec object is required"))
29+
}
30+
2531
log := *zaploggerv1.Bound.Logger
26-
s.SpecEvent = command.CommandName
27-
s.SpecType = command.EntityTypeName
32+
spec.SpecEvent = command.CommandName
33+
spec.SpecType = command.EntityTypeName
2834

2935
// Encrypt here
3036
//fmt.Println(command.Request.ProtoReflect())
@@ -41,77 +47,92 @@ func (b *Binding) MultiplexCommandSync(ctx context.Context, s *specv2pb.Spec, co
4147
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("internal error"))
4248
}
4349

44-
s.Data = data
50+
spec.Data = data
4551

46-
specBytes, err := proto.Marshal(s)
52+
specBytes, err := proto.Marshal(spec)
4753
if err != nil {
4854
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
4955
}
5056

5157
subject := GetMultiplexedRequestSubjectName(command.Stream.StreamPrefix(), command.CommandTopic, command.Procedure)
5258

53-
log.Debug("Issuing a multiplex command: " + command.Procedure + ", on channel: " + subject)
59+
fields := receivedFields(spec, subject)
60+
log.Info("Issuing a multiplex command: "+command.Procedure, fields...)
5461

55-
return publish(b.Nats, subject, s, specBytes)
62+
return publish(b.Nats, subject, spec, specBytes)
5663
}
5764

5865
// MultiplexEventSync sends an event to a multiplexed stream and waits for the response or error within the specified timeout.
5966
// Uses Nats Publish and Subscribe Pattern
60-
func (b *Binding) MultiplexEventSync(_ context.Context, s *specv2pb.Spec, event *SpecEvent) (*nats.Msg, error) {
67+
func (b *Binding) MultiplexEventSync(_ context.Context, spec *specv2pb.Spec, event *SpecEvent) (*nats.Msg, error) {
6168
if event == nil {
6269
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
6370
}
6471

72+
if spec == nil {
73+
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a Spec object is required"))
74+
}
75+
6576
log := *zaploggerv1.Bound.Logger
66-
s.SpecEvent = event.EventName
67-
s.SpecType = event.EntityTypeName
77+
spec.SpecEvent = event.EventName
78+
spec.SpecType = event.EntityTypeName
6879

6980
data, err := anypb.New(event.Request)
7081
if err != nil {
7182
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("internal error"))
7283
}
7384

74-
s.Data = data
85+
spec.Data = data
7586

76-
specBytes, err := proto.Marshal(s)
87+
specBytes, err := proto.Marshal(spec)
7788
if err != nil {
7889
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
7990
}
8091

8192
subject := GetMultiplexedRequestSubjectName(event.Stream.StreamPrefix(), event.EventTopic, event.Procedure)
8293

83-
log.Debug("Issuing a multiplex event: " + event.Procedure + ", on channel: " + subject)
94+
fields := receivedFields(spec, subject)
95+
log.Info("Issuing a multiplex event: "+event.Procedure, fields...)
8496

85-
return publish(b.Nats, subject, s, specBytes)
97+
return publish(b.Nats, subject, spec, specBytes)
8698
}
8799

88-
func publish(n *nats.Conn, subject string, s *specv2pb.Spec, specBytes []byte) (*nats.Msg, error) {
100+
func publish(n *nats.Conn, subject string, spec *specv2pb.Spec, specBytes []byte) (*nats.Msg, error) {
101+
log := *zaploggerv1.Bound.Logger
89102
reply, err := n.RequestMsg(&nats.Msg{
90103
Subject: subject,
91104
Data: specBytes,
92105
}, 10*time.Second)
93106
if err != nil {
94107
switch {
95108
case errors.Is(err, nats.ErrTimeout):
96-
return nil, ErrTimeout.WithSpecDetail(s)
109+
return nil, ErrTimeout.WithSpecDetail(spec).WithInternalErrorDetail(err)
97110
case errors.Is(err, nats.ErrNoResponders):
98111
// no responders
99-
return nil, ErrNoResponders.WithSpecDetail(s)
112+
return nil, ErrNoResponders.WithSpecDetail(spec).WithInternalErrorDetail(err)
100113
case errors.Is(err, nats.ErrConnectionClosed):
101114
// NATS connection was closed
102-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
115+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(err)
103116
case errors.Is(err, nats.ErrBadSubscription):
104117
// something wrong with the sub
105-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
118+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(err)
106119
default:
107120
// unknown or generic error
108-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
121+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
109122
}
110123
}
111124

112125
if reply == nil {
113-
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
126+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
127+
}
128+
129+
spec.CompletedAt = timestamppb.Now()
130+
fields := completedFields(spec, subject)
131+
var milliseconds int64
132+
if spec.CompletedAt != nil && spec.ReceivedAt != nil {
133+
milliseconds = spec.CompletedAt.AsTime().Sub(spec.ReceivedAt.AsTime()).Milliseconds()
114134
}
135+
log.Info(fmt.Sprintf("Completed multiplexed request in %d ms\n", milliseconds), fields...)
115136

116137
return reply, err
117138
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package natsnodev1
2+
3+
import (
4+
"strings"
5+
6+
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
7+
"go.uber.org/zap/zapcore"
8+
)
9+
10+
func baseFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
11+
var fields []zapcore.Field
12+
13+
if spec.Context != nil {
14+
if subject != "" {
15+
fields = append(fields, zapcore.Field{Key: "subject", Type: zapcore.StringType, String: subject})
16+
}
17+
18+
if spec.SpanContext != nil && spec.SpanContext.TraceId != "" {
19+
fields = append(fields, zapcore.Field{Key: "trace_id", Type: zapcore.StringType, String: spec.SpanContext.TraceId})
20+
}
21+
22+
if spec.Context.Validation != nil && spec.Context.Validation.ValidateOnly {
23+
fields = append(fields, zapcore.Field{Key: "validate_only", Type: zapcore.BoolType, String: "true"})
24+
}
25+
26+
if spec.Context.OrganizationId != "" {
27+
fields = append(fields, zapcore.Field{Key: "organization_id", Type: zapcore.StringType, String: spec.Context.OrganizationId})
28+
}
29+
30+
if spec.Context.EcosystemId != "" {
31+
fields = append(fields, zapcore.Field{Key: "ecosystem_id", Type: zapcore.StringType, String: spec.Context.EcosystemId})
32+
}
33+
34+
if spec.SpecData != nil && spec.SpecData.FieldMask != nil && len(spec.SpecData.FieldMask.GetPaths()) > 0 {
35+
fields = append(fields, zapcore.Field{Key: "field_mask", Type: zapcore.StringType, String: strings.Join(spec.SpecData.FieldMask.GetPaths(), ",")})
36+
}
37+
}
38+
39+
return fields
40+
}
41+
42+
func receivedFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
43+
fields := baseFields(spec, subject)
44+
45+
if spec != nil {
46+
if spec.ReceivedAt != nil {
47+
fields = append(fields, zapcore.Field{Key: "received_at", Type: zapcore.StringType, String: spec.ReceivedAt.AsTime().String()})
48+
}
49+
}
50+
51+
return fields
52+
}
53+
54+
func completedFields(spec *specv2pb.Spec, subject string) []zapcore.Field {
55+
fields := baseFields(spec, subject)
56+
57+
if spec != nil {
58+
if spec.CompletedAt != nil {
59+
fields = append(fields, zapcore.Field{Key: "completed_at", Type: zapcore.StringType, String: spec.CompletedAt.AsTime().String()})
60+
}
61+
}
62+
63+
return fields
64+
}

libs/plugins/protoc-gen-platform/languages/protobuf/plugins/data_catalog/protobuf_data_catalog.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ func (m *ProtobufDataCatalogModule) Execute(targets map[string]pgs.File, _ map[s
6161
// Idempotent looping, use keys for range NOT targets
6262
versionedKeys := make(map[string][]string, 0)
6363
for k := range targets {
64+
65+
p := targets[k].Descriptor().GetPackage()
66+
s := strings.Split(p, ".")
67+
68+
if len(s) != 3 {
69+
continue
70+
}
71+
6472
version := fns.GetPackageVersion(targets[k])
6573
versionedKeys[version] = append(versionedKeys[version], k)
6674
}

libs/plugins/protoc-gen-platform/languages/protobuf/plugins/data_catalog/templates/message.go.tmpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
{{- $e := entity . -}}{{- $s := service . -}}{{ $entityName := entityName $e }}{{ $entityKeyName := entityKeyName $e }}{{ $apiOptionsType := getApiOptionsTypeName . }}{{ $versionName := getPackageVersionName . }}
1+
{{- $e := entity . -}}{{- $s := service . -}}{{ $entityName := entityName $e }}{{ $apiOptionsType := getApiOptionsTypeName . }}{{ $versionName := getPackageVersionName . }}
22

33
message {{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }} {
44
{{ range $e.Fields }}{{ template "entity_field.go.tmpl" . }}{{ end }}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
{{- $e := entity . -}}{{- $s := service . -}}{{ $entityName := entityName $e }}{{ $entityKeyName := entityKeyName $e }}{{ $versionName := getPackageVersionName . }}
1+
{{- $e := entity . -}}{{- $s := service . -}}{{ $entityName := entityName $e }}{{ $versionName := getPackageVersionName . }}
22
{{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }} {{ $e.Name.LowerSnakeCase }}_{{ getPackageVersion . }} = {{ fieldPosition }};
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
{{ $e := entity . -}}{{- $entityName := entityName $e }}{{- $versionName := getPackageVersionName . }}
2+
import * as {{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }} from './gen/{{ protoPathWithoutProtoExtension .File }}_pb.js'
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
3+
export class AdinoSDK {
4+
private readonly transport: Transport;
5+
6+
constructor(private readonly options: AdinoSDKOptions) {
7+
const interceptor = new SpecInterceptor(options);
8+
const baseUrl = this.options.baseUrl ?? BaseUrl;
9+
this.transport = createConnectTransport({
10+
baseUrl: baseUrl,
11+
defaultTimeoutMs: this.options.timeoutMs,
12+
interceptors: [interceptor.spec],
13+
});
14+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
{{ $e := entity . -}}{{- $entityName := entityName $e }}{{- $versionName := getPackageVersionName . }}
2+
3+
private _{{ $entityName.LowerCamelCase }}{{ $versionName.UpperCamelCase }}Client?: Client<typeof {{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }}.{{ $entityName.UpperCamelCase }}Service>;
4+
private get {{ $entityName.LowerCamelCase }}{{ $versionName.UpperCamelCase }}Client(): Client<typeof {{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }}.{{ $entityName.UpperCamelCase }}Service> {
5+
if (!this._{{ $entityName.LowerCamelCase }}{{ $versionName.UpperCamelCase }}Client) {
6+
this._{{ $entityName.LowerCamelCase }}{{ $versionName.UpperCamelCase }}Client = createClient({{ $entityName.UpperCamelCase }}{{ $versionName.UpperCamelCase }}.{{ $entityName.UpperCamelCase }}Service, this.transport);
7+
}
8+
return this._{{ $entityName.LowerCamelCase }}{{ $versionName.UpperCamelCase }}Client;
9+
}

0 commit comments

Comments
 (0)