@@ -10,7 +10,6 @@ import (
1010 "time"
1111
1212 ce "github.com/cloudevents/sdk-go/v2"
13- cetypes "github.com/cloudevents/sdk-go/v2/types"
1413 "google.golang.org/grpc"
1514 "google.golang.org/grpc/credentials"
1615 "google.golang.org/grpc/keepalive"
@@ -24,11 +23,14 @@ import (
2423 servergrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
2524
2625 "github.com/openshift-online/maestro/pkg/api"
26+ "github.com/openshift-online/maestro/pkg/client/cloudevents"
2727 "github.com/openshift-online/maestro/pkg/dao"
2828 "github.com/openshift-online/maestro/pkg/event"
2929 "github.com/openshift-online/maestro/pkg/services"
3030)
3131
32+ const source = "maestro"
33+
3234var _ EventServer = & GRPCBroker {}
3335
3436type GRPCBrokerService struct {
@@ -290,63 +292,18 @@ func (s *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool,
290292
291293// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
292294func decodeResourceStatus (evt * ce.Event ) (* api.Resource , error ) {
293- evtExtensions := evt .Context .GetExtensions ()
294-
295- clusterName , err := cetypes .ToString (evtExtensions [types .ExtensionClusterName ])
296- if err != nil {
297- return nil , fmt .Errorf ("failed to get clustername extension: %v" , err )
298- }
299-
300- resourceID , err := cetypes .ToString (evtExtensions [types .ExtensionResourceID ])
301- if err != nil {
302- return nil , fmt .Errorf ("failed to get resourceid extension: %v" , err )
303- }
304-
305- resourceVersion , err := cetypes .ToInteger (evtExtensions [types .ExtensionResourceVersion ])
306- if err != nil {
307- return nil , fmt .Errorf ("failed to get resourceversion extension: %v" , err )
308- }
309-
310- status , err := api .CloudEventToJSONMap (evt )
311- if err != nil {
312- return nil , fmt .Errorf ("failed to convert cloudevent to resource status: %v" , err )
313- }
314-
315- resource := & api.Resource {
316- Source : evt .Source (),
317- ConsumerName : clusterName ,
318- Version : resourceVersion ,
319- Meta : api.Meta {
320- ID : resourceID ,
321- },
322- Status : status ,
323- }
324-
325- return resource , nil
295+ codec := cloudevents .NewCodec (source )
296+ return codec .Decode (evt )
326297}
327298
328299// encodeResourceSpec translates a resource spec JSON map into a CloudEvent.
329300func encodeResourceSpec (resource * api.Resource ) (* ce.Event , error ) {
330- evt , err := api .JSONMAPToCloudEvent (resource .Payload )
331- if err != nil {
332- return nil , fmt .Errorf ("failed to convert resource payload to cloudevent: %v" , err )
333- }
334-
335301 eventType := types.CloudEventsType {
336302 CloudEventsDataType : workpayload .ManifestBundleEventDataType ,
337303 SubResource : types .SubResourceSpec ,
338304 Action : types .EventAction ("create_request" ),
339305 }
340- evt .SetType (eventType .String ())
341- evt .SetSource ("maestro" )
342- // TODO set resource.Source with a new extension attribute if the agent needs
343- evt .SetExtension (types .ExtensionResourceID , resource .ID )
344- evt .SetExtension (types .ExtensionResourceVersion , int64 (resource .Version ))
345- evt .SetExtension (types .ExtensionClusterName , resource .ConsumerName )
346-
347- if ! resource .GetDeletionTimestamp ().IsZero () {
348- evt .SetExtension (types .ExtensionDeletionTimestamp , resource .GetDeletionTimestamp ().Time )
349- }
350306
351- return evt , nil
307+ codec := cloudevents .NewCodec (source )
308+ return codec .Encode (source , eventType , resource )
352309}
0 commit comments