Skip to content

Commit a11abc6

Browse files
committed
using resoruce id as the work metadata name
Signed-off-by: Wei Liu <liuweixa@redhat.com>
1 parent 56cb4fa commit a11abc6

File tree

7 files changed

+825
-53
lines changed

7 files changed

+825
-53
lines changed

cmd/maestro/server/grpc_broker.go

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
ce "github.com/cloudevents/sdk-go/v2"
13-
cetypes "github.com/cloudevents/sdk-go/v2/types"
1413
"google.golang.org/grpc"
1514
"google.golang.org/grpc/credentials"
1615
"google.golang.org/grpc/keepalive"
@@ -24,11 +23,14 @@ import (
2423
servergrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
2524

2625
"github.com/openshift-online/maestro/pkg/api"
26+
"github.com/openshift-online/maestro/pkg/client/cloudevents"
2727
"github.com/openshift-online/maestro/pkg/dao"
2828
"github.com/openshift-online/maestro/pkg/event"
2929
"github.com/openshift-online/maestro/pkg/services"
3030
)
3131

32+
const source = "maestro"
33+
3234
type resourceHandler func(res *api.Resource) error
3335

3436
// subscriber defines a subscriber that can receive and handle resource spec.
@@ -299,63 +301,18 @@ func (s *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool,
299301

300302
// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
301303
func decodeResourceStatus(evt *ce.Event) (*api.Resource, error) {
302-
evtExtensions := evt.Context.GetExtensions()
303-
304-
clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
305-
if err != nil {
306-
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
307-
}
308-
309-
resourceID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID])
310-
if err != nil {
311-
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
312-
}
313-
314-
resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
315-
if err != nil {
316-
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
317-
}
318-
319-
status, err := api.CloudEventToJSONMap(evt)
320-
if err != nil {
321-
return nil, fmt.Errorf("failed to convert cloudevent to resource status: %v", err)
322-
}
323-
324-
resource := &api.Resource{
325-
Source: evt.Source(),
326-
ConsumerName: clusterName,
327-
Version: resourceVersion,
328-
Meta: api.Meta{
329-
ID: resourceID,
330-
},
331-
Status: status,
332-
}
333-
334-
return resource, nil
304+
codec := cloudevents.NewCodec(source)
305+
return codec.Decode(evt)
335306
}
336307

337308
// encodeResourceSpec translates a resource spec JSON map into a CloudEvent.
338309
func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
339-
evt, err := api.JSONMAPToCloudEvent(resource.Payload)
340-
if err != nil {
341-
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
342-
}
343-
344310
eventType := types.CloudEventsType{
345311
CloudEventsDataType: workpayload.ManifestBundleEventDataType,
346312
SubResource: types.SubResourceSpec,
347313
Action: types.EventAction("create_request"),
348314
}
349-
evt.SetType(eventType.String())
350-
evt.SetSource("maestro")
351-
// TODO set resource.Source with a new extension attribute if the agent needs
352-
evt.SetExtension(types.ExtensionResourceID, resource.ID)
353-
evt.SetExtension(types.ExtensionResourceVersion, int64(resource.Version))
354-
evt.SetExtension(types.ExtensionClusterName, resource.ConsumerName)
355-
356-
if !resource.GetDeletionTimestamp().IsZero() {
357-
evt.SetExtension(types.ExtensionDeletionTimestamp, resource.GetDeletionTimestamp().Time)
358-
}
359315

360-
return evt, nil
316+
codec := cloudevents.NewCodec(source)
317+
return codec.Encode(source, eventType, resource)
361318
}

docs/maestro.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ The Maestro Server includes various components to fulfill its functions, as illu
3535

3636
![maestro-mqtt-pub-dataflow](./images/maestro-mqtt-pub-dataflow.png)
3737

38+
1. The Consumer (e.g. ClustersService) use Maestro GRPCSourceClient create a ManifestWork
39+
2. The GRPCSourceClient publish the ManifestWork using CloudEvents
40+
3. The Maestro server
41+
4. The Maestro server
42+
5. The Maestro agent
43+
3844
### Maestro Subscribe Resource Status with MQTT
3945

4046
![maestro-mqtt-sub-dataflow](./images/maestro-mqtt-sub-dataflow.png)

pkg/client/cloudevents/codec.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
cloudevents "github.com/cloudevents/sdk-go/v2"
88
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
99
"github.com/google/uuid"
10+
1011
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
1112
cegeneric "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
1213
cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -31,6 +32,14 @@ func (codec *Codec) EventDataType() cetypes.CloudEventsDataType {
3132
}
3233

3334
func (codec *Codec) Encode(source string, eventType cetypes.CloudEventsType, res *api.Resource) (*cloudevents.Event, error) {
35+
// use resource id as the resource payload metadata name to ensure the payload metadata name is
36+
// unique on the agent side
37+
if err := resetPayloadMetadataNameWithResID(res); err != nil {
38+
return nil, err
39+
}
40+
41+
// converts a resource payload to a CloudEvent
42+
// If the resource payload has metadata the event will have the metadata extension
3443
evt, err := api.JSONMAPToCloudEvent(res.Payload)
3544
if err != nil {
3645
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
@@ -107,3 +116,20 @@ func (codec *Codec) Decode(evt *cloudevents.Event) (*api.Resource, error) {
107116

108117
return resource, nil
109118
}
119+
120+
func resetPayloadMetadataNameWithResID(res *api.Resource) error {
121+
metadata, ok := res.Payload[cetypes.ExtensionWorkMeta]
122+
if !ok {
123+
// the resource payload does not have metadata, do nothing
124+
// the agent will using resource id as the payload metadata name
125+
return nil
126+
}
127+
128+
metaObj, ok := metadata.(map[string]interface{})
129+
if !ok {
130+
return fmt.Errorf("unsupported object type %T", metadata)
131+
}
132+
metaObj["name"] = res.ID
133+
res.Payload[cetypes.ExtensionWorkMeta] = metaObj
134+
return nil
135+
}

0 commit comments

Comments
 (0)