Skip to content

Commit 259921d

Browse files
authored
Fix: meaningful errors during subscriptions (#235)
* fixed error propagation --------- Signed-off-by: Artem Shcherbatiuk <[email protected]>
1 parent bfb4de4 commit 259921d

File tree

3 files changed

+34
-34
lines changed

3 files changed

+34
-34
lines changed

gateway/resolver/resolver.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ import (
99
"sort"
1010
"strings"
1111

12-
"gopkg.in/yaml.v3"
13-
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
14-
1512
"github.com/graphql-go/graphql"
1613
"go.opentelemetry.io/otel"
1714
"go.opentelemetry.io/otel/attribute"
1815
"go.opentelemetry.io/otel/trace"
16+
"gopkg.in/yaml.v3"
17+
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1918
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2019
"k8s.io/apimachinery/pkg/labels"
2120
"k8s.io/apimachinery/pkg/runtime/schema"

gateway/resolver/subscription.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88

99
"github.com/openmfp/golang-commons/sentry"
10+
"github.com/pkg/errors"
1011

1112
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
1213

@@ -58,14 +59,14 @@ func (r *Service) runWatch(
5859
labelSelector, err := getStringArg(p.Args, LabelSelectorArg, false)
5960
if err != nil {
6061
r.log.Error().Err(err).Msg("Failed to get label selector argument")
61-
resultChannel <- errorResult("Failed to get label selector: " + err.Error())
62+
resultChannel <- errors.Wrap(err, "failed to get label selector argument")
6263
return
6364
}
6465

6566
subscribeToAll, err := getBoolArg(p.Args, SubscribeToAllArg, false)
6667
if err != nil {
6768
r.log.Error().Err(err).Msg("Failed to get subscribeToAll argument")
68-
resultChannel <- errorResult("Failed to get subscribeToAll: " + err.Error())
69+
resultChannel <- errors.Wrap(err, "failed to get subscribeToAll argument")
6970
return
7071
}
7172

@@ -84,7 +85,7 @@ func (r *Service) runWatch(
8485
namespace, err = getStringArg(p.Args, NamespaceArg, isNamespaceRequired)
8586
if err != nil {
8687
r.log.Error().Err(err).Msg("Failed to get namespace argument")
87-
resultChannel <- errorResult("Failed to get namespace: " + err.Error())
88+
resultChannel <- errors.Wrap(err, "failed to get namespace argument")
8889
return
8990
}
9091
if namespace != "" {
@@ -96,7 +97,7 @@ func (r *Service) runWatch(
9697
selector, err := labels.Parse(labelSelector)
9798
if err != nil {
9899
r.log.Error().Err(err).Str("labelSelector", labelSelector).Msg("Invalid label selector")
99-
resultChannel <- errorResult("Invalid label selector: " + err.Error())
100+
resultChannel <- errors.Wrap(err, "invalid label selector")
100101
return
101102
}
102103
opts = append(opts, client.MatchingLabelsSelector{Selector: selector})
@@ -107,7 +108,7 @@ func (r *Service) runWatch(
107108
name, err = getStringArg(p.Args, NameArg, true)
108109
if err != nil {
109110
r.log.Error().Err(err).Msg("Failed to get name argument")
110-
resultChannel <- errorResult("Failed to get name: " + err.Error())
111+
resultChannel <- errors.Wrap(err, "failed to get name argument")
111112
return
112113
}
113114
opts = append(opts, client.MatchingFields{"metadata.name": name})
@@ -116,7 +117,7 @@ func (r *Service) runWatch(
116117
sortBy, err := getStringArg(p.Args, SortByArg, false)
117118
if err != nil {
118119
r.log.Error().Err(err).Msg("Failed to get sortBy argument")
119-
resultChannel <- errorResult("Failed to get sortBy: " + err.Error())
120+
resultChannel <- errors.Wrap(err, "failed to get sortBy argument")
120121
return
121122
}
122123

@@ -134,8 +135,7 @@ func (r *Service) runWatch(
134135

135136
sentry.CaptureError(err, sentry.Tags{"namespace": namespace}, sentry.Extras{"gvk": gvk.String()})
136137

137-
resultChannel <- errorResult("Failed to start watch: " + err.Error())
138-
138+
resultChannel <- errors.Wrap(err, "failed to start watch")
139139
return
140140
}
141141
defer watcher.Stop()
@@ -154,7 +154,8 @@ func (r *Service) runWatch(
154154

155155
sentry.CaptureError(err, sentry.Tags{"namespace": namespace})
156156

157-
continue
157+
resultChannel <- errors.Wrap(err, "failed to cast event object to unstructured")
158+
return
158159
}
159160
key := obj.GetNamespace() + "/" + obj.GetName()
160161

@@ -175,7 +176,7 @@ func (r *Service) runWatch(
175176

176177
sentry.CaptureError(err, sentry.Tags{"namespace": namespace})
177178

178-
resultChannel <- errorResult("Failed to determine field changes: " + err.Error())
179+
resultChannel <- errors.Wrap(err, "failed to determine field changed")
179180
return
180181
}
181182
sendUpdate = changed
@@ -192,15 +193,16 @@ func (r *Service) runWatch(
192193
if name != "" {
193194
singleObj = previousObjects[namespace+"/"+name]
194195
}
196+
197+
var data interface{}
198+
if singleObj != nil { // object can be nil in case it is deleted
199+
data = singleObj.Object
200+
}
201+
195202
select {
196203
case <-ctx.Done():
197204
return
198-
case resultChannel <- func() interface{} {
199-
if singleObj == nil { // object will be nil in case it is deleted
200-
return nil
201-
}
202-
return singleObj.Object
203-
}():
205+
case resultChannel <- data:
204206
}
205207
} else {
206208
items := make([]unstructured.Unstructured, 0, len(previousObjects))
@@ -211,7 +213,7 @@ func (r *Service) runWatch(
211213
err = validateSortBy(items, sortBy)
212214
if err != nil {
213215
r.log.Error().Err(err).Str(SortByArg, sortBy).Msg("Invalid sortBy field path")
214-
resultChannel <- errorResult("Invalid sortBy field path: " + err.Error())
216+
resultChannel <- errors.Wrap(err, "invalid sortBy field path")
215217
return
216218
}
217219

@@ -336,8 +338,14 @@ func getFieldValue(obj *unstructured.Unstructured, fieldPath string) (interface{
336338
return current, true, nil
337339
}
338340

339-
func errorResult(msg string) map[string]interface{} {
340-
return map[string]interface{}{
341-
"error": msg,
341+
func CreateSubscriptionResolver(isSingle bool) graphql.FieldResolveFn {
342+
return func(p graphql.ResolveParams) (interface{}, error) {
343+
source := p.Source
344+
345+
if err, ok := source.(error); ok {
346+
return nil, err
347+
}
348+
349+
return source, nil
342350
}
343351
}

gateway/schema/schema.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -252,22 +252,22 @@ func (g *Gateway) processSingleResource(
252252

253253
subscriptionSingular := strings.ToLower(fmt.Sprintf("%s_%s", gvk.Group, singular))
254254
rootSubscriptionFields[subscriptionSingular] = &graphql.Field{
255-
Type: addErrorFieldToGraphqlObject(resourceType),
255+
Type: resourceType,
256256
Args: itemArgsBuilder.
257257
WithSubscribeToAll().
258258
Complete(),
259-
Resolve: g.resolver.CommonResolver(),
259+
Resolve: resolver.CreateSubscriptionResolver(true),
260260
Subscribe: g.resolver.SubscribeItem(*gvk, resourceScope),
261261
Description: fmt.Sprintf("Subscribe to changes of %s", singular),
262262
}
263263

264264
subscriptionPlural := strings.ToLower(fmt.Sprintf("%s_%s", gvk.Group, plural))
265265
rootSubscriptionFields[subscriptionPlural] = &graphql.Field{
266-
Type: graphql.NewList(addErrorFieldToGraphqlObject(resourceType)),
266+
Type: graphql.NewList(resourceType),
267267
Args: listArgsBuilder.
268268
WithSubscribeToAll().
269269
Complete(),
270-
Resolve: g.resolver.CommonResolver(),
270+
Resolve: resolver.CreateSubscriptionResolver(false),
271271
Subscribe: g.resolver.SubscribeItems(*gvk, resourceScope),
272272
Description: fmt.Sprintf("Subscribe to changes of %s", plural),
273273
}
@@ -570,10 +570,3 @@ func sanitizeFieldName(name string) string {
570570

571571
return name
572572
}
573-
574-
func addErrorFieldToGraphqlObject(obj *graphql.Object) *graphql.Object {
575-
obj.AddFieldConfig("error", &graphql.Field{
576-
Type: graphql.String,
577-
})
578-
return obj
579-
}

0 commit comments

Comments
 (0)