Skip to content

Commit fbd6e1e

Browse files
author
Dimy Jeannot
committed
fix: stability
1 parent 468c7aa commit fbd6e1e

File tree

58 files changed

+1317
-1066
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1317
-1066
lines changed

libs/partner/go/nats/listener.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ type ListenerMessage struct {
4444
Message *jetstream.Msg
4545
NatsMessage *nats.Msg
4646
ListenerConfiguration *ListenerConfiguration
47+
EventResponseChannel string
4748
}
4849

4950
// ListenerErr represents an error encountered by a listener, including the related subscription for context.
@@ -85,12 +86,13 @@ func ListenForMultiplexedRequests(_ context.Context, listener SpecEventListener)
8586
fallthrough
8687
case optionv2pb.CQRSType_CQRS_TYPE_QUERY_CLIENT_STREAM:
8788
fallthrough
88-
case optionv2pb.CQRSType_CQRS_TYPE_QUERY_SERVER_STREAM:
89-
fallthrough
9089
case optionv2pb.CQRSType_CQRS_TYPE_QUERY_BIDI_STREAM:
9190
fallthrough
9291
case optionv2pb.CQRSType_CQRS_TYPE_QUERY_GET:
9392
subject = GetMultiplexedRequestSubjectName(configuration.StreamType.StreamPrefix(), configuration.Entity.EventTopic(), configuration.Procedure)
93+
case optionv2pb.CQRSType_CQRS_TYPE_QUERY_SERVER_STREAM:
94+
// subject = GetMultiplexedRequestSubjectName(configuration.StreamType.StreamPrefix(), configuration.Entity.EventTopic(), configuration.Procedure+".>")
95+
subject = GetMultiplexedRequestSubjectName(configuration.StreamType.StreamPrefix(), configuration.Entity.EventTopic(), configuration.Procedure)
9496
case optionv2pb.CQRSType_CQRS_TYPE_NONE:
9597
fallthrough
9698
case optionv2pb.CQRSType_CQRS_TYPE_UNSPECIFIED:
@@ -255,11 +257,19 @@ func convertNatsToListenerMessage(config *ListenerConfiguration, msg *nats.Msg)
255257

256258
// ctx = interceptor.DecorateContextWithSpec(ctx, *s)
257259

260+
responseSubject := GetStreamResponseSubjectName(
261+
config.StreamType.StreamPrefix(),
262+
config.Topic,
263+
config.Procedure,
264+
s.MessageId,
265+
)
266+
258267
return ctx, ListenerMessage{
259268
Spec: s,
260269
Subscription: nil,
261270
NatsMessage: &m,
262271
ListenerConfiguration: config,
272+
EventResponseChannel: responseSubject,
263273
}, nil
264274
}
265275

libs/partner/go/nats/producer.go

Lines changed: 73 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,17 @@ import (
77
"time"
88

99
"connectrpc.com/connect"
10-
specv2pb "github.com/openecosystems/ecosystem/libs/protobuf/go/protobuf/gen/platform/spec/v2"
11-
12-
zaploggerv1 "github.com/openecosystems/ecosystem/libs/partner/go/zap"
13-
1410
"github.com/nats-io/nats.go"
15-
sdkv2alphalib "github.com/openecosystems/ecosystem/libs/public/go/sdk/v2alpha"
1611
"google.golang.org/protobuf/proto"
1712
"google.golang.org/protobuf/types/known/anypb"
13+
14+
zaploggerv1 "github.com/openecosystems/ecosystem/libs/partner/go/zap"
15+
specv2pb "github.com/openecosystems/ecosystem/libs/protobuf/go/protobuf/gen/platform/spec/v2"
16+
sdkv2alphalib "github.com/openecosystems/ecosystem/libs/public/go/sdk/v2alpha"
1817
)
1918

2019
// MultiplexCommandSync sends a command synchronously by publishing it to a NATS stream and awaiting a reply.
20+
// Uses Nats Publish and Subscribe Pattern
2121
func (b *Binding) MultiplexCommandSync(_ context.Context, s *specv2pb.Spec, command *SpecCommand) (*nats.Msg, error) {
2222
if command == nil {
2323
return nil, sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecCommand object is required"))
@@ -60,7 +60,7 @@ func (b *Binding) MultiplexCommandSync(_ context.Context, s *specv2pb.Spec, comm
6060
reply, err := n.RequestMsg(&nats.Msg{
6161
Subject: subject,
6262
Data: specBytes,
63-
}, 4*time.Second)
63+
}, 10*time.Second)
6464
if err != nil {
6565
fmt.Println(err)
6666
}
@@ -69,6 +69,7 @@ func (b *Binding) MultiplexCommandSync(_ context.Context, s *specv2pb.Spec, comm
6969
}
7070

7171
// MultiplexEventSync sends an event to a multiplexed stream and waits for the response or error within the specified timeout.
72+
// Uses Nats Publish and Subscribe Pattern
7273
func (b *Binding) MultiplexEventSync(_ context.Context, s *specv2pb.Spec, event *SpecEvent) (*nats.Msg, error) {
7374
if event == nil {
7475
return nil, sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
@@ -103,10 +104,75 @@ func (b *Binding) MultiplexEventSync(_ context.Context, s *specv2pb.Spec, event
103104
reply, err := n.RequestMsg(&nats.Msg{
104105
Subject: subject,
105106
Data: specBytes,
106-
}, 4*time.Second)
107+
}, 10*time.Second)
107108
if err != nil {
108109
fmt.Println(err)
109110
}
110111

111112
return reply, err
112113
}
114+
115+
// MultiplexEventStreamSync sends an event to a multiplexed stream and waits for the response or error within the specified timeout.
116+
// Uses Nats Sync Publish for streaming
117+
func MultiplexEventStreamSync[T any](ctx context.Context, s *specv2pb.Spec, event *SpecStreamEvent, nats *nats.Conn, stream *connect.ServerStream[T], convert func(*nats.Msg) (*T, error)) error {
118+
if event == nil {
119+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
120+
}
121+
122+
log := *zaploggerv1.Bound.Logger
123+
124+
s.SpecEvent = event.EventName
125+
s.SpecType = event.EntityTypeName
126+
127+
data, err := anypb.New(event.Request)
128+
if err != nil {
129+
log.Error(err.Error())
130+
return connect.NewError(connect.CodeInternal, errors.New("internal error"))
131+
}
132+
133+
s.Data = data
134+
135+
subject := GetMultiplexedRequestSubjectName(event.Stream.StreamPrefix(), event.EventTopic, event.Procedure)
136+
responseSubject := GetStreamResponseSubjectName(event.Stream.StreamPrefix(), event.EventTopic, event.Procedure, s.MessageId)
137+
138+
specBytes, err := proto.Marshal(s)
139+
if err != nil {
140+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
141+
}
142+
143+
// Encrypt here
144+
145+
log.Debug("Publishing on " + subject)
146+
if err = nats.Publish(subject, specBytes); err != nil {
147+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("failed to publish")).WithInternalErrorDetail(err)
148+
}
149+
150+
// Subscribe to streamed results
151+
log.Debug("Waiting on results from " + responseSubject)
152+
sub, err := nats.SubscribeSync(responseSubject)
153+
if err != nil {
154+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not subscribe stream sync to nats")).WithInternalErrorDetail(err)
155+
}
156+
157+
defer sub.Unsubscribe()
158+
159+
for {
160+
msg, err1 := sub.NextMsgWithContext(ctx)
161+
if err1 != nil {
162+
if errors.Is(err1, context.Canceled) {
163+
return nil
164+
}
165+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not process additional events")).WithInternalErrorDetail(err1)
166+
}
167+
168+
converted, err1 := convert(msg)
169+
if err1 != nil {
170+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not convert event")).WithInternalErrorDetail(err1)
171+
}
172+
173+
err1 = stream.Send(converted)
174+
if err != nil {
175+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not stream event")).WithInternalErrorDetail(err1)
176+
}
177+
}
178+
}

libs/partner/go/nats/registry.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ func GetSubjectName(scope string, subjectName string, procedure string) string {
9595
return scope + "-" + subjectName + "." + procedure
9696
}
9797

98+
// GetStreamResponseSubjectName generates a subject name by combining the provided scope and subject name with a hyphen separator.
99+
func GetStreamResponseSubjectName(scope string, subjectName string, procedure string, messageID string) string {
100+
return scope + "-" + subjectName + "." + procedure + "." + messageID
101+
}
102+
98103
// GetQueueGroupName generates a queue group name by combining the given scope and entityName with a predefined prefix "req.".
99104
func GetQueueGroupName(scope string, entityName string, procedure string) string {
100105
return "req." + scope + "-" + entityName + "." + procedure

libs/partner/go/nats/spec.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ type SpecEvent struct {
5757
EventTopic string
5858
EntityTypeName string
5959
}
60+
type SpecStreamEvent struct {
61+
Request proto.Message
62+
Stream Stream
63+
Procedure string
64+
EventName string
65+
EventTopic string
66+
EntityTypeName string
67+
// EventResponseTopic string
68+
}
6069

6170
// GetListenerGroup generates a unique listener group identifier by combining the type names of the source and sink.
6271
func GetListenerGroup(source Type, sink Type) string {

libs/plugins/protoc-gen-platform/languages/go/plugins/multiplexer/go_multiplexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ func (m GoServerModule) GenerateFile(file pgs.File) {
9090
"parentService": fns.ParentService,
9191
"queries": fns.QueryMethods,
9292
"mutations": fns.MutationMethods,
93+
"streams": fns.StreamingMethods,
9394
"getCqrsType": fns.GetCQRSType,
9495
"goPath": fns.GoPath,
9596
"goPackage": fns.GoPackage,

libs/plugins/protoc-gen-platform/languages/go/plugins/multiplexer/templates/file.go.tmpl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
// Code generated by protoc-gen-platform {{ pluginName }}. DO NOT EDIT.
22
// source: {{ .InputPath }}
3-
{{ $s := service . }}{{ $q := queries . }}{{ $m := mutations . }}{{ $goPath := goPath . }}{{ $e := entity . }}{{ $imports := getAllGoFieldLevelImportPaths .File }}{{ $system := domainSystemName2 .File }}{{ $apiType := getApiOptionsTypeName .File }}{{ $version := getPackageVersion .File }}{{ $topLevel := getTopLevelFolderFromFile .File }}
3+
{{ $s := service . }}{{ $q := queries . }}{{ $m := mutations . }}{{ $sm := streams . }}{{ $goPath := goPath . }}{{ $e := entity . }}{{ $imports := getAllGoFieldLevelImportPaths .File }}{{ $system := domainSystemName2 .File }}{{ $apiType := getApiOptionsTypeName .File }}{{ $version := getPackageVersion .File }}{{ $topLevel := getTopLevelFolderFromFile .File }}
44

55
package {{ package . }}
66
import (
77
"errors"
88
"connectrpc.com/connect"
9+
{{ if $sm }}"github.com/nats-io/nats.go"{{end}}
910
"github.com/nats-io/nats.go/jetstream"
1011
"github.com/openecosystems/ecosystem/libs/partner/go/zap"
1112
"github.com/openecosystems/ecosystem/libs/partner/go/nats"
Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,70 @@
1-
{{ $s := parentService . }}{{ $goPath := goPath .File }}{{ $e := parentEntity . }}func (s *{{ $s.Name }}Handler) {{ .Name }}(req *{{ package .File }}.{{ .Input.Name }}, stream {{ package .File }}.{{ $s.Name }}_{{ .Name }}Server) error {
1+
{{ $s := parentService . }}{{ $goPath := goPath .File }}{{ $e := parentEntity . }}{{ $entityName := entityName $e }}{{ $system := domainSystemName2 .File }}
2+
func (s *{{ $s.Name }}Handler) Get{{ .Name }}Configuration() *natsnodev1.ListenerConfiguration {
23

3-
return nil
4+
return &natsnodev1.ListenerConfiguration{
5+
Entity: &{{ $entityName.UpperCamelCase }}SpecEntity{},
6+
Procedure: "{{ .Name }}",
7+
CQRS: optionv2pb.CQRSType_{{ getCQRSTypeEnumName .}},
8+
Topic: EventData{{ $entityName.UpperCamelCase }}Topic,
9+
StreamType: natsnodev1.NewInboundStream(),
10+
JetstreamConfiguration: &jetstream.ConsumerConfig{
11+
Durable: "{{ $system.LowerCamelCase }}-{{ $entityName.LowerCamelCase }}-{{ .Name.LowerCamelCase }}",
12+
AckPolicy: jetstream.AckExplicitPolicy,
13+
MemoryStorage: false,
14+
},
15+
}
416
}
17+
18+
func (s *{{ $s.Name }}Handler) {{ .Name }}(ctx context.Context, req *connect.Request[{{ .Input.Name }}], stream *connect.ServerStream[{{ .Output.Name }}]) error {
19+
20+
tracer := *opentelemetryv1.Bound.Tracer
21+
log := *zaploggerv1.Bound.Logger
22+
n := natsnodev1.Bound.Nats
23+
24+
// Executes top level validation, no business domain validation
25+
validationCtx, validationSpan := tracer.Start(ctx, "request-validation", trace.WithSpanKind(trace.SpanKindInternal))
26+
v := *protovalidatev0.Bound.Validator
27+
if err := v.Validate(req.Msg); err != nil {
28+
return sdkv2alphalib.ErrServerPreconditionFailed.WithInternalErrorDetail(err)
29+
}
30+
validationSpan.End()
31+
32+
// Spec Propagation
33+
specCtx, specSpan := tracer.Start(validationCtx, "spec-propagation", trace.WithSpanKind(trace.SpanKindInternal))
34+
spec, ok := ctx.Value(sdkv2alphalib.SpecContextKey).(*specv2pb.Spec)
35+
if !ok {
36+
return sdkv2alphalib.ErrServerInternal.WithInternalErrorDetail(errors.New("Cannot propagate spec to context"))
37+
}
38+
specSpan.End()
39+
40+
// Validate field mask
41+
if spec.SpecData.FieldMask != nil && len(spec.SpecData.FieldMask.Paths) > 0 {
42+
spec.SpecData.FieldMask.Normalize()
43+
if !spec.SpecData.FieldMask.IsValid(&{{ .Output.Name }}{}) {
44+
log.Error("Invalid field mask")
45+
return sdkv2alphalib.ErrServerPreconditionFailed.WithInternalErrorDetail(errors.New("Invalid field mask"))
46+
}
47+
}
48+
49+
// Distributed Domain Handler
50+
handlerCtx, handlerSpan := tracer.Start(specCtx, "event-generation", trace.WithSpanKind(trace.SpanKindInternal))
51+
defer handlerSpan.End()
52+
53+
config := s.Get{{ .Name }}Configuration()
54+
return natsnodev1.MultiplexEventStreamSync[{{ .Output.Name }}](handlerCtx, spec, &natsnodev1.SpecStreamEvent{
55+
Request: req.Msg,
56+
Stream: config.StreamType,
57+
Procedure: config.Procedure,
58+
EventName: "",
59+
EventTopic: config.Topic,
60+
EntityTypeName: config.Entity.TypeName(),
61+
}, n, stream, func(m *nats.Msg) (*{{ .Output.Name }}, error) {
62+
63+
var resp {{ .Output.Name }}
64+
if err := proto.Unmarshal(m.Data, &resp); err != nil {
65+
return nil, sdkv2alphalib.ErrServerPreconditionFailed.WithInternalErrorDetail(errors.New("failed to unmarshal event to {{ .Output.Name }}:")).WithInternalErrorDetail(err)
66+
}
67+
68+
return &resp, nil
69+
})
70+
}

libs/plugins/protoc-gen-platform/shared/functions_cqrs.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,25 @@ func (fns Functions) MutationMethods(file pgs.File) []pgs.Method {
9191
return mutationMethods
9292
}
9393

94+
// StreamingMethods extracts and returns methods that are classified as "mutation" based on their CQRS type from the provided file.
95+
func (fns Functions) StreamingMethods(file pgs.File) []pgs.Method {
96+
var methods []pgs.Method
97+
for _, service := range file.Services() {
98+
methods = service.Methods()
99+
break
100+
}
101+
102+
var mutationMethods []pgs.Method
103+
104+
for _, method := range methods {
105+
if strings.Contains(fns.IsCQRSType(method), "stream") {
106+
mutationMethods = append(mutationMethods, method)
107+
}
108+
}
109+
110+
return mutationMethods
111+
}
112+
94113
// IsMethodMutation determines if the given method is a mutation based on its CQRS type prefix.
95114
func (fns Functions) IsMethodMutation(method pgs.Method) bool {
96115
if strings.HasPrefix(fns.IsCQRSType(method), "mutation") {

libs/poc/go/model/project.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
"options": {
1313
"command": "buf generate",
1414
"cwd": "libs/poc/go/model"
15-
}
15+
},
16+
"cache": false
1617
},
1718
"build": {
1819
"executor": "nx:run-commands",

libs/poc/go/sdk/project.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
"options": {
1111
"command": "buf generate",
1212
"cwd": "libs/poc/go/sdk"
13-
}
13+
},
14+
"cache": false
1415
}
1516
},
1617
"tags": [

0 commit comments

Comments
 (0)