Skip to content

Commit 422ecc1

Browse files
authored
fix: adding a new WithSpecDetail to SpecError for better user experience (#72)
Co-authored-by: Dimy Jeannot <>
1 parent 4b25744 commit 422ecc1

File tree

5 files changed

+103
-31
lines changed

5 files changed

+103
-31
lines changed

go/oeco-sdk/v2beta/bindings/nats/listener.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,15 +178,15 @@ func RespondToMultiplexedRequest(_ context.Context, request *ListenerMessage) {
178178
case optionv2pb.CQRSType_CQRS_TYPE_UNSPECIFIED:
179179
fallthrough
180180
default:
181-
request.Spec.SpecError = sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("Cannot respond to multiplexed requests, as CQRS type is invalid. This should have been caught at startup. Bad.")).ToStatus()
181+
request.Spec.SpecError = sdkv2betalib.ErrServerInternal.WithSpecDetail(request.Spec).WithInternalErrorDetail(errors.New("Cannot respond to multiplexed requests, as CQRS type is invalid. This should have been caught at startup. Bad.")).ToStatus()
182182
respond(&nm, request.Spec)
183183
return
184184
}
185185

186186
go func() {
187187
_, err = js.Publish(context.Background(), subject, specBytes)
188188
if err != nil {
189-
request.Spec.SpecError = sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("Found error when publishing"), err).ToStatus()
189+
request.Spec.SpecError = sdkv2betalib.ErrServerInternal.WithSpecDetail(request.Spec).WithInternalErrorDetail(errors.New("Found error when publishing"), err).ToStatus()
190190
respond(&nm, request.Spec)
191191
return
192192
}
@@ -203,24 +203,24 @@ func respond(msg *nats.Msg, spec *specv2pb.Spec) {
203203
}
204204

205205
if spec == nil {
206-
e := sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("the spec is nil: ")).ToStatus()
206+
e := sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("the spec is nil: ")).ToStatus()
207207
spec = &specv2pb.Spec{
208208
SpecError: e,
209209
}
210210
}
211211

212212
marshal, err := protopb.Marshal(spec)
213213
if err != nil {
214-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())
214+
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("cannot marshal spec: "), err).Error())
215215
err = msg.Respond(nil)
216216
if err != nil {
217-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
217+
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
218218
}
219219
}
220220

221221
err = msg.Respond(marshal)
222222
if err != nil {
223-
apexlog.Error(sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
223+
apexlog.Error(sdkv2betalib.ErrServerInternal.WithSpecDetail(spec).WithInternalErrorDetail(errors.New("error responding to NATS: "), err).Error())
224224
}
225225
}
226226

go/oeco-sdk/v2beta/bindings/nats/producer.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func (b *Binding) MultiplexCommandSync(ctx context.Context, s *specv2pb.Spec, co
5252

5353
log.Debug("Issuing a multiplex command: " + command.Procedure + ", on channel: " + subject)
5454

55-
return publish(b.Nats, subject, specBytes)
55+
return publish(b.Nats, subject, s, specBytes)
5656
}
5757

5858
// MultiplexEventSync sends an event to a multiplexed stream and waits for the response or error within the specified timeout.
@@ -82,35 +82,35 @@ func (b *Binding) MultiplexEventSync(_ context.Context, s *specv2pb.Spec, event
8282

8383
log.Debug("Issuing a multiplex event: " + event.Procedure + ", on channel: " + subject)
8484

85-
return publish(b.Nats, subject, specBytes)
85+
return publish(b.Nats, subject, s, specBytes)
8686
}
8787

88-
func publish(n *nats.Conn, subject string, specBytes []byte) (*nats.Msg, error) {
88+
func publish(n *nats.Conn, subject string, s *specv2pb.Spec, specBytes []byte) (*nats.Msg, error) {
8989
reply, err := n.RequestMsg(&nats.Msg{
9090
Subject: subject,
9191
Data: specBytes,
9292
}, 10*time.Second)
9393
if err != nil {
9494
switch {
9595
case errors.Is(err, nats.ErrTimeout):
96-
return nil, ErrTimeout
96+
return nil, ErrTimeout.WithSpecDetail(s)
9797
case errors.Is(err, nats.ErrNoResponders):
9898
// no responders
99-
return nil, ErrNoResponders
99+
return nil, ErrNoResponders.WithSpecDetail(s)
100100
case errors.Is(err, nats.ErrConnectionClosed):
101101
// NATS connection was closed
102-
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(err)
102+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
103103
case errors.Is(err, nats.ErrBadSubscription):
104104
// something wrong with the sub
105-
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(err)
105+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(err)
106106
default:
107107
// unknown or generic error
108-
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
108+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("unhandled NATS error"), err)
109109
}
110110
}
111111

112112
if reply == nil {
113-
return nil, sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
113+
return nil, sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("received nil reply from NATS responder"))
114114
}
115115

116116
return reply, err
@@ -120,7 +120,7 @@ func publish(n *nats.Conn, subject string, specBytes []byte) (*nats.Msg, error)
120120
// Uses Nats Sync Publish for streaming
121121
func MultiplexEventStreamSync[T any](ctx context.Context, s *specv2pb.Spec, event *SpecStreamEvent, nats *nats.Conn, stream *connect.ServerStream[T], convert func(*nats.Msg) (*T, error)) error {
122122
if event == nil {
123-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
123+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("a SpecEvent object is required"))
124124
}
125125

126126
log := *zaploggerv1.Bound.Logger
@@ -141,21 +141,21 @@ func MultiplexEventStreamSync[T any](ctx context.Context, s *specv2pb.Spec, even
141141

142142
specBytes, err := proto.Marshal(s)
143143
if err != nil {
144-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not marshall spec"))
144+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("could not marshall spec"))
145145
}
146146

147147
// Encrypt here
148148

149149
log.Debug("Publishing on " + subject)
150150
if err = nats.Publish(subject, specBytes); err != nil {
151-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("failed to publish")).WithInternalErrorDetail(err)
151+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("failed to publish")).WithInternalErrorDetail(err)
152152
}
153153

154154
// Subscribe to streamed results
155155
log.Debug("Waiting on results from " + responseSubject)
156156
sub, err := nats.SubscribeSync(responseSubject)
157157
if err != nil {
158-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not subscribe stream sync to nats")).WithInternalErrorDetail(err)
158+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("could not subscribe stream sync to nats")).WithInternalErrorDetail(err)
159159
}
160160

161161
defer sub.Unsubscribe()
@@ -166,17 +166,17 @@ func MultiplexEventStreamSync[T any](ctx context.Context, s *specv2pb.Spec, even
166166
if errors.Is(err1, context.Canceled) {
167167
return nil
168168
}
169-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not process additional events")).WithInternalErrorDetail(err1)
169+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("could not process additional events")).WithInternalErrorDetail(err1)
170170
}
171171

172172
converted, err1 := convert(msg)
173173
if err1 != nil {
174-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not convert event")).WithInternalErrorDetail(err1)
174+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("could not convert event")).WithInternalErrorDetail(err1)
175175
}
176176

177177
err1 = stream.Send(converted)
178178
if err != nil {
179-
return sdkv2betalib.ErrServerInternal.WithInternalErrorDetail(errors.New("could not stream event")).WithInternalErrorDetail(err1)
179+
return sdkv2betalib.ErrServerInternal.WithSpecDetail(s).WithInternalErrorDetail(errors.New("could not stream event")).WithInternalErrorDetail(err1)
180180
}
181181
}
182182
}

go/oeco-sdk/v2beta/error.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,16 @@ import (
66
"errors"
77
"fmt"
88
"strings"
9+
"time"
910

1011
"connectrpc.com/connect"
1112
apexlog "github.com/apex/log"
1213
"google.golang.org/genproto/googleapis/rpc/errdetails"
1314

15+
specv2pb "github.com/openecosystems/ecosystem/go/oeco-sdk/v2beta/gen/platform/spec/v2"
1416
"google.golang.org/genproto/googleapis/rpc/status"
1517
"google.golang.org/protobuf/types/known/anypb"
18+
"google.golang.org/protobuf/types/known/timestamppb"
1619
)
1720

1821
// Using guidance from: https://google.aip.dev/193
@@ -32,6 +35,7 @@ type (
3235
WithPreconditionFailure(failure *errdetails.PreconditionFailure) SpecError
3336
WithBadRequest(request *errdetails.BadRequest) SpecError
3437
WithHelp(help *errdetails.Help) SpecError
38+
WithSpecDetail(spec *specv2pb.Spec) SpecError
3539
WithLocalizedMessage(message *errdetails.LocalizedMessage) SpecError
3640
WithInternalErrorDetail(errs ...error) SpecError
3741
// WithDebugDetail(ctx context.Context, spec *specv2pb.Spec, errs ...error) SpecError
@@ -189,6 +193,57 @@ func (se SpecError) WithLocalizedMessage(message *errdetails.LocalizedMessage) S
189193
return se
190194
}
191195

196+
func (se SpecError) WithSpecDetail(spec *specv2pb.Spec) SpecError {
197+
if spec == nil {
198+
return se
199+
}
200+
201+
s := specv2pb.SpecPublic{}
202+
203+
if spec.SpecVersion != "" {
204+
s.SpecVersion = spec.SpecVersion
205+
}
206+
207+
if spec.MessageId != "" {
208+
s.MessageId = spec.MessageId
209+
}
210+
211+
if spec.SentAt.AsTime().IsZero() || spec.SentAt.AsTime().Equal(time.Unix(0, 0).UTC()) {
212+
s.SentAt = spec.ReceivedAt
213+
} else {
214+
s.SentAt = spec.SentAt
215+
}
216+
217+
if spec.ReceivedAt != nil {
218+
s.ReceivedAt = spec.ReceivedAt
219+
}
220+
221+
if spec.CompletedAt.AsTime().IsZero() || spec.CompletedAt.AsTime().Equal(time.Unix(0, 0).UTC()) {
222+
s.CompletedAt = timestamppb.Now()
223+
} else {
224+
s.CompletedAt = spec.CompletedAt
225+
}
226+
227+
if spec.SpecType != "" {
228+
s.SpecType = spec.SpecType
229+
}
230+
231+
s.SpecEventType = spec.SpecEventType
232+
233+
if spec.SpecEvent != "" {
234+
s.SpecEvent = spec.SpecEvent
235+
}
236+
237+
d, err := connect.NewErrorDetail(&s)
238+
if err != nil {
239+
apexlog.Error("server: SpecError creating SpecPublic error detail")
240+
return se
241+
}
242+
243+
se.ConnectErr.AddDetail(d)
244+
return se
245+
}
246+
192247
// WithInternalErrorDetail sets internal error details for the SpecError instance and returns the updated SpecError object.
193248
func (se SpecError) WithInternalErrorDetail(errs ...error) SpecError {
194249
var errStrings []string

go/oeco-sdk/v2beta/server.go

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ type Server struct {
3939
ConfigurationProvider *BaseSpecConfigurationProvider
4040
NetListener *net.Listener
4141

42-
options *serverOptions
42+
options *serverOptions
43+
publicHTTPServer *http.Server
44+
meshHTTPServer *http.Server
4345
// err error
4446
}
4547

@@ -158,17 +160,18 @@ func (server *Server) ListenAndServeWithCtx(_ context.Context) {
158160
/*
159161
* Graceful Shutdown Management
160162
*/
161-
signal.Notify(serverQuit, syscall.SIGTERM)
162-
signal.Notify(serverQuit, os.Interrupt)
163+
signal.Notify(serverQuit, syscall.SIGTERM, os.Interrupt)
163164
select {
164165
case err := <-specListenableErr:
165166
if err.Error != nil {
166167
fmt.Println(ErrServerInternal.WithInternalErrorDetail(err.Error, errors.New("received a specListenableErr")).Error())
167168
}
168169
case err := <-httpServerErr:
169170
fmt.Println(ErrServerInternal.WithInternalErrorDetail(err, errors.New("received an httpServerError")).Error())
170-
case <-serverQuit:
171+
case sig := <-serverQuit:
172+
fmt.Println("Received signal:", sig)
171173
server.Shutdown()
174+
os.Exit(0)
172175
}
173176
}
174177

@@ -238,6 +241,7 @@ func (server *Server) listenAndServe(ln *net.Listener) (httpServerErr chan error
238241
WriteTimeout: 10 * time.Second, // Time allowed to write the response
239242
IdleTimeout: 15 * time.Second, // Time for keep-alive connections
240243
}
244+
server.publicHTTPServer = publicHTTPServer
241245

242246
meshHTTPServer := &http.Server{
243247
Addr: meshEndpoint,
@@ -247,12 +251,13 @@ func (server *Server) listenAndServe(ln *net.Listener) (httpServerErr chan error
247251
WriteTimeout: 10 * time.Second, // Time allowed to write the response
248252
IdleTimeout: 15 * time.Second, // Time for keep-alive connections
249253
}
254+
server.meshHTTPServer = meshHTTPServer
250255

251256
_httpServerErr := make(chan error)
252257

253258
if server.PublicConnectHTTPServer != nil {
254259
go func() {
255-
_httpServerErr <- publicHTTPServer.ListenAndServe()
260+
_httpServerErr <- server.publicHTTPServer.ListenAndServe()
256261
}()
257262
fmt.Println("Public HTTP1.1/HTTP2.0/gRPC/gRPC-Web/Connect listening on " + settings.Platform.Endpoint)
258263
}
@@ -261,9 +266,9 @@ func (server *Server) listenAndServe(ln *net.Listener) (httpServerErr chan error
261266
if settings.Platform.Mesh.Enabled {
262267
go func() {
263268
if ln != nil {
264-
_httpServerErr <- meshHTTPServer.Serve(*ln)
269+
_httpServerErr <- server.meshHTTPServer.Serve(*ln)
265270
} else {
266-
_httpServerErr <- meshHTTPServer.ListenAndServe()
271+
_httpServerErr <- server.meshHTTPServer.ListenAndServe()
267272
}
268273
}()
269274
fmt.Println("Mesh HTTP1.1/HTTP2.0/gRPC/gRPC-Web/Connect listening on " + settings.Platform.Mesh.Endpoint)
@@ -292,9 +297,21 @@ func (server *Server) Shutdown() {
292297
fmt.Printf("Stopping server gracefully. Draining connections for up to %v seconds", 30)
293298
fmt.Println()
294299

295-
_, cancel := context.WithTimeout(context.Background(), 30)
300+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
296301
defer cancel()
297302

303+
if server.PublicConnectHTTPServer != nil {
304+
if err := server.publicHTTPServer.Shutdown(ctx); err != nil {
305+
fmt.Println("Public server shutdown error:", err)
306+
}
307+
}
308+
309+
if server.MeshConnectHTTPServer != nil {
310+
if err := server.meshHTTPServer.Shutdown(ctx); err != nil {
311+
fmt.Println("Mesh server shutdown error:", err)
312+
}
313+
}
314+
298315
ShutdownBindings(server.Bindings)
299316
}
300317

proto/protobuf/platform/spec/v2/spec.proto

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ message SpecPublic {
152152
string spec_type = 6;
153153

154154
//
155-
SpecEventType spec_event_type = 7;
155+
// SpecEventType spec_event_type = 7;
156156

157157
//
158158
string spec_event = 8;

0 commit comments

Comments
 (0)