Skip to content

Commit f93ae24

Browse files
authored
fix: added fail early if proto transcoding fails (#78)
Co-authored-by: Dimy Jeannot <>
1 parent 20e0464 commit f93ae24

File tree

4 files changed

+168
-156
lines changed

4 files changed

+168
-156
lines changed

go/oeco-sdk/v2beta/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
### 🩹 Fixes
44

5-
- hardening ([460a75a](https://github.com/openecosystems/ecosystem/commit/460a75a))
5+
- hardening ([460a75a](https://github.com/openecosystems/ecosystem/commit/460a75a))
66

77
### ❤️ Thank You
88

9-
- Dimy Jeannot
9+
- Dimy Jeannot
1010

1111
## 0.20.0 (2025-08-18)
1212

go/oeco-sdk/v2beta/connector.go

Lines changed: 0 additions & 153 deletions
Original file line numberDiff line numberDiff line change
@@ -84,159 +84,6 @@ func NewConnector(ctx context.Context, opts ...ConnectorOption) *Connector {
8484
return connector
8585
}
8686

87-
// NewDynamicConnectorWithSchema creates a dynamically configured Connector using the provided schema, bindings, and options.
88-
func NewDynamicConnectorWithSchema(ctx context.Context, service protoreflect.ServiceDescriptor, bounds []Binding, opts ...ConnectorOption) *Connector {
89-
procedureName := "/" + string(service.FullName()) + "/"
90-
methods := make([]*ConnectorMethod, 0, service.Methods().Len())
91-
for j := 0; j < service.Methods().Len(); j++ {
92-
method := service.Methods().Get(j)
93-
// fmt.Printf(" Method Name: %s\n", method.Name())
94-
95-
methodProcedureName := procedureName + string(method.Name())
96-
methods = append(methods, &ConnectorMethod{
97-
ProcedureName: methodProcedureName,
98-
Input: method.Input(),
99-
Output: method.Output(),
100-
Schema: method,
101-
})
102-
}
103-
104-
mbp := make(map[string]*ConnectorMethod)
105-
for _, method := range methods {
106-
mbp[method.ProcedureName] = method
107-
}
108-
109-
// c := Configuration{}
110-
// c.ResolveConfiguration()
111-
112-
bindings := RegisterBindings(ctx, bounds)
113-
114-
options, err := newConnectorOptions(opts)
115-
if err != nil {
116-
fmt.Println(err)
117-
}
118-
119-
connector := &Connector{
120-
Bindings: bindings,
121-
ProcedureName: procedureName,
122-
Name: string(service.FullName()),
123-
Err: nil,
124-
Schema: service,
125-
Methods: methods,
126-
MethodsByPath: mbp,
127-
Opts: opts,
128-
129-
options: options,
130-
}
131-
132-
// TODO create a WithConnectOption option to allow to pass data directly to connect
133-
// connector.Handler = NewDynamicConnectorHandler(connector)
134-
135-
return connector
136-
}
137-
138-
//
139-
//// NewDynamicConnector creates a new instance of Connector based on the given service path, bindings, and optional configurations.
140-
//// It resolves the service schema dynamically and initializes the Connector with methods and bindings information.
141-
//// Returns a Connector, which may include an error if schema resolution fails.
142-
//func NewDynamicConnector(ctx context.Context, servicePath string, bounds []Binding, opts ...ConnectorOption) *Connector {
143-
// serviceName := strings.TrimSuffix(strings.TrimPrefix(servicePath, "/"), "/")
144-
// desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(serviceName))
145-
// if err != nil {
146-
// return &Connector{Err: fmt.Errorf("could not resolve schema for service at path %q: %w", servicePath, err)}
147-
// }
148-
// svcDesc, ok := desc.(protoreflect.ServiceDescriptor)
149-
// if !ok {
150-
// return &Connector{
151-
// Err: fmt.Errorf("could not resolve schema for service at path %q: resolved descriptor is %s, not a service", servicePath, descKind(desc)),
152-
// }
153-
// }
154-
// return NewDynamicConnectorWithSchema(ctx, svcDesc, bounds, opts...)
155-
//}
156-
//
157-
//// func (ImplementedDynamicServiceHandler) DynamicUnary(context.Context, *connect.Request[dynamicpb.Message]) (*connect.Response[dynamicpb.Message], error) {
158-
//
159-
//// DynamicUnary processes a CreateConfigurationRequest and returns a CreateConfigurationResponse or an error.
160-
//func (connector *Connector) DynamicUnary(_ context.Context, req *connect.Request[v2alpha.CreateConfigurationRequest]) (*connect.Response[v2alpha.CreateConfigurationResponse], error) {
161-
// // fmt.Println(req.HTTPMethod())
162-
// // fmt.Println(req.Spec().Schema)
163-
// // fmt.Println(req.Spec().StreamType)
164-
// fmt.Println(req.Spec().Procedure)
165-
// // fmt.Println(req.Spec().IdempotencyLevel)
166-
// // fmt.Println(req.Spec().IsClient)
167-
//
168-
// fmt.Println(req.Msg)
169-
//
170-
// return connect.NewResponse(&v2alpha.CreateConfigurationResponse{
171-
// SpecContext: &specv2pb.SpecResponseContext{
172-
// ResponseValidation: &typev2pb.ResponseValidation{
173-
// ValidateOnly: true,
174-
// },
175-
// OrganizationSlug: "hello",
176-
// WorkspaceSlug: "world",
177-
// WorkspaceJan: 1,
178-
// RoutineId: "123",
179-
// },
180-
// Configuration: &v2alpha.Configuration{
181-
// Id: "123",
182-
// OrganizationSlug: "hello",
183-
// WorkspaceSlug: "world",
184-
// },
185-
// }), nil
186-
// //
187-
// //tracer := *opentelemetryv1.Bound.Tracer
188-
// //log := *zaploggerv1.Bound.Logger
189-
// //
190-
// //// Get it from the GlobalSystem Registry
191-
// //_, err := GlobalSystems.GetSystemByName(req.Spec().Procedure)
192-
// //if err != nil {
193-
// // return nil, err
194-
// //}
195-
// //
196-
// //// Executes top level validation, no business domain validation
197-
// //validationCtx, validationSpan := tracer.Start(ctx, "request-validation", trace.WithSpanKind(trace.SpanKindInternal))
198-
// //v := *protovalidatev0.Bound.Validator
199-
// //if err := v.Validate(req.Msg); err != nil {
200-
// // return nil, ErrServerPreconditionFailed.WithInternalErrorDetail(err)
201-
// //}
202-
// //validationSpan.End()
203-
// //
204-
// //// Spec Propagation
205-
// //specCtx, specSpan := tracer.Start(validationCtx, "spec-propagation", trace.WithSpanKind(trace.SpanKindInternal))
206-
// //spec, ok := ctx.Value("spec").(*specv2pb.Spec)
207-
// //if !ok {
208-
// // return nil, ErrServerInternal.WithInternalErrorDetail(errors.New("cannot propagate spec to context"))
209-
// //}
210-
// //specSpan.End()
211-
// //
212-
// //// Distributed Domain Handler
213-
// //handlerCtx, handlerSpan := tracer.Start(specCtx, "event-generation", trace.WithSpanKind(trace.SpanKindInternal))
214-
// //
215-
// //entity := DynamicSpecEntity{}
216-
// //reply, err2 := natsnodev1.Bound.MultiplexCommandSync(handlerCtx, spec, &natsnodev1.SpecCommand{
217-
// // Request: req.Msg,
218-
// // Stream: natsnodev1.NewInboundStream(),
219-
// // CommandName: "",
220-
// // CommandTopic: EventDataDynamicTopic,
221-
// // EntityTypeName: entity.TypeName(),
222-
// //})
223-
// //if err2 != nil {
224-
// // log.SpecError(err2.SpecError())
225-
// // return nil, connect.NewError(connect.CodeInternal, errors.New("internal error"))
226-
// //}
227-
// //
228-
// //var dd v2alpha.CreateConfigurationResponse
229-
// //err3 := proto.Unmarshal(reply.Data, &dd)
230-
// //if err3 != nil {
231-
// // log.SpecError(err3.SpecError())
232-
// // return nil, connect.NewError(connect.CodeInternal, errors.New("internal error"))
233-
// //}
234-
// //
235-
// //handlerSpan.End()
236-
// //
237-
// //return connect.NewResponse(&dd), nil
238-
//}
239-
24087
// ListenAndProcess initializes the connector's context, manages its lifecycle, and delegates processing tasks with context.
24188
func (connector *Connector) ListenAndProcess() {
24289
ctx := context.Background()
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
package sdkv2betalib
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"google.golang.org/protobuf/reflect/protoreflect"
8+
)
9+
10+
// NewDynamicConnectorWithSchema creates a dynamically configured Connector using the provided schema, bindings, and options.
11+
func NewDynamicConnectorWithSchema(ctx context.Context, service protoreflect.ServiceDescriptor, bounds []Binding, opts ...ConnectorOption) *Connector {
12+
procedureName := "/" + string(service.FullName()) + "/"
13+
methods := make([]*ConnectorMethod, 0, service.Methods().Len())
14+
for j := 0; j < service.Methods().Len(); j++ {
15+
method := service.Methods().Get(j)
16+
// fmt.Printf(" Method Name: %s\n", method.Name())
17+
18+
methodProcedureName := procedureName + string(method.Name())
19+
methods = append(methods, &ConnectorMethod{
20+
ProcedureName: methodProcedureName,
21+
Input: method.Input(),
22+
Output: method.Output(),
23+
Schema: method,
24+
})
25+
}
26+
27+
mbp := make(map[string]*ConnectorMethod)
28+
for _, method := range methods {
29+
mbp[method.ProcedureName] = method
30+
}
31+
32+
// c := Configuration{}
33+
// c.ResolveConfiguration()
34+
35+
bindings := RegisterBindings(ctx, bounds)
36+
37+
options, err := newConnectorOptions(opts)
38+
if err != nil {
39+
fmt.Println(err)
40+
}
41+
42+
connector := &Connector{
43+
Bindings: bindings,
44+
ProcedureName: procedureName,
45+
Name: string(service.FullName()),
46+
Err: nil,
47+
Schema: service,
48+
Methods: methods,
49+
MethodsByPath: mbp,
50+
Opts: opts,
51+
52+
options: options,
53+
}
54+
55+
// TODO create a WithConnectOption option to allow to pass data directly to connect
56+
// connector.Handler = NewDynamicConnectorHandler(connector)
57+
58+
return connector
59+
}
60+
61+
//
62+
//// NewDynamicConnector creates a new instance of Connector based on the given service path, bindings, and optional configurations.
63+
//// It resolves the service schema dynamically and initializes the Connector with methods and bindings information.
64+
//// Returns a Connector, which may include an error if schema resolution fails.
65+
//func NewDynamicConnector(ctx context.Context, servicePath string, bounds []Binding, opts ...ConnectorOption) *Connector {
66+
// serviceName := strings.TrimSuffix(strings.TrimPrefix(servicePath, "/"), "/")
67+
// desc, err := protoregistry.GlobalFiles.FindDescriptorByName(protoreflect.FullName(serviceName))
68+
// if err != nil {
69+
// return &Connector{Err: fmt.Errorf("could not resolve schema for service at path %q: %w", servicePath, err)}
70+
// }
71+
// svcDesc, ok := desc.(protoreflect.ServiceDescriptor)
72+
// if !ok {
73+
// return &Connector{
74+
// Err: fmt.Errorf("could not resolve schema for service at path %q: resolved descriptor is %s, not a service", servicePath, descKind(desc)),
75+
// }
76+
// }
77+
// return NewDynamicConnectorWithSchema(ctx, svcDesc, bounds, opts...)
78+
//}
79+
//
80+
//// func (ImplementedDynamicServiceHandler) DynamicUnary(context.Context, *connect.Request[dynamicpb.Message]) (*connect.Response[dynamicpb.Message], error) {
81+
//
82+
//// DynamicUnary processes a CreateConfigurationRequest and returns a CreateConfigurationResponse or an error.
83+
//func (connector *Connector) DynamicUnary(_ context.Context, req *connect.Request[v2alpha.CreateConfigurationRequest]) (*connect.Response[v2alpha.CreateConfigurationResponse], error) {
84+
// // fmt.Println(req.HTTPMethod())
85+
// // fmt.Println(req.Spec().Schema)
86+
// // fmt.Println(req.Spec().StreamType)
87+
// fmt.Println(req.Spec().Procedure)
88+
// // fmt.Println(req.Spec().IdempotencyLevel)
89+
// // fmt.Println(req.Spec().IsClient)
90+
//
91+
// fmt.Println(req.Msg)
92+
//
93+
// return connect.NewResponse(&v2alpha.CreateConfigurationResponse{
94+
// SpecContext: &specv2pb.SpecResponseContext{
95+
// ResponseValidation: &typev2pb.ResponseValidation{
96+
// ValidateOnly: true,
97+
// },
98+
// OrganizationSlug: "hello",
99+
// WorkspaceSlug: "world",
100+
// WorkspaceJan: 1,
101+
// RoutineId: "123",
102+
// },
103+
// Configuration: &v2alpha.Configuration{
104+
// Id: "123",
105+
// OrganizationSlug: "hello",
106+
// WorkspaceSlug: "world",
107+
// },
108+
// }), nil
109+
// //
110+
// //tracer := *opentelemetryv1.Bound.Tracer
111+
// //log := *zaploggerv1.Bound.Logger
112+
// //
113+
// //// Get it from the GlobalSystem Registry
114+
// //_, err := GlobalSystems.GetSystemByName(req.Spec().Procedure)
115+
// //if err != nil {
116+
// // return nil, err
117+
// //}
118+
// //
119+
// //// Executes top level validation, no business domain validation
120+
// //validationCtx, validationSpan := tracer.Start(ctx, "request-validation", trace.WithSpanKind(trace.SpanKindInternal))
121+
// //v := *protovalidatev0.Bound.Validator
122+
// //if err := v.Validate(req.Msg); err != nil {
123+
// // return nil, ErrServerPreconditionFailed.WithInternalErrorDetail(err)
124+
// //}
125+
// //validationSpan.End()
126+
// //
127+
// //// Spec Propagation
128+
// //specCtx, specSpan := tracer.Start(validationCtx, "spec-propagation", trace.WithSpanKind(trace.SpanKindInternal))
129+
// //spec, ok := ctx.Value("spec").(*specv2pb.Spec)
130+
// //if !ok {
131+
// // return nil, ErrServerInternal.WithInternalErrorDetail(errors.New("cannot propagate spec to context"))
132+
// //}
133+
// //specSpan.End()
134+
// //
135+
// //// Distributed Domain Handler
136+
// //handlerCtx, handlerSpan := tracer.Start(specCtx, "event-generation", trace.WithSpanKind(trace.SpanKindInternal))
137+
// //
138+
// //entity := DynamicSpecEntity{}
139+
// //reply, err2 := natsnodev1.Bound.MultiplexCommandSync(handlerCtx, spec, &natsnodev1.SpecCommand{
140+
// // Request: req.Msg,
141+
// // Stream: natsnodev1.NewInboundStream(),
142+
// // CommandName: "",
143+
// // CommandTopic: EventDataDynamicTopic,
144+
// // EntityTypeName: entity.TypeName(),
145+
// //})
146+
// //if err2 != nil {
147+
// // log.SpecError(err2.SpecError())
148+
// // return nil, connect.NewError(connect.CodeInternal, errors.New("internal error"))
149+
// //}
150+
// //
151+
// //var dd v2alpha.CreateConfigurationResponse
152+
// //err3 := proto.Unmarshal(reply.Data, &dd)
153+
// //if err3 != nil {
154+
// // log.SpecError(err3.SpecError())
155+
// // return nil, connect.NewError(connect.CodeInternal, errors.New("internal error"))
156+
// //}
157+
// //
158+
// //handlerSpan.End()
159+
// //
160+
// //return connect.NewResponse(&dd), nil
161+
//}

go/oeco-sdk/v2beta/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func NewServer(ctx context.Context, opts ...ServerOption) *Server {
8989
publicTranscoder, err := vanguard.NewTranscoder(options.PublicServices)
9090
if err != nil {
9191
fmt.Println(err)
92+
panic("cannot create vanguard transcoder from public services. Failing early")
9293
}
9394

9495
server.PublicConnectHTTPServer = publicHTTPServer
@@ -231,8 +232,11 @@ func (server *Server) listenAndServe(ln *net.Listener) (httpServerErr chan error
231232
server.PublicHTTPServerHandler = publicMux
232233
if server.RawServiceHandler != nil {
233234
publicMux.Handle(server.ServicePath, server.RawServiceHandler)
234-
} else {
235+
} else if server.PublicServiceHandler != nil {
235236
publicMux.Handle("/", server.PublicServiceHandler)
237+
} else {
238+
// No handler registered → fail fast
239+
panic("no public service handler configured (RawServiceHandler or PublicServiceHandler)")
236240
}
237241

238242
meshMux := http.NewServeMux()

0 commit comments

Comments
 (0)