Skip to content

Commit 96d52e5

Browse files
committed
using resoruce id as the work metadata name
Signed-off-by: Wei Liu <liuweixa@redhat.com>
1 parent d33854d commit 96d52e5

File tree

8 files changed

+834
-54
lines changed

8 files changed

+834
-54
lines changed

cmd/maestro/server/grpc_broker.go

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"time"
1111

1212
ce "github.com/cloudevents/sdk-go/v2"
13-
cetypes "github.com/cloudevents/sdk-go/v2/types"
1413
"google.golang.org/grpc"
1514
"google.golang.org/grpc/credentials"
1615
"google.golang.org/grpc/keepalive"
@@ -24,11 +23,14 @@ import (
2423
servergrpc "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc"
2524

2625
"github.com/openshift-online/maestro/pkg/api"
26+
"github.com/openshift-online/maestro/pkg/client/cloudevents"
2727
"github.com/openshift-online/maestro/pkg/dao"
2828
"github.com/openshift-online/maestro/pkg/event"
2929
"github.com/openshift-online/maestro/pkg/services"
3030
)
3131

32+
const source = "maestro"
33+
3234
var _ EventServer = &GRPCBroker{}
3335

3436
type GRPCBrokerService struct {
@@ -290,63 +292,18 @@ func (s *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool,
290292

291293
// decodeResourceStatus translates a CloudEvent into a resource containing the status JSON map.
292294
func decodeResourceStatus(evt *ce.Event) (*api.Resource, error) {
293-
evtExtensions := evt.Context.GetExtensions()
294-
295-
clusterName, err := cetypes.ToString(evtExtensions[types.ExtensionClusterName])
296-
if err != nil {
297-
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
298-
}
299-
300-
resourceID, err := cetypes.ToString(evtExtensions[types.ExtensionResourceID])
301-
if err != nil {
302-
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
303-
}
304-
305-
resourceVersion, err := cetypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
306-
if err != nil {
307-
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
308-
}
309-
310-
status, err := api.CloudEventToJSONMap(evt)
311-
if err != nil {
312-
return nil, fmt.Errorf("failed to convert cloudevent to resource status: %v", err)
313-
}
314-
315-
resource := &api.Resource{
316-
Source: evt.Source(),
317-
ConsumerName: clusterName,
318-
Version: resourceVersion,
319-
Meta: api.Meta{
320-
ID: resourceID,
321-
},
322-
Status: status,
323-
}
324-
325-
return resource, nil
295+
codec := cloudevents.NewCodec(source)
296+
return codec.Decode(evt)
326297
}
327298

328299
// encodeResourceSpec translates a resource spec JSON map into a CloudEvent.
329300
func encodeResourceSpec(resource *api.Resource) (*ce.Event, error) {
330-
evt, err := api.JSONMAPToCloudEvent(resource.Payload)
331-
if err != nil {
332-
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
333-
}
334-
335301
eventType := types.CloudEventsType{
336302
CloudEventsDataType: workpayload.ManifestBundleEventDataType,
337303
SubResource: types.SubResourceSpec,
338304
Action: types.EventAction("create_request"),
339305
}
340-
evt.SetType(eventType.String())
341-
evt.SetSource("maestro")
342-
// TODO set resource.Source with a new extension attribute if the agent needs
343-
evt.SetExtension(types.ExtensionResourceID, resource.ID)
344-
evt.SetExtension(types.ExtensionResourceVersion, int64(resource.Version))
345-
evt.SetExtension(types.ExtensionClusterName, resource.ConsumerName)
346-
347-
if !resource.GetDeletionTimestamp().IsZero() {
348-
evt.SetExtension(types.ExtensionDeletionTimestamp, resource.GetDeletionTimestamp().Time)
349-
}
350306

351-
return evt, nil
307+
codec := cloudevents.NewCodec(source)
308+
return codec.Encode(source, eventType, resource)
352309
}

docs/maestro.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ The `grpcClientTokenFile` stores the token for the corresponding service account
186186

187187
4. After persisting the record, the Maestro server publishes a [CloudEvent](./resources/cloudevents.spec.maestro.json) representing the stored Resource to Maestro agent.
188188

189+
- Maestro server uses the `Resource` record ID as the `ManifestWork` name.
190+
189191
5. The Maestro agent receives this CloudEvent, converts it back into a `ManifestWork`, and applies it to the target Kubernetes cluster. After applied, there is a corresponding [appliedmanifestwork](./resources/appliedmanifestwork.json) created on the agent side.
190192

191193
### Maestro Subscribe Resource Status with MQTT

docs/resources/appliedmanifestwork.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
"cloudevents.open-cluster-management.io/originalsource": "maestro",
1212
"maestro.resource.type": "2d18a7dc-c731-5753-8aa9-6b2f22e3b12b"
1313
},
14-
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-e44ec579-9646-549a-b679-db8d19d6da37",
14+
"name": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8-55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
1515
"resourceVersion": "1552",
1616
"uid": "432b3ff3-702e-4363-89e6-ad399c75608c"
1717
},
1818
"spec": {
1919
"agentID": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
2020
"hubHash": "f1d8a1049b93dffc1929d57a719c3a09a4dcbfe0cd6e42840325be3b2dde73c8",
21-
"manifestWorkName": "e44ec579-9646-549a-b679-db8d19d6da37"
21+
"manifestWorkName": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb"
2222
},
2323
"status": {
2424
"appliedResources": [

docs/resources/cloudevents.spec.maestro.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
},
7575
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
7676
"logtracing": "{}",
77-
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
77+
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
7878
"originalsource": "",
7979
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
8080
"resourceversion": 1

docs/resources/cloudevents.status.maestro.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@
141141
},
142142
"clustername": "7ff1a8f4-8811-49b4-8c19-9271bc51975e",
143143
"logtracing": "{}",
144-
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"e44ec579-9646-549a-b679-db8d19d6da37\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
144+
"metadata": "{\"creationTimestamp\":\"2025-12-16T06:00:17Z\",\"labels\":{\"maestro.resource.type\":\"2d18a7dc-c731-5753-8aa9-6b2f22e3b12b\"},\"name\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\",\"namespace\":\"7ff1a8f4-8811-49b4-8c19-9271bc51975e\",\"resourceVersion\":\"0\",\"uid\":\"55c61e54-a3f6-563d-9fec-b1fe297bdfdb\"}",
145145
"originalsource": "maestro",
146146
"resourceid": "55c61e54-a3f6-563d-9fec-b1fe297bdfdb",
147147
"resourceversion": 1,

pkg/client/cloudevents/codec.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ func (codec *Codec) EventDataType() cetypes.CloudEventsDataType {
3131
}
3232

3333
func (codec *Codec) Encode(source string, eventType cetypes.CloudEventsType, res *api.Resource) (*cloudevents.Event, error) {
34+
// use resource id as the resource payload metadata name to ensure the payload metadata name is
35+
// unique on the agent side
36+
if err := resetPayloadMetadataNameWithResID(res); err != nil {
37+
return nil, err
38+
}
39+
40+
// converts a resource payload to a CloudEvent
41+
// If the resource payload has metadata the event will have the metadata extension
3442
evt, err := api.JSONMAPToCloudEvent(res.Payload)
3543
if err != nil {
3644
return nil, fmt.Errorf("failed to convert resource payload to cloudevent: %v", err)
@@ -107,3 +115,20 @@ func (codec *Codec) Decode(evt *cloudevents.Event) (*api.Resource, error) {
107115

108116
return resource, nil
109117
}
118+
119+
func resetPayloadMetadataNameWithResID(res *api.Resource) error {
120+
metadata, ok := res.Payload[cetypes.ExtensionWorkMeta]
121+
if !ok {
122+
// the resource payload does not have metadata, do nothing
123+
// the agent will use resource id as the payload metadata name
124+
return nil
125+
}
126+
127+
metaObj, ok := metadata.(map[string]interface{})
128+
if !ok {
129+
return fmt.Errorf("unsupported object type %T", metadata)
130+
}
131+
metaObj["name"] = res.ID
132+
res.Payload[cetypes.ExtensionWorkMeta] = metaObj
133+
return nil
134+
}

0 commit comments

Comments
 (0)